double ingester
parent
f4320efbaa
commit
8d5ba035a9
7
sync.py
7
sync.py
|
@ -1,11 +1,13 @@
|
|||
# coding: utf-8
|
||||
from updater.model.feed import init_db, db, engine
|
||||
from updater.main import sync_db, log
|
||||
from updater.main import sync_db, log, estado_maximo
|
||||
import time
|
||||
import random
|
||||
|
||||
if __name__ == '__main__':
|
||||
init_db(db, engine)
|
||||
doki = int(time.time()) + 600
|
||||
time.sleep(random.randint(1, 30)) #random wait
|
||||
|
||||
while True:
|
||||
if not sync_db():
|
||||
|
@ -13,6 +15,7 @@ if __name__ == '__main__':
|
|||
if i >= doki:
|
||||
doki = i + 600
|
||||
log.info('Heartbeat')
|
||||
time.sleep(20)
|
||||
if estado_maximo == 0:
|
||||
time.sleep(random.randint(10, 30)) #more randomness
|
||||
else:
|
||||
doki = int(time.time()) + 600
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
from updater.model.feed import db, Registros, Vehiculo, Viaje, Entidades, Posicion
|
||||
import utils.gtfs_realtime_pb2
|
||||
from time import strftime
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
from zoneinfo import ZoneInfo
|
||||
import logging
|
||||
import json
|
||||
import time
|
||||
|
@ -17,16 +18,34 @@ formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
|||
handler.setFormatter(formatter)
|
||||
log.addHandler(handler)
|
||||
|
||||
|
||||
try:
|
||||
estado_maximo = int(os.environ.get('GTFS_RT_FEED_MAXSTATUS'))
|
||||
except:
|
||||
log.warning("failed to load GTFS_RT_FEED_MAXSTATUS enviroment variable, setting default value")
|
||||
estado_maximo = 0
|
||||
|
||||
try:
|
||||
antiguedad_minima = int(os.environ.get('GTFS_RT_MIN_DELAY'))
|
||||
except:
|
||||
log.warning("failed to load GTFS_RT_MIN_DELAY enviroment variable, setting default value")
|
||||
antiguedad_minima = 0
|
||||
|
||||
|
||||
|
||||
def sync_db():
|
||||
inicio = datetime.now()
|
||||
registrodb = db.query(Registros).filter(Registros.status<5).order_by(Registros.id.asc()).first()
|
||||
inicio = datetime.now(tz=ZoneInfo("America/Santiago"))
|
||||
|
||||
ultima_modificacion = inicio - timedelta(0,antiguedad_minima)
|
||||
|
||||
|
||||
registrodb = db.query(Registros).filter(Registros.status<=estado_maximo, Registros.timestamp<ultima_modificacion).order_by(Registros.id.asc()).first()
|
||||
if registrodb is None:
|
||||
return False
|
||||
|
||||
log.info("Record {} has status {}".format(registrodb.filename, registrodb.status))
|
||||
|
||||
registrodb.status = registrodb.status + 1
|
||||
db.commit()
|
||||
log.info("Record {} has status {}".format(registrodb.filename, registrodb.status))
|
||||
|
||||
try:
|
||||
feed = utils.gtfs_realtime_pb2.FeedMessage()
|
||||
|
@ -87,9 +106,9 @@ def sync_db():
|
|||
|
||||
registrodb.status=50
|
||||
db.commit()
|
||||
log.info("GTFS-RT {} Ingested in {}s".format(registrodb.filename, (datetime.now()-inicio).total_seconds()))
|
||||
log.info("GTFS-RT {} Ingested in {}s".format(registrodb.filename, (datetime.now(tz=ZoneInfo("America/Santiago"))-inicio).total_seconds()))
|
||||
return True
|
||||
except:
|
||||
import traceback
|
||||
log.error("Failed to Ingest Record {} in {}s (attempt: {}/5)".format(registrodb.filename, (datetime.now()-inicio).total_seconds(), registrodb.status))
|
||||
log.error("Failed to Ingest Record {} in {}s (attempt: {}/5)".format(registrodb.filename, (datetime.now(tz=ZoneInfo("America/Santiago"))-inicio).total_seconds(), registrodb.status))
|
||||
log.info('Traceback {}'.format(traceback.format_exc()))
|
||||
|
|
Loading…
Reference in New Issue