Compare commits
2 Commits
f7465517e6
...
8d5ba035a9
Author | SHA1 | Date |
---|---|---|
|
8d5ba035a9 | |
|
f4320efbaa |
|
@ -3,6 +3,7 @@ from fetcher.model.feed import db, Registros
|
||||||
from tempfile import NamedTemporaryFile
|
from tempfile import NamedTemporaryFile
|
||||||
from shutil import copyfile
|
from shutil import copyfile
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
import requests
|
import requests
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
@ -18,7 +19,7 @@ handler.setFormatter(formatter)
|
||||||
log.addHandler(handler)
|
log.addHandler(handler)
|
||||||
|
|
||||||
def load_proto():
|
def load_proto():
|
||||||
inicio = datetime.now()
|
inicio = datetime.now(tz=ZoneInfo("America/Santiago"))
|
||||||
final_dir = "static/ccp/{}/{}/{}".format(inicio.strftime("%Y"), inicio.strftime("%m"), inicio.strftime("%d"))
|
final_dir = "static/ccp/{}/{}/{}".format(inicio.strftime("%Y"), inicio.strftime("%m"), inicio.strftime("%d"))
|
||||||
final_name = "static/ccp/{}/{}/{}/ccp_gtfs_{}.proto".format(inicio.strftime("%Y"), inicio.strftime("%m"), inicio.strftime("%d"), inicio.strftime("%Y%m%d_%H%M_%S"))
|
final_name = "static/ccp/{}/{}/{}/ccp_gtfs_{}.proto".format(inicio.strftime("%Y"), inicio.strftime("%m"), inicio.strftime("%d"), inicio.strftime("%Y%m%d_%H%M_%S"))
|
||||||
|
|
||||||
|
@ -32,10 +33,10 @@ def load_proto():
|
||||||
nuevo_registro = Registros(filename=final_name, status=0)
|
nuevo_registro = Registros(filename=final_name, status=0)
|
||||||
db.add(nuevo_registro)
|
db.add(nuevo_registro)
|
||||||
db.commit()
|
db.commit()
|
||||||
log.info("Fetched GTFS-RT Record {} in {}s".format(inicio.strftime("%Y%m%d_%H%M_%S"), (datetime.now()-inicio).total_seconds()))
|
log.info("Fetched GTFS-RT Record {} in {}s".format(inicio.strftime("%Y%m%d_%H%M_%S"), (datetime.now(tz=ZoneInfo("America/Santiago"))-inicio).total_seconds()))
|
||||||
except:
|
except:
|
||||||
import traceback
|
import traceback
|
||||||
log.error("Failed to fetch GTFS-RT Record {} in {}s".format(inicio.strftime("%Y%m%d_%H%M_%S"), (datetime.now()-inicio).total_seconds()))
|
log.error("Failed to fetch GTFS-RT Record {} in {}s".format(inicio.strftime("%Y%m%d_%H%M_%S"), (datetime.now(tz=ZoneInfo("America/Santiago"))-inicio).total_seconds()))
|
||||||
log.info('Traceback {}'.format(traceback.format_exc()))
|
log.info('Traceback {}'.format(traceback.format_exc()))
|
||||||
nuevo_registro = Registros(filename=final_name, status=100)
|
nuevo_registro = Registros(filename=final_name, status=100)
|
||||||
db.add(nuevo_registro)
|
db.add(nuevo_registro)
|
||||||
|
|
7
sync.py
7
sync.py
|
@ -1,11 +1,13 @@
|
||||||
# coding: utf-8
|
# coding: utf-8
|
||||||
from updater.model.feed import init_db, db, engine
|
from updater.model.feed import init_db, db, engine
|
||||||
from updater.main import sync_db, log
|
from updater.main import sync_db, log, estado_maximo
|
||||||
import time
|
import time
|
||||||
|
import random
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
init_db(db, engine)
|
init_db(db, engine)
|
||||||
doki = int(time.time()) + 600
|
doki = int(time.time()) + 600
|
||||||
|
time.sleep(random.randint(1, 30)) #random wait
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if not sync_db():
|
if not sync_db():
|
||||||
|
@ -13,6 +15,7 @@ if __name__ == '__main__':
|
||||||
if i >= doki:
|
if i >= doki:
|
||||||
doki = i + 600
|
doki = i + 600
|
||||||
log.info('Heartbeat')
|
log.info('Heartbeat')
|
||||||
time.sleep(20)
|
if estado_maximo == 0:
|
||||||
|
time.sleep(random.randint(10, 30)) #more randomness
|
||||||
else:
|
else:
|
||||||
doki = int(time.time()) + 600
|
doki = int(time.time()) + 600
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
from updater.model.feed import db, Registros, Vehiculo, Viaje, Entidades, Posicion
|
from updater.model.feed import db, Registros, Vehiculo, Viaje, Entidades, Posicion
|
||||||
import utils.gtfs_realtime_pb2
|
import utils.gtfs_realtime_pb2
|
||||||
from time import strftime
|
from time import strftime
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
|
@ -17,16 +18,34 @@ formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||||||
handler.setFormatter(formatter)
|
handler.setFormatter(formatter)
|
||||||
log.addHandler(handler)
|
log.addHandler(handler)
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
estado_maximo = int(os.environ.get('GTFS_RT_FEED_MAXSTATUS'))
|
||||||
|
except:
|
||||||
|
log.warning("failed to load GTFS_RT_FEED_MAXSTATUS enviroment variable, setting default value")
|
||||||
|
estado_maximo = 0
|
||||||
|
|
||||||
|
try:
|
||||||
|
antiguedad_minima = int(os.environ.get('GTFS_RT_MIN_DELAY'))
|
||||||
|
except:
|
||||||
|
log.warning("failed to load GTFS_RT_MIN_DELAY enviroment variable, setting default value")
|
||||||
|
antiguedad_minima = 0
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def sync_db():
|
def sync_db():
|
||||||
inicio = datetime.now()
|
inicio = datetime.now(tz=ZoneInfo("America/Santiago"))
|
||||||
registrodb = db.query(Registros).filter(Registros.status<5).order_by(Registros.id.asc()).first()
|
|
||||||
|
ultima_modificacion = inicio - timedelta(0,antiguedad_minima)
|
||||||
|
|
||||||
|
|
||||||
|
registrodb = db.query(Registros).filter(Registros.status<=estado_maximo, Registros.timestamp<ultima_modificacion).order_by(Registros.id.asc()).first()
|
||||||
if registrodb is None:
|
if registrodb is None:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
log.info("Record {} has status {}".format(registrodb.filename, registrodb.status))
|
|
||||||
|
|
||||||
registrodb.status = registrodb.status + 1
|
registrodb.status = registrodb.status + 1
|
||||||
db.commit()
|
db.commit()
|
||||||
|
log.info("Record {} has status {}".format(registrodb.filename, registrodb.status))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
feed = utils.gtfs_realtime_pb2.FeedMessage()
|
feed = utils.gtfs_realtime_pb2.FeedMessage()
|
||||||
|
@ -41,9 +60,9 @@ def sync_db():
|
||||||
if patentedb is None:
|
if patentedb is None:
|
||||||
patentedb = Vehiculo(patente=patente, timestamp=itemts)
|
patentedb = Vehiculo(patente=patente, timestamp=itemts)
|
||||||
db.add(patentedb)
|
db.add(patentedb)
|
||||||
|
db.commit()
|
||||||
elif patentedb.timestamp < itemts:
|
elif patentedb.timestamp < itemts:
|
||||||
patentedb.timestamp = itemts
|
patentedb.timestamp = itemts
|
||||||
db.commit()
|
|
||||||
|
|
||||||
if item.HasField('trip_update'):
|
if item.HasField('trip_update'):
|
||||||
tripid = item.trip_update.trip.trip_id
|
tripid = item.trip_update.trip.trip_id
|
||||||
|
@ -58,9 +77,9 @@ def sync_db():
|
||||||
|
|
||||||
tripdb = Viaje(trip_id=tripid, route_id=routeid, direction_id=direccionid, start_time=tstar, start_date=dstart, timestamp=itemts)
|
tripdb = Viaje(trip_id=tripid, route_id=routeid, direction_id=direccionid, start_time=tstar, start_date=dstart, timestamp=itemts)
|
||||||
db.add(tripdb)
|
db.add(tripdb)
|
||||||
|
db.commit()
|
||||||
elif tripdb.timestamp < itemts:
|
elif tripdb.timestamp < itemts:
|
||||||
tripdb.timestamp = itemts
|
tripdb.timestamp = itemts
|
||||||
db.commit()
|
|
||||||
tripdbid = tripdb.id
|
tripdbid = tripdb.id
|
||||||
else:
|
else:
|
||||||
tripdbid = None
|
tripdbid = None
|
||||||
|
@ -69,9 +88,9 @@ def sync_db():
|
||||||
if entidadesdb is None:
|
if entidadesdb is None:
|
||||||
entidadesdb = Entidades(entity=str(item.id), vehicleid=patentedb.id, tripid=tripdbid, timestamp=itemts)
|
entidadesdb = Entidades(entity=str(item.id), vehicleid=patentedb.id, tripid=tripdbid, timestamp=itemts)
|
||||||
db.add(entidadesdb)
|
db.add(entidadesdb)
|
||||||
|
db.commit()
|
||||||
elif entidadesdb.timestamp < itemts:
|
elif entidadesdb.timestamp < itemts:
|
||||||
entidadesdb.timestamp = itemts
|
entidadesdb.timestamp = itemts
|
||||||
db.commit()
|
|
||||||
|
|
||||||
posiciondb = db.query(Posicion).filter(Posicion.vehicleid==patentedb.id, Posicion.timestamp==itemts).one_or_none()
|
posiciondb = db.query(Posicion).filter(Posicion.vehicleid==patentedb.id, Posicion.timestamp==itemts).one_or_none()
|
||||||
if posiciondb is None:
|
if posiciondb is None:
|
||||||
|
@ -87,9 +106,9 @@ def sync_db():
|
||||||
|
|
||||||
registrodb.status=50
|
registrodb.status=50
|
||||||
db.commit()
|
db.commit()
|
||||||
log.info("GTFS-RT {} Ingested in {}s".format(registrodb.filename, (datetime.now()-inicio).total_seconds()))
|
log.info("GTFS-RT {} Ingested in {}s".format(registrodb.filename, (datetime.now(tz=ZoneInfo("America/Santiago"))-inicio).total_seconds()))
|
||||||
return True
|
return True
|
||||||
except:
|
except:
|
||||||
import traceback
|
import traceback
|
||||||
log.error("Failed to Ingest Record {} in {}s (attempt: {}/5)".format(registrodb.filename, (datetime.now()-inicio).total_seconds(), registrodb.status))
|
log.error("Failed to Ingest Record {} in {}s (attempt: {}/5)".format(registrodb.filename, (datetime.now(tz=ZoneInfo("America/Santiago"))-inicio).total_seconds(), registrodb.status))
|
||||||
log.info('Traceback {}'.format(traceback.format_exc()))
|
log.info('Traceback {}'.format(traceback.format_exc()))
|
||||||
|
|
Loading…
Reference in New Issue