from updater.model.feed import db, Registros, Vehiculo, Viaje, Entidades, Posicion import utils.gtfs_realtime_pb2 from time import strftime from datetime import datetime import logging import json import time import sys import os log = logging.getLogger('updater') log.setLevel(logging.DEBUG) handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) log.addHandler(handler) def sync_db(): inicio = datetime.now() registrodb = db.query(Registros).filter(Registros.status<5).order_by(Registros.id.asc()).first() if registrodb is None: return False log.info("Record {} has status {}".format(registrodb.filename, registrodb.status)) registrodb.status = registrodb.status + 1 db.commit() try: feed = utils.gtfs_realtime_pb2.FeedMessage() with open(os.path.abspath(registrodb.filename), 'rb') as file: feed.ParseFromString(file.read()) for item in feed.entity: patente = item.vehicle.vehicle.license_plate itemts = item.vehicle.timestamp patentedb = db.query(Vehiculo).filter(Vehiculo.patente==patente).one_or_none() if patentedb is None: patentedb = Vehiculo(patente=patente, timestamp=itemts) db.add(patentedb) db.commit() elif patentedb.timestamp < itemts: patentedb.timestamp = itemts if item.HasField('trip_update'): tripid = item.trip_update.trip.trip_id tripdb = db.query(Viaje).filter(Viaje.trip_id==tripid).one_or_none() if tripdb is None: tripid = item.trip_update.trip.trip_id routeid = item.trip_update.trip.route_id direccionid = item.trip_update.trip.direction_id tstar = item.trip_update.trip.start_time dstart = item.trip_update.trip.start_date tripdb = Viaje(trip_id=tripid, route_id=routeid, direction_id=direccionid, start_time=tstar, start_date=dstart, timestamp=itemts) db.add(tripdb) db.commit() elif tripdb.timestamp < itemts: tripdb.timestamp = itemts tripdbid = tripdb.id else: tripdbid = None entidadesdb = db.query(Entidades).filter(Entidades.entity==str(item.id)).one_or_none() if entidadesdb is None: entidadesdb = Entidades(entity=str(item.id), vehicleid=patentedb.id, tripid=tripdbid, timestamp=itemts) db.add(entidadesdb) db.commit() elif entidadesdb.timestamp < itemts: entidadesdb.timestamp = itemts posiciondb = db.query(Posicion).filter(Posicion.vehicleid==patentedb.id, Posicion.timestamp==itemts).one_or_none() if posiciondb is None: lat = item.vehicle.position.latitude long = item.vehicle.position.longitude bearing = item.vehicle.position.bearing odometer = item.vehicle.position.odometer speed = item.vehicle.position.speed posiciondb = Posicion(vehicleid=patentedb.id, tripid=tripdbid, latitude=lat, longitude=long, bearing=bearing, odometer=odometer, speed=speed, timestamp=itemts) db.add(posiciondb) db.commit() registrodb.status=50 db.commit() log.info("GTFS-RT {} Ingested in {}s".format(registrodb.filename, (datetime.now()-inicio).total_seconds())) return True except: import traceback log.error("Failed to Ingest Record {} in {}s (attempt: {}/5)".format(registrodb.filename, (datetime.now()-inicio).total_seconds(), registrodb.status)) log.info('Traceback {}'.format(traceback.format_exc()))