Carga_GTFS/cargaAPI/carga/procesos.py

330 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
from os import remove
import zipfile
import pandas as pd
import shutil
from database.db import engine
from sqlalchemy import desc,func,case,text
from sqlalchemy.orm import Session
from sqlalchemy.sql import exists
from models.carga_gtfs import CargaGtfs
from models.agency import Agency
from models.calendar import Calendar
from models.route import Route
from models.shape_refence import ShapeReference
from models.shapes import Shapes
from models.stop_times import StopTimes
from models.stops import Stop
from models.trip import Trip
def actualizar_estado(id_carga_gtfs:int, estado:str, mensaje:str, db: Session):
zip = db.query(CargaGtfs).filter(CargaGtfs.id_carga_gtfs == id_carga_gtfs).first()
zip.estado = estado
zip.mensaje = mensaje
db.commit()
def verificar_columnas(dataframe, columnas_necesarias, archivo, zip, db):
columnas_sobrantes = [columna for columna in dataframe.columns if columna not in columnas_necesarias]
columnas_faltantes = [columna for columna in columnas_necesarias if columna not in dataframe.columns]
dataframe_filtrado = dataframe.drop(columnas_sobrantes, axis=1)
if len(columnas_faltantes) > 0:
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje=f"Columnas faltantes en archivo {archivo}, por favor asegúrese de que el archivo contenga las siguientes columnas: {', '.join(columnas_faltantes)}", db=db)
raise ValueError(f"Columnas faltantes en archivo {archivo}, por favor asegúrese de que el archivo contenga las siguientes columnas: {', '.join(columnas_faltantes)}")
else:
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje=f"Archivo '{archivo}' leído y comprobado que contenga todas las columnas necesarias, siguiendo proceso...", db=db)
return dataframe_filtrado
def cargar_datos (df , zip, tabla , db , engine):
df.to_sql(tabla , engine , if_exists ='append', index=False)
actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs , estado=" Procesando ",
mensaje=f"Archivo {tabla }. txt ha sido cargado a la tabla , siguiendo proceso ...", db=db)
def extraer_zip (zip, zip_ruta , db):
with zipfile.ZipFile(zip_ruta , 'r') as archivo_zip :
archivo_zip.extractall ('txts/')
actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs , estado=" Procesando ",mensaje=" Extrayendo archivos del ltimo ZIP ingresado .", db=db)
def identificar_zip(db: Session):
zip = db.query(CargaGtfs).order_by(desc('fecha')).first()
return zip
def inspeccionar_txts(zip,almacen_txts:str,zip_ruta:str,db: Session):
archivos_en_txt = os.listdir(almacen_txts)
archivos_esperados = ['agency.txt', 'calendar.txt', 'routes.txt', 'stop_times.txt', 'stops.txt', 'trips.txt', 'shapes.txt']
archivos_faltantes = [archivo for archivo in archivos_esperados if archivo not in archivos_en_txt]
if len(archivos_faltantes) > 0:
for archivo in archivos_en_txt:
archivo_ruta = os.path.join(almacen_txts, archivo)
os.remove(archivo_ruta)
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje="Archivos Faltantes, por favor ingrese los siguientes archivos: "+",".join(archivos_faltantes), db=db)
remove(zip_ruta)
raise ValueError("Archivos Faltantes, por favor ingrese los siguientes archivos: "+",".join(archivos_faltantes))
for archivo in archivos_en_txt:
if archivo not in archivos_esperados:
os.remove(almacen_txts+archivo)
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje="Total de archivos esperados, siguiendo proceso...", db=db)
def transformar_txts(zip,db: Session):
id_carga_gtfs = zip.id_carga_gtfs
try:
df_agency = pd.read_csv('txts/agency.txt', dtype=str)
columnas_necesarias_agency = ['agency_id','agency_name','agency_url','agency_timezone','agency_lang','agency_phone','agency_fare_url']
except Exception as e:
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje="Se presentaron problemas al momento de transformar el archivo agency.txt a dataframe", db=db)
df_agency = verificar_columnas(df_agency, columnas_necesarias_agency, 'agency.txt', zip, db)
df_agency['id_carga_gtfs'] = id_carga_gtfs
try:
df_calendar = pd.read_csv('txts/calendar.txt', dtype=str)
columnas_necesarias_calendar = ['service_id','start_date','end_date','monday','tuesday','wednesday','thursday','friday','saturday','sunday']
except Exception as e:
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje="Se presentaron problemas al momento de transformar el archivo calendar.txt a dataframe", db=db)
df_calendar = verificar_columnas(df_calendar, columnas_necesarias_calendar, 'calendar.txt', zip, db)
df_calendar['id_carga_gtfs'] = id_carga_gtfs
try:
df_routes = pd.read_csv('txts/routes.txt', dtype=str)
columnas_necesarias_route = ['route_id','agency_id','route_short_name','route_long_name','route_desc','route_type','route_url','route_color','route_text_color']
except Exception as e:
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje="Se presentaron problemas al momento de transformar el archivo routes.txt a dataframe", db=db)
df_routes = verificar_columnas(df_routes, columnas_necesarias_route, 'routes.txt', zip, db)
df_routes['id_carga_gtfs'] = id_carga_gtfs
#Por este caso que leemos todos los campos como str hay que cambiar la id de route a int para que tenga el mismo tipo de dato que la db
df_routes['route_id'] = df_routes['route_id'].astype(int)
try:
df_stop_times = pd.read_csv('txts/stop_times.txt', dtype=str)
columnas_necesarias_stop_times = ['trip_id','arrival_time','departure_time','stop_id','stop_sequence','stop_headsign','pickup_type','drop_off_type','timepoint']
except Exception as e:
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje="Se presentaron problemas al momento de transformar el archivo stop_times.txt a dataframe", db=db)
df_stop_times = verificar_columnas(df_stop_times, columnas_necesarias_stop_times, 'stop_times.txt', zip, db)
df_stop_times['id_carga_gtfs'] = id_carga_gtfs
try:
df_stops = pd.read_csv('txts/stops.txt', dtype=str)
columnas_necesarias_stops = ['stop_id','stop_code','stop_name','stop_desc','stop_lat','stop_lon','zone_id','stop_url','location_type','parent_station','wheelchair_boarding']
except Exception as e:
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje="Se presentaron problemas al momento de transformar el archivo stops.txt a dataframe", db=db)
df_stops = verificar_columnas(df_stops, columnas_necesarias_stops, 'stops.txt', zip, db)
df_stops['id_carga_gtfs'] = id_carga_gtfs
df_stops['stop_id_interno'] = range(1, len(df_stops) + 1)
try:
df_trips = pd.read_csv('txts/trips.txt', dtype=str)
columnas_necesarias_trips = ['route_id','service_id','trip_id','trip_headsign','trip_short_name','direction_id','block_id','shape_id','wheelchair_accessible','bikes_allowed']
except Exception as e:
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje="Se presentaron problemas al momento de transformar el archivo trips.txt a dataframe", db=db)
df_trips = verificar_columnas(df_trips, columnas_necesarias_trips, 'trips.txt', zip, db)
df_trips['id_carga_gtfs'] = id_carga_gtfs
df_trips.rename(columns={'shape_id': 'shape_reference_id'}, inplace=True)
#Por este caso que leemos todos los campos como str hay que cambiar la id de route a int para que tenga el mismo tipo de dato que la db
df_trips['route_id'] = df_trips['route_id'].astype(int)
try:
df_shapes = pd.read_csv('txts/shapes.txt', dtype=str)
columnas_necesarias_shapes= ['shape_id','shape_pt_sequence','shape_pt_lat','shape_pt_lon']
except Exception as e:
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje="Se presentaron problemas al momento de transformar el archivo trips.txt a dataframe", db=db)
df_shapes = verificar_columnas(df_shapes, columnas_necesarias_shapes, 'shapes.txt', zip, db)
df_shapes['id_carga_gtfs'] = id_carga_gtfs
# Se genera el dataframe correspondiente a la tabla shapes_reference.
df_shapes_reference = df_shapes[['shape_id']].drop_duplicates().reset_index(drop=True)
df_shapes_reference['id_carga_gtfs'] = id_carga_gtfs
return df_agency,df_calendar,df_routes,df_stop_times,df_stops,df_trips,df_shapes,df_shapes_reference
def verificar_registros_db(db: Session):
tablas = [Agency,Calendar,Route,ShapeReference,Shapes,StopTimes,Stop,Trip]
for tabla in tablas:
tiene_registros = db.query(exists().where(tabla.id_carga_gtfs.isnot(None))).scalar()
if tiene_registros:
return False # Si al menos una tabla tiene registros, la base de datos no está vacía
return True #Si ninguna tabla tiene registros, la base de datos está vacía
def primera_carga(zip,db: Session,engine,df_agency,df_calendar,df_routes,df_stop_times,df_stops,df_trips,df_shapes,df_shapes_reference):
cargar_datos(df_agency,zip,'agency',db,engine)
cargar_datos(df_routes,zip,'route',db,engine)
cargar_datos(df_calendar,zip,'calendar',db,engine)
cargar_datos(df_stops,zip,'stop',db,engine)
cargar_datos(df_shapes_reference,zip,'shape_reference',db,engine)
cargar_datos(df_shapes,zip,'shapes', db,engine)
cargar_datos(df_trips,zip,'trip',db,engine)
cargar_datos(df_stop_times,zip,'stop_times', db,engine)
def extraer_db (engine):
with engine.connect () as connection :
df_db_agency = pd.read_sql_table ('agency', con= connection )
df_db_routes = pd.read_sql_table ('route', con= connection )
df_db_calendar = pd.read_sql_table ('calendar', con= connection )
df_db_stops = pd.read_sql_table ('stop', con= connection )
df_db_shapes_reference = pd.read_sql_table ('shape_reference',con= connection )
df_db_shapes = pd.read_sql_table ('shapes', con= connection )
df_db_trips = pd.read_sql_table ('trip', con= connection )
df_db_stop_times = pd.read_sql_table ('stop_times', con= connection )
return df_db_agency , df_db_routes , df_db_calendar , df_db_stops , df_db_shapes_reference , df_db_shapes , df_db_trips , df_db_stop_times
def eliminar_registros_antiguos(session, df_db, df,modelo,tabla):
comparar = df_db.merge(df, indicator=True, how='outer')
registros_antiguos = comparar.loc[lambda x: x['_merge'] == 'left_only'].drop(columns='_merge')
for i, row in registros_antiguos.iterrows():
session.query(modelo).filter_by(**{modelo.__table__.primary_key.columns.keys()[0]: row[0]}).delete()
session.commit()
def eliminar_registros (zip, db , df_db , df , id_tabla , tabla):
df_db_sin_id = df_db.drop(columns='id_carga_gtfs')
df_sin_id = df.drop(columns='id_carga_gtfs')
comparar = df_db_sin_id .merge(df_sin_id , indicator =True , how='outer')
# Filtrar los registros
registros_antiguos = comparar.loc[lambda x: x['_merge'] == 'left_only']
# Obtener una lista de los valores de la columna id_tabla de los registros antiguos
registros_a_eliminar = registros_antiguos [id_tabla ].tolist()
if registros_a_eliminar :
query = text(f"DELETE FROM {tabla} WHERE {id_tabla} IN ({','.join(map(repr , registros_a_eliminar ))})")
db.execute(query)
actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs ,estado=" Procesando ", mensaje=f"Se eliminaron los registros obsoletos en la tabla {tabla }.", db=db)
else:
actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs , estado=" Procesando ", mensaje=f"No se encontraron registros obsoletos para eliminar en la tabla {tabla }.", db=db)
def eliminar_registros_stop_time (zip , db , df_db , df , pk1 , pk2 , tabla):
df_db_sin_id = df_db.drop( columns ='id_carga_gtfs')
df_sin_id = df.drop( columns ='id_carga_gtfs')
# Realizar una comparacion entre los DataFrames sin la columnaid_carga_gtfs
comparar = df_db_sin_id.merge(df_sin_id , how='left', indicator =True)
# Filtrar los registros que esten en df_db pero no en df ( left_only )
registros_antiguos = comparar.loc[ lambda x: x['_merge'] == 'left_only']
if not registros_antiguos.empty:
# Crear una subconsulta para verificar la existencia de registros enla tabla
subquery = (f" SELECT 1 FROM {tabla} AS t "f"WHERE t.{ pk1} = {tabla }.{ pk1} AND t.{ pk2} = {tabla }.{ pk2}")
query = text(f" DELETE FROM {tabla} WHERE EXISTS ({ subquery })")
db.execute(query)
actualizar_estado( id_carga_gtfs =zip. id_carga_gtfs ,estado =" Procesando ", mensaje =f"Se eliminaron los registros obsoletos de la tabla {tabla }.", db=db)
else:
actualizar_estado( id_carga_gtfs =zip. id_carga_gtfs , estado = " Procesando ", mensaje =f"No se encontraron registros obsoletos para eliminar en la tabla {tabla }.", db=db)
def agregar_registros (zip , db , df_db , df , tabla):
df_db_sin_id = df_db.drop( columns ='id_carga_gtfs')
df_sin_id = df.drop( columns ='id_carga_gtfs')
# Realizar una c o m p a r a c i n entre los DataFrames
comparar = df_db_sin_id.merge(df_sin_id , indicator =True , how='outer')
# Filtrar los registros que e s t n en df pero no en df_db ( right_only )
registros_nuevos = comparar.loc[ lambda x: x['_merge'] == 'right_only']
registros_nuevos.reset_index (drop=True , inplace =True)
if not registros_nuevos.empty:
#Asignar los valores de id_carga_gtfs utilizando .loc
registros_nuevos.loc[:, 'id_carga_gtfs'] = df.loc[registros_nuevos.index , 'id_carga_gtfs']
# Eliminar la columna _merge
registros_nuevos.drop( columns ='_merge', inplace =True)
# Agregar los registros nuevos a la tabla en la base de datos
registros_nuevos.to_sql (tabla , engine , if_exists ='append', index=False )
actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs ,estado =" Procesando ", mensaje =f"Se agregaron los registros nuevos en la tabla de {tabla }.", db=db)
else:
actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs , estado =" Procesando ", mensaje =f"No se encontraron registros nuevos para agregar en la tabla {tabla }.", db=db)
def agregar_registros_stops (zip , db , df_db , df , tabla):
df_db_sin_id = df_db.drop( columns =['id_carga_gtfs', 'stop_id_interno'])
df_sin_id = df.drop( columns ='id_carga_gtfs')
# Realizar una c o m p a r a c i n entre los DataFrames
comparar = df_db_sin_id.merge(df_sin_id , on=list( df_db_sin_id.columns ), indicator =True , how='outer')
# Filtrar los registros que e s t n en df pero no en df_db ( right_only )
registros_nuevos = comparar.loc[ lambda x: x['_merge'] == 'right_only']
# Asignar valores de id_carga_gtfs utilizando .loc
registros_nuevos ['id_carga_gtfs'] = df.reset_index ().loc[ registros_nuevos.index ]['id_carga_gtfs']
# Eliminar la columna _merge
registros_nuevos = registros_nuevos.drop( columns ='_merge')
# Obtener el valor m x i m o existente de stop_id_interno
max_existing_id = df_db['stop_id_interno'].max()
# Asignar nuevos valores de stop_id_interno
registros_nuevos['stop_id_interno'] = range( max_existing_id + 1, max_existing_id + 1 + len( registros_nuevos ))
# Agregar los registros nuevos a la tabla en la base de datos
registros_nuevos.to_sql (tabla , engine , if_exists ='append', index=False)
# Actualizar el estado
actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs , estado =" Procesando ", mensaje =f"Se agregaron los registros nuevos en la tabla de{ tabla }.", db=db)
def verificar_registros_obsoletos (zip, db , df_agency , df_calendar , df_routes ,df_stop_times , df_stops , df_trips , df_shapes , df_shapes_reference, df_db_agency , df_db_routes , df_db_calendar , df_db_stops , df_db_shapes_reference , df_db_shapes , df_db_trips , df_db_stop_times ):
# Eliminar registros obsoletos
eliminar_registros_stop_time(zip, db , df_db_stop_times , df_stop_times ,'trip_id', 'stop_id', 'stop_times')
eliminar_registros(zip, db , df_db_trips , df_trips , 'trip_id', 'trip' )
eliminar_registros(zip, db , df_db_shapes , df_shapes , 'shape_id', 'shapes')
eliminar_registros(zip, db , df_db_shapes_reference , df_shapes_reference , 'shape_id', 'shape_reference')
eliminar_registros(zip, db , df_db_calendar , df_calendar , 'service_id','calendar')
eliminar_registros(zip, db , df_db_routes , df_routes , 'route_id', 'route')
eliminar_registros(zip, db , df_db_agency , df_agency , 'agency_id', 'agency')
eliminar_registros(zip, db , df_db_stops , df_stops ,'stop_id', 'stop')
def verificar_agregar_registros (zip , db , df_agency , df_calendar , df_routes ,df_stop_times , df_stops , df_trips , df_shapes , df_shapes_reference ,df_db_agency , df_db_routes , df_db_calendar , df_db_stops ,df_db_shapes_reference , df_db_shapes , df_db_trips , df_db_stop_times ):
# Agregar nuevos registros
agregar_registros_stops(zip , db , df_db_stops , df_stops , 'stop')
agregar_registros(zip , db , df_db_agency , df_agency , 'agency')
agregar_registros(zip , db , df_db_routes , df_routes , 'route')
agregar_registros(zip , db , df_db_calendar , df_calendar , 'calendar')
agregar_registros(zip , db , df_db_shapes_reference , df_shapes_reference ,'shape_reference')
agregar_registros(zip , db , df_db_shapes , df_shapes , 'shapes')
agregar_registros(zip , db , df_db_trips , df_trips , 'trip')
agregar_registros(zip , db , df_db_stop_times , df_stop_times , 'stop_times')
def terminar_carga (zip, db , zips_procesados , directorio_txts ):
try:
# Eliminar archivos en el directorio_txts
for archivo in os.listdir( directorio_txts ):
ruta_completa = os.path.join(directorio_txts , archivo)
try:
if os.path.isfile( ruta_completa ):
os.unlink( ruta_completa )
except Exception as e:
print(f"No se pudo eliminar { ruta_completa }. Razon : {e}")
# Crear un nuevo archivo ZIP procesado
shutil.make_archive(f'{ zips_procesados }/{ zip.zip} _procesado ', 'zip','txts')
# Actualizar el estado
actualizar_estado( id_carga_gtfs =zip.id_carga_gtfs , estado=" Terminado ", mensaje='Nuevo archivo ZIP generado , contiene nicamente archivos esperados ', db=db)
except Exception as e:
# Manejar errores
actualizar_estado( id_carga_gtfs =zip.id_carga_gtfs , estado="Error",mensaje=f"Se presentaron problemas al terminar la carga: {e}",db=db)
def restaurar_db(db):
tablas = [StopTimes,Trip,Shapes, ShapeReference,Calendar, Route , Shapes,Agency , Stop]
for tabla in tablas:
# Eliminar todos los registros de la tabla
db.query(tabla).delete()
db.commit()
def carga_error(zip, db, zips_procesados, directorio_txts):
actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje='Error al ingresar registros.', db=db)