2023-09-16 12:07:33 -03:00
from updater . model . feed import db , Registros , Vehiculo , Viaje , Entidades , Posicion
import utils . gtfs_realtime_pb2
from time import strftime
2023-10-01 21:28:17 -03:00
from datetime import datetime , timedelta
from zoneinfo import ZoneInfo
2023-09-16 12:07:33 -03:00
import logging
import json
import time
import sys
import os
log = logging . getLogger ( ' updater ' )
log . setLevel ( logging . DEBUG )
handler = logging . StreamHandler ( sys . stdout )
handler . setLevel ( logging . INFO )
formatter = logging . Formatter ( ' %(asctime)s - %(levelname)s - %(message)s ' )
handler . setFormatter ( formatter )
log . addHandler ( handler )
2023-10-01 21:28:17 -03:00
try :
estado_maximo = int ( os . environ . get ( ' GTFS_RT_FEED_MAXSTATUS ' ) )
except :
log . warning ( " failed to load GTFS_RT_FEED_MAXSTATUS enviroment variable, setting default value " )
estado_maximo = 0
try :
antiguedad_minima = int ( os . environ . get ( ' GTFS_RT_MIN_DELAY ' ) )
except :
log . warning ( " failed to load GTFS_RT_MIN_DELAY enviroment variable, setting default value " )
antiguedad_minima = 0
2023-09-16 12:07:33 -03:00
def sync_db ( ) :
2023-10-01 21:28:17 -03:00
inicio = datetime . now ( tz = ZoneInfo ( " America/Santiago " ) )
ultima_modificacion = inicio - timedelta ( 0 , antiguedad_minima )
registrodb = db . query ( Registros ) . filter ( Registros . status < = estado_maximo , Registros . timestamp < ultima_modificacion ) . order_by ( Registros . id . asc ( ) ) . first ( )
2023-09-16 12:07:33 -03:00
if registrodb is None :
return False
registrodb . status = registrodb . status + 1
db . commit ( )
2023-10-01 21:28:17 -03:00
log . info ( " Record {} has status {} " . format ( registrodb . filename , registrodb . status ) )
2023-09-16 12:07:33 -03:00
try :
feed = utils . gtfs_realtime_pb2 . FeedMessage ( )
with open ( os . path . abspath ( registrodb . filename ) , ' rb ' ) as file :
feed . ParseFromString ( file . read ( ) )
for item in feed . entity :
patente = item . vehicle . vehicle . license_plate
itemts = item . vehicle . timestamp
patentedb = db . query ( Vehiculo ) . filter ( Vehiculo . patente == patente ) . one_or_none ( )
if patentedb is None :
patentedb = Vehiculo ( patente = patente , timestamp = itemts )
db . add ( patentedb )
2023-09-29 18:46:55 -03:00
db . commit ( )
2023-09-16 12:07:33 -03:00
elif patentedb . timestamp < itemts :
patentedb . timestamp = itemts
if item . HasField ( ' trip_update ' ) :
tripid = item . trip_update . trip . trip_id
tripdb = db . query ( Viaje ) . filter ( Viaje . trip_id == tripid ) . one_or_none ( )
if tripdb is None :
tripid = item . trip_update . trip . trip_id
routeid = item . trip_update . trip . route_id
direccionid = item . trip_update . trip . direction_id
tstar = item . trip_update . trip . start_time
dstart = item . trip_update . trip . start_date
tripdb = Viaje ( trip_id = tripid , route_id = routeid , direction_id = direccionid , start_time = tstar , start_date = dstart , timestamp = itemts )
db . add ( tripdb )
2023-09-29 18:46:55 -03:00
db . commit ( )
2023-09-16 12:07:33 -03:00
elif tripdb . timestamp < itemts :
tripdb . timestamp = itemts
tripdbid = tripdb . id
else :
tripdbid = None
entidadesdb = db . query ( Entidades ) . filter ( Entidades . entity == str ( item . id ) ) . one_or_none ( )
if entidadesdb is None :
entidadesdb = Entidades ( entity = str ( item . id ) , vehicleid = patentedb . id , tripid = tripdbid , timestamp = itemts )
db . add ( entidadesdb )
2023-09-29 18:46:55 -03:00
db . commit ( )
2023-09-16 12:07:33 -03:00
elif entidadesdb . timestamp < itemts :
entidadesdb . timestamp = itemts
posiciondb = db . query ( Posicion ) . filter ( Posicion . vehicleid == patentedb . id , Posicion . timestamp == itemts ) . one_or_none ( )
if posiciondb is None :
lat = item . vehicle . position . latitude
long = item . vehicle . position . longitude
bearing = item . vehicle . position . bearing
odometer = item . vehicle . position . odometer
speed = item . vehicle . position . speed
posiciondb = Posicion ( vehicleid = patentedb . id , tripid = tripdbid , latitude = lat , longitude = long , bearing = bearing , odometer = odometer , speed = speed , timestamp = itemts )
db . add ( posiciondb )
db . commit ( )
registrodb . status = 50
db . commit ( )
2023-10-01 21:28:17 -03:00
log . info ( " GTFS-RT {} Ingested in {} s " . format ( registrodb . filename , ( datetime . now ( tz = ZoneInfo ( " America/Santiago " ) ) - inicio ) . total_seconds ( ) ) )
2023-09-16 12:07:33 -03:00
return True
except :
import traceback
2023-10-01 21:28:17 -03:00
log . error ( " Failed to Ingest Record {} in {} s (attempt: {} /5) " . format ( registrodb . filename , ( datetime . now ( tz = ZoneInfo ( " America/Santiago " ) ) - inicio ) . total_seconds ( ) , registrodb . status ) )
2023-09-16 12:07:33 -03:00
log . info ( ' Traceback {} ' . format ( traceback . format_exc ( ) ) )