From 975a642d3aa3eac262931bced96808300c3a3f87 Mon Sep 17 00:00:00 2001 From: Abner Astete Date: Fri, 12 Jan 2024 17:51:55 +0100 Subject: [PATCH] Final Tesis --- cargaAPI/carga/procesos.py | 260 ++++++++++++++++---------------- cargaAPI/models/agency.py | 4 +- cargaAPI/models/calendar.py | 1 - cargaAPI/router/router.py | 245 +++++++++++++++++++++--------- cargaAPI/txts/.empty | 0 cargaAPI/zip/.empty | 0 cargaAPI/zips_procesados/.empty | 0 gtfsAPI/models/agency.py | 4 +- 8 files changed, 309 insertions(+), 205 deletions(-) create mode 100644 cargaAPI/txts/.empty create mode 100644 cargaAPI/zip/.empty create mode 100644 cargaAPI/zips_procesados/.empty diff --git a/cargaAPI/carga/procesos.py b/cargaAPI/carga/procesos.py index a7cb136..26158b4 100644 --- a/cargaAPI/carga/procesos.py +++ b/cargaAPI/carga/procesos.py @@ -45,31 +45,17 @@ def verificar_columnas(dataframe, columnas_necesarias, archivo, zip, db): actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje=f"Archivo '{archivo}' leído y comprobado que contenga todas las columnas necesarias, siguiendo proceso...", db=db) return dataframe_filtrado -def cargar_datos(df,zip,tabla,db,engine): - try: - df.to_sql(tabla, engine, if_exists='append', index=False) - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje=f"Archivo '{tabla}.txt' ha sido cargado a la tabla, siguiendo proceso...", db=db) - - except Exception as e: - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje=f"Se presentaron problemas con el archivo '{tabla}.txt' al momento de ingresar los datos a la tabla.", db=db) +def cargar_datos (df , zip, tabla , db , engine): + df.to_sql(tabla , engine , if_exists ='append', index=False) + actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs , estado=" Procesando ", + mensaje=f"Archivo ’{tabla }. txt ’ ha sido cargado a la tabla , siguiendo proceso ...", db=db) - - - - -def extraer_zip_futuro(zip,zip_ruta,db): - with zipfile.ZipFile(zip_ruta, 'r') as archivo_zip: - archivo_zip.extractall('txts/') - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje="Extrayendo archivos del último ZIP ingresado.", db=db) - -def extraer_zip(zip,zip_ruta,db): - with zipfile.ZipFile(zip_ruta, 'r') as archivo_zip: - archivo_zip.extractall('txts/') - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje="Extrayendo archivos del último ZIP ingresado.", db=db) - - +def extraer_zip (zip, zip_ruta , db): + with zipfile.ZipFile(zip_ruta , 'r') as archivo_zip : + archivo_zip.extractall ('txts/') + actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs , estado=" Procesando ",mensaje=" Extrayendo archivos del ltimo ZIP ingresado .", db=db) def identificar_zip(db: Session): zip = db.query(CargaGtfs).order_by(desc('fecha')).first() @@ -170,7 +156,7 @@ def transformar_txts(zip,db: Session): return df_agency,df_calendar,df_routes,df_stop_times,df_stops,df_trips,df_shapes,df_shapes_reference -def verificar_registros_bd(db: Session): +def verificar_registros_db(db: Session): tablas = [Agency,Calendar,Route,ShapeReference,Shapes,StopTimes,Stop,Trip] for tabla in tablas: tiene_registros = db.query(exists().where(tabla.id_carga_gtfs.isnot(None))).scalar() @@ -188,19 +174,18 @@ def primera_carga(zip,db: Session,engine,df_agency,df_calendar,df_routes,df_stop cargar_datos(df_trips,zip,'trip',db,engine) cargar_datos(df_stop_times,zip,'stop_times', db,engine) -def extraer_db(engine): - with engine.connect() as connection: - df_db_agency = pd.read_sql_table('agency', con=connection) - df_db_routes = pd.read_sql_table('route', con=connection) - df_db_calendar = pd.read_sql_table('calendar', con=connection) - df_db_stops = pd.read_sql_table('stop', con=connection) - df_db_shapes_reference = pd.read_sql_table('shape_reference', con=connection) - df_db_shapes = pd.read_sql_table('shapes', con=connection) - df_db_trips = pd.read_sql_table('trip', con=connection) - df_db_stop_times = pd.read_sql_table('stop_times', con=connection) - - return df_db_agency,df_db_routes,df_db_calendar,df_db_stops,df_db_shapes_reference,df_db_shapes,df_db_trips,df_db_stop_times +def extraer_db (engine): + with engine.connect () as connection : + df_db_agency = pd.read_sql_table ('agency', con= connection ) + df_db_routes = pd.read_sql_table ('route', con= connection ) + df_db_calendar = pd.read_sql_table ('calendar', con= connection ) + df_db_stops = pd.read_sql_table ('stop', con= connection ) + df_db_shapes_reference = pd.read_sql_table ('shape_reference',con= connection ) + df_db_shapes = pd.read_sql_table ('shapes', con= connection ) + df_db_trips = pd.read_sql_table ('trip', con= connection ) + df_db_stop_times = pd.read_sql_table ('stop_times', con= connection ) + return df_db_agency , df_db_routes , df_db_calendar , df_db_stops , df_db_shapes_reference , df_db_shapes , df_db_trips , df_db_stop_times def eliminar_registros_antiguos(session, df_db, df,modelo,tabla): @@ -211,114 +196,135 @@ def eliminar_registros_antiguos(session, df_db, df,modelo,tabla): session.commit() -def eliminar_registros(zip,db,df_db,df,id_tabla,tabla): +def eliminar_registros (zip, db , df_db , df , id_tabla , tabla): df_db_sin_id = df_db.drop(columns='id_carga_gtfs') df_sin_id = df.drop(columns='id_carga_gtfs') + comparar = df_db_sin_id .merge(df_sin_id , indicator =True , how='outer') - comparar = df_db_sin_id.merge(df_sin_id, indicator=True, how='outer') + # Filtrar los registros registros_antiguos = comparar.loc[lambda x: x['_merge'] == 'left_only'] - - registros_antiguos['id_carga_gtfs'] = df_db.loc[registros_antiguos.index, 'id_carga_gtfs'] - registros_antiguos = registros_antiguos.drop(columns='_merge') - try: - registros_a_eliminar = registros_antiguos[id_tabla].tolist() - - if registros_a_eliminar: - query = text(f"DELETE FROM {tabla} WHERE {id_tabla} IN ({', '.join(map(repr, registros_a_eliminar))})") - db.execute(query) - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje=f"Se eliminaron los registros obsoletos, no se encontraban en el dataframe '{tabla}' entrante.", db=db) - else: - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje=f"No se encontraron registros obsoletos para eliminar en la tabla '{tabla}'.", db=db) - - except Exception as e: - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje=f"Se presentaron problemas con el al momento de eliminar los registros que no se encontraban en el'{tabla}' entrante.", db=db) - return registros_antiguos + # Obtener una lista de los valores de la columna ’id_tabla’ de los registros antiguos + registros_a_eliminar = registros_antiguos [id_tabla ].tolist() + if registros_a_eliminar : + query = text(f"DELETE FROM {tabla} WHERE {id_tabla} IN ({','.join(map(repr , registros_a_eliminar ))})") + db.execute(query) + actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs ,estado=" Procesando ", mensaje=f"Se eliminaron los registros obsoletos en la tabla ’{tabla }’.", db=db) + else: + actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs , estado=" Procesando ", mensaje=f"No se encontraron registros obsoletos para eliminar en la tabla ’{tabla }’.", db=db) -def eliminar_registros_stop_time(zip, db, df_db, df, pk1, pk2, tabla): - df_db_sin_id = df_db.drop(columns='id_carga_gtfs') - df_sin_id = df.drop(columns='id_carga_gtfs') +def eliminar_registros_stop_time (zip , db , df_db , df , pk1 , pk2 , tabla): + df_db_sin_id = df_db.drop( columns ='id_carga_gtfs') + df_sin_id = df.drop( columns ='id_carga_gtfs') + # Realizar una comparacion entre los DataFrames sin la columna’id_carga_gtfs ’ + comparar = df_db_sin_id.merge(df_sin_id , how='left', indicator =True) + # Filtrar los registros que esten en df_db pero no en df ( left_only ) + registros_antiguos = comparar.loc[ lambda x: x['_merge'] == 'left_only'] + if not registros_antiguos.empty: + # Crear una subconsulta para verificar la existencia de registros enla tabla + subquery = (f" SELECT 1 FROM {tabla} AS t "f"WHERE t.{ pk1} = {tabla }.{ pk1} AND t.{ pk2} = {tabla }.{ pk2}") + query = text(f" DELETE FROM {tabla} WHERE EXISTS ({ subquery })") + db.execute(query) + actualizar_estado( id_carga_gtfs =zip. id_carga_gtfs ,estado =" Procesando ", mensaje =f"Se eliminaron los registros obsoletos de la tabla ’{tabla }’.", db=db) + else: + actualizar_estado( id_carga_gtfs =zip. id_carga_gtfs , estado = " Procesando ", mensaje =f"No se encontraron registros obsoletos para eliminar en la tabla ’{tabla }’.", db=db) - comparar = df_db_sin_id.merge(df_sin_id, indicator=True, how='outer') - registros_antiguos = comparar.loc[lambda x: x['_merge'] == 'left_only'] - - registros_antiguos['id_carga_gtfs'] = df_db.loc[registros_antiguos.index, 'id_carga_gtfs'] - registros_antiguos = registros_antiguos.drop(columns='_merge') - registros_a_eliminar = registros_antiguos[[pk1, pk2]].values.tolist() +def agregar_registros (zip , db , df_db , df , tabla): + df_db_sin_id = df_db.drop( columns ='id_carga_gtfs') + df_sin_id = df.drop( columns ='id_carga_gtfs') - try: - registros_a_eliminar_str = ', '.join(f"('{r[0]}', '{r[1]}')" for r in registros_a_eliminar) - if registros_a_eliminar: - query = text(f"DELETE FROM {tabla} WHERE ({pk1}, {pk2}) IN ({registros_a_eliminar_str})") - print(query) - db.execute(query, {"registros_a_eliminar": registros_a_eliminar}) - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje=f"Se eliminaron los registros obsoletos de la tabla '{tabla}'.", db=db) - else: - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje=f"No se encontraron registros obsoletos para eliminar en la tabla '{tabla}'.", db=db) - - except Exception as e: - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje=f"Hubo un error al eliminar los registros obsoletos de la tabla '{tabla}'.", db=db) - -def agregar_registros(zip,db,df_db,df,tabla): - df_db_sin_id = df_db.drop(columns='id_carga_gtfs') - df_sin_id = df.drop(columns='id_carga_gtfs') - comparar = df_db_sin_id.merge(df_sin_id, indicator=True, how='outer') + # Realizar una c o m p a r a c i n entre los DataFrames + comparar = df_db_sin_id.merge(df_sin_id , indicator =True , how='outer') + # Filtrar los registros que e s t n en df pero no en df_db ( right_only ) + registros_nuevos = comparar.loc[ lambda x: x['_merge'] == 'right_only'] + registros_nuevos.reset_index (drop=True , inplace =True) + if not registros_nuevos.empty: + #Asignar los valores de ’id_carga_gtfs ’ utilizando .loc + registros_nuevos.loc[:, 'id_carga_gtfs'] = df.loc[registros_nuevos.index , 'id_carga_gtfs'] - registros_nuevos = comparar.loc[lambda x: x['_merge'] == 'right_only'] - - registros_nuevos['id_carga_gtfs'] = df.loc[registros_nuevos.index]['id_carga_gtfs'] - registros_nuevos = registros_nuevos.drop(columns='_merge') - - try: - registros_nuevos.to_sql(tabla, engine, if_exists='append', index=False) - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje=f"Se agregaron los registros nuevos en la tabla de '{tabla}'.", db=db) - except Exception as e: - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje=f"Se presentaron problemas al momento de agregar registros nuevos en la tabla de '{tabla}'.", db=db) - - return registros_nuevos + # Eliminar la columna ’_merge ’ + registros_nuevos.drop( columns ='_merge', inplace =True) + # Agregar los registros nuevos a la tabla en la base de datos + registros_nuevos.to_sql (tabla , engine , if_exists ='append', index=False ) + actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs ,estado =" Procesando ", mensaje =f"Se agregaron los registros nuevos en la tabla de ’{tabla }’.", db=db) + else: + actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs , estado =" Procesando ", mensaje =f"No se encontraron registros nuevos para agregar en la tabla ’{tabla }’.", db=db) -def agregar_registros_stops(zip,db,df_db,df,tabla): - df_db_sin_id = df_db.drop(columns=['id_carga_gtfs', 'stop_id_interno']) - df_sin_id = df.drop(columns='id_carga_gtfs') - comparar = df_db_sin_id.merge(df_sin_id, on=list(df_db_sin_id.columns), indicator=True, how='outer') - registros_nuevos = comparar.loc[lambda x: x['_merge'] == 'right_only'] - registros_nuevos['id_carga_gtfs'] = df.loc[registros_nuevos.index]['id_carga_gtfs'] - registros_nuevos = registros_nuevos.drop(columns='_merge') +def agregar_registros_stops (zip , db , df_db , df , tabla): + df_db_sin_id = df_db.drop( columns =['id_carga_gtfs', 'stop_id_interno']) + df_sin_id = df.drop( columns ='id_carga_gtfs') + # Realizar una c o m p a r a c i n entre los DataFrames + comparar = df_db_sin_id.merge(df_sin_id , on=list( df_db_sin_id.columns ), indicator =True , how='outer') + # Filtrar los registros que e s t n en df pero no en df_db ( right_only ) + registros_nuevos = comparar.loc[ lambda x: x['_merge'] == 'right_only'] + # Asignar valores de ’id_carga_gtfs ’ utilizando .loc + registros_nuevos ['id_carga_gtfs'] = df.reset_index ().loc[ registros_nuevos.index ]['id_carga_gtfs'] + # Eliminar la columna ’_merge ’ + registros_nuevos = registros_nuevos.drop( columns ='_merge') + + # Obtener el valor m x i m o existente de ’stop_id_interno ’ max_existing_id = df_db['stop_id_interno'].max() - registros_nuevos['stop_id_interno'] = range(max_existing_id + 1, max_existing_id + 1 + len(registros_nuevos)) + # Asignar nuevos valores de ’stop_id_interno ’ + registros_nuevos['stop_id_interno'] = range( max_existing_id + 1, max_existing_id + 1 + len( registros_nuevos )) + # Agregar los registros nuevos a la tabla en la base de datos + registros_nuevos.to_sql (tabla , engine , if_exists ='append', index=False) + # Actualizar el estado + actualizar_estado ( id_carga_gtfs =zip.id_carga_gtfs , estado =" Procesando ", mensaje =f"Se agregaron los registros nuevos en la tabla de’{ tabla }’.", db=db) + + +def verificar_registros_obsoletos (zip, db , df_agency , df_calendar , df_routes ,df_stop_times , df_stops , df_trips , df_shapes , df_shapes_reference, df_db_agency , df_db_routes , df_db_calendar , df_db_stops , df_db_shapes_reference , df_db_shapes , df_db_trips , df_db_stop_times ): + # Eliminar registros obsoletos + eliminar_registros_stop_time(zip, db , df_db_stop_times , df_stop_times ,'trip_id', 'stop_id', 'stop_times') + eliminar_registros(zip, db , df_db_trips , df_trips , 'trip_id', 'trip' ) + eliminar_registros(zip, db , df_db_shapes , df_shapes , 'shape_id', 'shapes') + eliminar_registros(zip, db , df_db_shapes_reference , df_shapes_reference , 'shape_id', 'shape_reference') + eliminar_registros(zip, db , df_db_calendar , df_calendar , 'service_id','calendar') + eliminar_registros(zip, db , df_db_routes , df_routes , 'route_id', 'route') + eliminar_registros(zip, db , df_db_agency , df_agency , 'agency_id', 'agency') + eliminar_registros(zip, db , df_db_stops , df_stops ,'stop_id', 'stop') + +def verificar_agregar_registros (zip , db , df_agency , df_calendar , df_routes ,df_stop_times , df_stops , df_trips , df_shapes , df_shapes_reference ,df_db_agency , df_db_routes , df_db_calendar , df_db_stops ,df_db_shapes_reference , df_db_shapes , df_db_trips , df_db_stop_times ): + # Agregar nuevos registros + agregar_registros_stops(zip , db , df_db_stops , df_stops , 'stop') + agregar_registros(zip , db , df_db_agency , df_agency , 'agency') + agregar_registros(zip , db , df_db_routes , df_routes , 'route') + agregar_registros(zip , db , df_db_calendar , df_calendar , 'calendar') + agregar_registros(zip , db , df_db_shapes_reference , df_shapes_reference ,'shape_reference') + agregar_registros(zip , db , df_db_shapes , df_shapes , 'shapes') + agregar_registros(zip , db , df_db_trips , df_trips , 'trip') + agregar_registros(zip , db , df_db_stop_times , df_stop_times , 'stop_times') + + + +def terminar_carga (zip, db , zips_procesados , directorio_txts ): try: - registros_nuevos.to_sql(tabla, engine, if_exists='append', index=False) - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Procesando", mensaje=f"Se agregaron los registros nuevos en la tabla de '{tabla}'.", db=db) + # Eliminar archivos en el directorio_txts + for archivo in os.listdir( directorio_txts ): + ruta_completa = os.path.join(directorio_txts , archivo) + try: + if os.path.isfile( ruta_completa ): + os.unlink( ruta_completa ) + except Exception as e: + print(f"No se pudo eliminar { ruta_completa }. Razon : {e}") + + # Crear un nuevo archivo ZIP procesado + shutil.make_archive(f'{ zips_procesados }/{ zip.zip} _procesado ', 'zip','txts') + # Actualizar el estado + actualizar_estado( id_carga_gtfs =zip.id_carga_gtfs , estado=" Terminado ", mensaje='Nuevo archivo ZIP generado , contiene nicamente archivos esperados ', db=db) except Exception as e: - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje=f"Se presentaron problemas al momento de agregar registros nuevos en la tabla de '{tabla}'.", db=db) + # Manejar errores + actualizar_estado( id_carga_gtfs =zip.id_carga_gtfs , estado="Error",mensaje=f"Se presentaron problemas al terminar la carga: {e}",db=db) +def restaurar_db(db): + tablas = [StopTimes,Trip,Shapes, ShapeReference,Calendar, Route , Shapes,Agency , Stop] + for tabla in tablas: + # Eliminar todos los registros de la tabla + db.query(tabla).delete() + db.commit() - -def verificar_registros_antiguos(zip,db: Session,df_agency,df_calendar,df_routes,df_stop_times,df_stops,df_trips,df_shapes,df_shapes_reference,df_db_agency,df_db_routes,df_db_calendar,df_db_stops,df_db_shapes_reference,df_db_shapes,df_db_trips,df_db_stop_times): - eliminar_registros_stop_time(zip,db,df_db_stop_times,df_stop_times,'trip_id','stop_id','stop_times') - eliminar_registros(zip,db,df_db_trips,df_trips,'trip_id','trip') - eliminar_registros(zip,db,df_db_shapes,df_shapes,'shape_id','shapes') - eliminar_registros(zip,db,df_db_shapes_reference,df_shapes_reference,'shape_id','shape_reference') - eliminar_registros(zip,db,df_db_calendar,df_calendar,'service_id','calendar') - eliminar_registros(zip,db,df_db_routes,df_routes,'route_id','route') - eliminar_registros(zip,db,df_db_agency,df_agency,'agency_id','agency') - eliminar_registros(zip,db,df_db_stops,df_stops,'stop_id','stop') - -def verificar_registros_nuevos(zip,db: Session,df_agency,df_calendar,df_routes,df_stop_times,df_stops,df_trips,df_shapes,df_shapes_reference,df_db_agency,df_db_routes,df_db_calendar,df_db_stops,df_db_shapes_reference,df_db_shapes,df_db_trips,df_db_stop_times): - agregar_registros(zip,db,df_db_agency,df_agency,'agency') - agregar_registros(zip,db,df_db_routes,df_routes,'route') - agregar_registros(zip,db,df_db_calendar,df_calendar,'calendar') - agregar_registros(zip,db,df_db_shapes_reference,df_shapes_reference,'shape_reference') - agregar_registros(zip,db,df_db_shapes,df_shapes,'shapes') - agregar_registros(zip,db,df_db_trips,df_trips,'trip') - agregar_registros_stops(zip,db,df_db_stops,df_stops,'stop') - agregar_registros(zip,db,df_db_stop_times,df_stop_times,'stop_times') - - -def terminar_carga(zip,db,zips_procesados): - shutil.make_archive('zips_procesados/'+zip.zip+'_procesado','zip','txts') - actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Terminado", mensaje='Nuevo archivo ZIP generado, contiene unicamente archivos esperados', db=db) \ No newline at end of file +def carga_error(zip, db, zips_procesados, directorio_txts): + actualizar_estado(id_carga_gtfs=zip.id_carga_gtfs, estado="Error", mensaje='Error al ingresar registros.', db=db) \ No newline at end of file diff --git a/cargaAPI/models/agency.py b/cargaAPI/models/agency.py index 42662f4..fc1ea46 100644 --- a/cargaAPI/models/agency.py +++ b/cargaAPI/models/agency.py @@ -1,4 +1,4 @@ -from sqlalchemy import Table,DateTime, ForeignKey, Column, String, Integer, func, Boolean, Float +from sqlalchemy import Table,DateTime, ForeignKey, Column, String, Integer, func, Boolean, Float,Text from database.db import Base from sqlalchemy.orm import relationship @@ -8,7 +8,7 @@ class Agency(Base): id_carga_gtfs = Column(Integer, ForeignKey('carga_gtfs.id_carga_gtfs'), nullable=True) agency_id = Column(String(255), primary_key=True) agency_name = Column(String(255)) - agency_url = Column(String(255)) + agency_url = Column(Text) agency_timezone = Column(String(50)) agency_lang = Column(String(10)) agency_phone = Column(String(50)) diff --git a/cargaAPI/models/calendar.py b/cargaAPI/models/calendar.py index 1bcaa32..05f750b 100644 --- a/cargaAPI/models/calendar.py +++ b/cargaAPI/models/calendar.py @@ -1,6 +1,5 @@ from sqlalchemy import ForeignKey, Column, String, Integer from database.db import Base -from sqlalchemy.orm import relationship class Calendar(Base): __tablename__ = 'calendar' diff --git a/cargaAPI/router/router.py b/cargaAPI/router/router.py index ba6dde7..07e6d8a 100644 --- a/cargaAPI/router/router.py +++ b/cargaAPI/router/router.py @@ -1,8 +1,21 @@ -from fastapi import APIRouter, Depends -from fastapi.responses import JSONResponse -from sqlalchemy.orm import sessionmaker,Session -from database.db import engine,Base -from carga.procesos import extraer_zip_futuro,extraer_zip,identificar_zip,inspeccionar_txts,transformar_txts,verificar_registros_bd,primera_carga,extraer_db,verificar_registros_antiguos,verificar_registros_nuevos,terminar_carga +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import sessionmaker, Session +from sqlalchemy.exc import SQLAlchemyError, InvalidRequestError +from database.db import engine, Base +from carga.procesos import ( + extraer_zip, + identificar_zip, + inspeccionar_txts, + transformar_txts, + verificar_registros_db, + primera_carga, + extraer_db, + verificar_registros_obsoletos, + terminar_carga, + verificar_agregar_registros, + restaurar_db, + carga_error, +) Base.prepare(engine) @@ -12,83 +25,169 @@ def get_db(): try: yield db finally: - db.close() - -almacen_zips = 'zip/' -almacen_txts = 'txts/' -zips_procesados = 'zips_procesados/' + db.close() +config = { + "almacen_zips": "zip/", + "almacen_txts": "txts/", + "zips_procesados": "zips_procesados/", +} carga_gtfs = APIRouter() - @carga_gtfs.get("/") def root(): - return "Pagina Principal. Use /docs en la url para probrar la carga." + return "Pagina Principal. Use /docs en la URL para probar la carga." @carga_gtfs.post("/carga") async def carga(db: Session = Depends(get_db)): + zip_info = identificar_zip(db=db) + zip_ruta = config["almacen_zips"] + zip_info.zip + ".zip" - zip_info = identificar_zip(db=db) - zip_ruta = almacen_zips + zip_info.zip + ".zip" - - extraer_zip(zip=zip_info,zip_ruta=zip_ruta,db=db) - inspeccionar_txts(zip=zip_info,almacen_txts=almacen_txts,zip_ruta=zip_ruta,db=db) - df_agency, df_calendar, df_routes,df_stop_times, df_stops, df_trips, df_shapes, df_shapes_reference = transformar_txts(zip=zip_info, db=db) - bd_vacia = verificar_registros_bd(db=db) - if bd_vacia: - primera_carga(zip=zip_info, - engine=engine, - df_agency=df_agency, - df_calendar=df_calendar, - df_routes=df_routes, - df_stop_times=df_stop_times, - df_stops=df_stops, - df_trips=df_trips, - df_shapes=df_shapes, - df_shapes_reference=df_shapes_reference, - db=db) - else: - df_db_agency,df_db_routes,df_db_calendar,df_db_stops,df_db_shapes_reference,df_db_shapes,df_db_trips,df_db_stop_times = extraer_db(engine=engine) + extraer_zip(zip=zip_info, zip_ruta=zip_ruta, db=db) - verificar_registros_antiguos(zip=zip_info, - df_agency=df_agency, - df_calendar=df_calendar, - df_routes=df_routes, - df_stop_times=df_stop_times, - df_stops=df_stops, - df_trips=df_trips, - df_shapes=df_shapes, - df_shapes_reference=df_shapes_reference, - df_db_agency=df_db_agency, - df_db_routes=df_db_routes, - df_db_calendar=df_db_calendar, - df_db_stops=df_db_stops, - df_db_shapes_reference=df_db_shapes_reference, - df_db_shapes=df_db_shapes, - df_db_trips=df_db_trips, - df_db_stop_times=df_db_stop_times, - db=db) - - verificar_registros_nuevos(zip=zip_info, - df_agency=df_agency, - df_calendar=df_calendar, - df_routes=df_routes, - df_stop_times=df_stop_times, - df_stops=df_stops, - df_trips=df_trips, - df_shapes=df_shapes, - df_shapes_reference=df_shapes_reference, - df_db_agency=df_db_agency, - df_db_routes=df_db_routes, - df_db_calendar=df_db_calendar, - df_db_stops=df_db_stops, - df_db_shapes_reference=df_db_shapes_reference, - df_db_shapes=df_db_shapes, - df_db_trips=df_db_trips, - df_db_stop_times=df_db_stop_times, - db=db) - - terminar_carga(zip=zip_info,zips_procesados=zips_procesados,db=db) + inspeccionar_txts(zip=zip_info, almacen_txts=config["almacen_txts"], zip_ruta=zip_ruta, db=db) + df_agency, df_calendar, df_routes, df_stop_times, df_stops, df_trips, df_shapes, df_shapes_reference = transformar_txts( + zip=zip_info, db=db + ) - return "Proceso Finalizado" \ No newline at end of file + ( + db_inicial_agency, + db_inicial_routes, + db_inicial_calendar, + db_inicial_stops, + db_inicial_shapes_reference, + db_inicial_shapes, + db_inicial_trips, + db_inicial_stop_times, + ) = extraer_db(engine=engine) + + bd_vacia = verificar_registros_db(db=db) + + if bd_vacia: + primera_carga( + zip=zip_info, + db=db, + engine=engine, + df_agency=df_agency, + df_calendar=df_calendar, + df_routes=df_routes, + df_stop_times=df_stop_times, + df_stops=df_stops, + df_trips=df_trips, + df_shapes=df_shapes, + df_shapes_reference=df_shapes_reference, + ) + + terminar_carga(zip=zip_info, db=db, zips_procesados=config["zips_procesados"], directorio_txts=config["almacen_txts"]) + return "Terminó correctamente" + else: + try: + ( + df_db_agency, + df_db_routes, + df_db_calendar, + df_db_stops, + df_db_shapes_reference, + df_db_shapes, + df_db_trips, + df_db_stop_times, + ) = extraer_db(engine=engine) + + verificar_registros_obsoletos( + zip=zip_info, + df_agency=df_agency, + df_calendar=df_calendar, + df_routes=df_routes, + df_stop_times=df_stop_times, + df_stops=df_stops, + df_trips=df_trips, + df_shapes=df_shapes, + df_shapes_reference=df_shapes_reference, + df_db_agency=df_db_agency, + df_db_routes=df_db_routes, + df_db_calendar=df_db_calendar, + df_db_stops=df_db_stops, + df_db_shapes_reference=df_db_shapes_reference, + df_db_shapes=df_db_shapes, + df_db_trips=df_db_trips, + df_db_stop_times=df_db_stop_times, + db=db, + ) + + ( + df_db_agency, + df_db_routes, + df_db_calendar, + df_db_stops, + df_db_shapes_reference, + df_db_shapes, + df_db_trips, + df_db_stop_times, + ) = extraer_db(engine=engine) + + verificar_agregar_registros( + zip=zip_info, + df_agency=df_agency, + df_calendar=df_calendar, + df_routes=df_routes, + df_stop_times=df_stop_times, + df_stops=df_stops, + df_trips=df_trips, + df_shapes=df_shapes, + df_shapes_reference=df_shapes_reference, + df_db_agency=df_db_agency, + df_db_routes=df_db_routes, + df_db_calendar=df_db_calendar, + df_db_stops=df_db_stops, + df_db_shapes_reference=df_db_shapes_reference, + df_db_shapes=df_db_shapes, + df_db_trips=df_db_trips, + df_db_stop_times=df_db_stop_times, + db=db, + ) + + terminar_carga(zip=zip_info, db=db, zips_procesados=config["zips_procesados"], directorio_txts=config["almacen_txts"]) + return "La carga termino con exito." + + + except SQLAlchemyError as e: + + restaurar_db(db) + + primera_carga( + zip=zip_info, + db=db, + engine=engine, + df_agency=db_inicial_agency, + df_calendar=db_inicial_calendar, + df_routes=db_inicial_routes, + df_stop_times=db_inicial_stop_times, + df_stops=db_inicial_stops, + df_trips=db_inicial_trips, + df_shapes=db_inicial_shapes, + df_shapes_reference=db_inicial_shapes_reference, + ) + + carga_error(zip=zip_info, db=db, zips_procesados=config["zips_procesados"], directorio_txts=config["almacen_txts"]) + raise HTTPException(status_code=500, detail=f"Error durante la carga: {str(e)}") + + except Exception as e: + restaurar_db(db) + + primera_carga( + zip=zip_info, + db=db, + engine=engine, + df_agency=db_inicial_agency, + df_calendar=db_inicial_calendar, + df_routes=db_inicial_routes, + df_stop_times=db_inicial_stop_times, + df_stops=db_inicial_stops, + df_trips=db_inicial_trips, + df_shapes=db_inicial_shapes, + df_shapes_reference=db_inicial_shapes_reference, + ) + + carga_error(zip=zip_info, db=db, zips_procesados=config["zips_procesados"], directorio_txts=config["almacen_txts"]) + raise HTTPException(status_code=500, detail=f"Error durante la carga: {str(e)}") diff --git a/cargaAPI/txts/.empty b/cargaAPI/txts/.empty new file mode 100644 index 0000000..e69de29 diff --git a/cargaAPI/zip/.empty b/cargaAPI/zip/.empty new file mode 100644 index 0000000..e69de29 diff --git a/cargaAPI/zips_procesados/.empty b/cargaAPI/zips_procesados/.empty new file mode 100644 index 0000000..e69de29 diff --git a/gtfsAPI/models/agency.py b/gtfsAPI/models/agency.py index e3d3388..58e79ca 100644 --- a/gtfsAPI/models/agency.py +++ b/gtfsAPI/models/agency.py @@ -1,4 +1,4 @@ -from sqlalchemy import Table,DateTime, ForeignKey, Column, String, Integer, func, Boolean, Float +from sqlalchemy import Table,DateTime, ForeignKey, Column, String, Integer, func, Boolean, Float, Text from database.db import Base from sqlalchemy.orm import relationship @@ -8,7 +8,7 @@ class Agency(Base): id_carga_gtfs = Column(Integer, ForeignKey('carga_gtfs.id_carga_gtfs'), nullable=True) agency_id = Column(String(255), primary_key=True) agency_name = Column(String(255)) - agency_url = Column(String(255)) + agency_url = Column(Text) agency_timezone = Column(String(50)) agency_lang = Column(String(10)) agency_phone = Column(String(50))