Compare commits

...

2 Commits

Author SHA1 Message Date
ifiguero 8d5ba035a9 double ingester 2023-10-01 21:28:17 -03:00
ifiguero f4320efbaa fixes 2023-09-29 18:46:55 -03:00
3 changed files with 38 additions and 15 deletions

View File

@ -3,6 +3,7 @@ from fetcher.model.feed import db, Registros
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from shutil import copyfile from shutil import copyfile
from datetime import datetime from datetime import datetime
from zoneinfo import ZoneInfo
import requests import requests
import logging import logging
import time import time
@ -18,7 +19,7 @@ handler.setFormatter(formatter)
log.addHandler(handler) log.addHandler(handler)
def load_proto(): def load_proto():
inicio = datetime.now() inicio = datetime.now(tz=ZoneInfo("America/Santiago"))
final_dir = "static/ccp/{}/{}/{}".format(inicio.strftime("%Y"), inicio.strftime("%m"), inicio.strftime("%d")) final_dir = "static/ccp/{}/{}/{}".format(inicio.strftime("%Y"), inicio.strftime("%m"), inicio.strftime("%d"))
final_name = "static/ccp/{}/{}/{}/ccp_gtfs_{}.proto".format(inicio.strftime("%Y"), inicio.strftime("%m"), inicio.strftime("%d"), inicio.strftime("%Y%m%d_%H%M_%S")) final_name = "static/ccp/{}/{}/{}/ccp_gtfs_{}.proto".format(inicio.strftime("%Y"), inicio.strftime("%m"), inicio.strftime("%d"), inicio.strftime("%Y%m%d_%H%M_%S"))
@ -32,10 +33,10 @@ def load_proto():
nuevo_registro = Registros(filename=final_name, status=0) nuevo_registro = Registros(filename=final_name, status=0)
db.add(nuevo_registro) db.add(nuevo_registro)
db.commit() db.commit()
log.info("Fetched GTFS-RT Record {} in {}s".format(inicio.strftime("%Y%m%d_%H%M_%S"), (datetime.now()-inicio).total_seconds())) log.info("Fetched GTFS-RT Record {} in {}s".format(inicio.strftime("%Y%m%d_%H%M_%S"), (datetime.now(tz=ZoneInfo("America/Santiago"))-inicio).total_seconds()))
except: except:
import traceback import traceback
log.error("Failed to fetch GTFS-RT Record {} in {}s".format(inicio.strftime("%Y%m%d_%H%M_%S"), (datetime.now()-inicio).total_seconds())) log.error("Failed to fetch GTFS-RT Record {} in {}s".format(inicio.strftime("%Y%m%d_%H%M_%S"), (datetime.now(tz=ZoneInfo("America/Santiago"))-inicio).total_seconds()))
log.info('Traceback {}'.format(traceback.format_exc())) log.info('Traceback {}'.format(traceback.format_exc()))
nuevo_registro = Registros(filename=final_name, status=100) nuevo_registro = Registros(filename=final_name, status=100)
db.add(nuevo_registro) db.add(nuevo_registro)

View File

@ -1,11 +1,13 @@
# coding: utf-8 # coding: utf-8
from updater.model.feed import init_db, db, engine from updater.model.feed import init_db, db, engine
from updater.main import sync_db, log from updater.main import sync_db, log, estado_maximo
import time import time
import random
if __name__ == '__main__': if __name__ == '__main__':
init_db(db, engine) init_db(db, engine)
doki = int(time.time()) + 600 doki = int(time.time()) + 600
time.sleep(random.randint(1, 30)) #random wait
while True: while True:
if not sync_db(): if not sync_db():
@ -13,6 +15,7 @@ if __name__ == '__main__':
if i >= doki: if i >= doki:
doki = i + 600 doki = i + 600
log.info('Heartbeat') log.info('Heartbeat')
time.sleep(20) if estado_maximo == 0:
time.sleep(random.randint(10, 30)) #more randomness
else: else:
doki = int(time.time()) + 600 doki = int(time.time()) + 600

View File

@ -1,7 +1,8 @@
from updater.model.feed import db, Registros, Vehiculo, Viaje, Entidades, Posicion from updater.model.feed import db, Registros, Vehiculo, Viaje, Entidades, Posicion
import utils.gtfs_realtime_pb2 import utils.gtfs_realtime_pb2
from time import strftime from time import strftime
from datetime import datetime from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import logging import logging
import json import json
import time import time
@ -17,16 +18,34 @@ formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter) handler.setFormatter(formatter)
log.addHandler(handler) log.addHandler(handler)
# Local import so the environment lookups below cannot fail on a missing
# name; re-importing an already-imported module is harmless.
import os


def _env_int(name, default=0):
    """Return environment variable *name* parsed as an int.

    Falls back to *default* (and logs a warning) when the variable is unset
    or its value is not a valid integer literal. Only those two expected
    failures are handled; anything else (e.g. KeyboardInterrupt) propagates
    instead of being silently turned into the default.
    """
    try:
        return int(os.environ[name])
    except (KeyError, ValueError):
        # KeyError: variable not set. ValueError: value is not an integer.
        log.warning("failed to load {} enviroment variable, setting default value".format(name))
        return default


# Highest Registros.status value this ingester instance will pick up
# (used in the sync_db query filter `Registros.status<=estado_maximo`).
estado_maximo = _env_int('GTFS_RT_FEED_MAXSTATUS')

# Minimum age, in seconds, a record must have before it is ingested
# (sync_db subtracts it from "now" via timedelta(0, antiguedad_minima)).
antiguedad_minima = _env_int('GTFS_RT_MIN_DELAY')
def sync_db(): def sync_db():
inicio = datetime.now() inicio = datetime.now(tz=ZoneInfo("America/Santiago"))
registrodb = db.query(Registros).filter(Registros.status<5).order_by(Registros.id.asc()).first()
ultima_modificacion = inicio - timedelta(0,antiguedad_minima)
registrodb = db.query(Registros).filter(Registros.status<=estado_maximo, Registros.timestamp<ultima_modificacion).order_by(Registros.id.asc()).first()
if registrodb is None: if registrodb is None:
return False return False
log.info("Record {} has status {}".format(registrodb.filename, registrodb.status))
registrodb.status = registrodb.status + 1 registrodb.status = registrodb.status + 1
db.commit() db.commit()
log.info("Record {} has status {}".format(registrodb.filename, registrodb.status))
try: try:
feed = utils.gtfs_realtime_pb2.FeedMessage() feed = utils.gtfs_realtime_pb2.FeedMessage()
@ -41,9 +60,9 @@ def sync_db():
if patentedb is None: if patentedb is None:
patentedb = Vehiculo(patente=patente, timestamp=itemts) patentedb = Vehiculo(patente=patente, timestamp=itemts)
db.add(patentedb) db.add(patentedb)
db.commit()
elif patentedb.timestamp < itemts: elif patentedb.timestamp < itemts:
patentedb.timestamp = itemts patentedb.timestamp = itemts
db.commit()
if item.HasField('trip_update'): if item.HasField('trip_update'):
tripid = item.trip_update.trip.trip_id tripid = item.trip_update.trip.trip_id
@ -58,9 +77,9 @@ def sync_db():
tripdb = Viaje(trip_id=tripid, route_id=routeid, direction_id=direccionid, start_time=tstar, start_date=dstart, timestamp=itemts) tripdb = Viaje(trip_id=tripid, route_id=routeid, direction_id=direccionid, start_time=tstar, start_date=dstart, timestamp=itemts)
db.add(tripdb) db.add(tripdb)
db.commit()
elif tripdb.timestamp < itemts: elif tripdb.timestamp < itemts:
tripdb.timestamp = itemts tripdb.timestamp = itemts
db.commit()
tripdbid = tripdb.id tripdbid = tripdb.id
else: else:
tripdbid = None tripdbid = None
@ -69,9 +88,9 @@ def sync_db():
if entidadesdb is None: if entidadesdb is None:
entidadesdb = Entidades(entity=str(item.id), vehicleid=patentedb.id, tripid=tripdbid, timestamp=itemts) entidadesdb = Entidades(entity=str(item.id), vehicleid=patentedb.id, tripid=tripdbid, timestamp=itemts)
db.add(entidadesdb) db.add(entidadesdb)
db.commit()
elif entidadesdb.timestamp < itemts: elif entidadesdb.timestamp < itemts:
entidadesdb.timestamp = itemts entidadesdb.timestamp = itemts
db.commit()
posiciondb = db.query(Posicion).filter(Posicion.vehicleid==patentedb.id, Posicion.timestamp==itemts).one_or_none() posiciondb = db.query(Posicion).filter(Posicion.vehicleid==patentedb.id, Posicion.timestamp==itemts).one_or_none()
if posiciondb is None: if posiciondb is None:
@ -87,9 +106,9 @@ def sync_db():
registrodb.status=50 registrodb.status=50
db.commit() db.commit()
log.info("GTFS-RT {} Ingested in {}s".format(registrodb.filename, (datetime.now()-inicio).total_seconds())) log.info("GTFS-RT {} Ingested in {}s".format(registrodb.filename, (datetime.now(tz=ZoneInfo("America/Santiago"))-inicio).total_seconds()))
return True return True
except: except:
import traceback import traceback
log.error("Failed to Ingest Record {} in {}s (attempt: {}/5)".format(registrodb.filename, (datetime.now()-inicio).total_seconds(), registrodb.status)) log.error("Failed to Ingest Record {} in {}s (attempt: {}/5)".format(registrodb.filename, (datetime.now(tz=ZoneInfo("America/Santiago"))-inicio).total_seconds(), registrodb.status))
log.info('Traceback {}'.format(traceback.format_exc())) log.info('Traceback {}'.format(traceback.format_exc()))