"""Periodically download the GTFS-RT protobuf feed and record each fetch.

``load_proto`` downloads the feed from ``GTFS_RT_ENDPOINT`` (passing
``GTFS_RT_KEY`` as the ``apikey`` query parameter), stores it under
``static/ccp/YYYY/MM/DD/`` and inserts a ``Registros`` row for the attempt.
The job is registered on ``sched`` to run every ``GTFS_RT_INTERVAL`` seconds.
"""
from apscheduler.schedulers.blocking import BlockingScheduler
from fetcher.model.feed import db, Registros
from tempfile import NamedTemporaryFile
from shutil import copyfile
from datetime import datetime
from zoneinfo import ZoneInfo
from sqlalchemy.exc import PendingRollbackError, SQLAlchemyError
import requests
import logging
import time
import sys
import os

log = logging.getLogger('Fetcher')
log.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)

# Timezone used both for the file/directory naming and for timing each fetch.
SANTIAGO_TZ = ZoneInfo("America/Santiago")


def _now():
    """Return the current time as an aware datetime in America/Santiago."""
    return datetime.now(tz=SANTIAGO_TZ)


def _elapsed_seconds(start):
    """Return the seconds elapsed between aware datetime *start* and now."""
    return (_now() - start).total_seconds()


def _record_failure(filename):
    """Commit a ``Registros`` row with ``status=100`` for a failed fetch.

    If the database itself fails here, the error is logged and the session
    is rolled back (so the next scheduled run starts from a clean session)
    instead of propagating out of the job.
    """
    try:
        db.add(Registros(filename=filename, status=100))
        db.commit()
    except SQLAlchemyError:
        log.error("Failed to store failure record for {}".format(filename),
                  exc_info=True)
        db.rollback()


def load_proto():
    """Fetch one GTFS-RT snapshot, save it to disk and record the outcome.

    The payload is saved to
    ``static/ccp/YYYY/MM/DD/ccp_gtfs_YYYYmmdd_HHMM_SS.proto`` (relative to the
    working directory, timestamped in America/Santiago time). On success a
    ``Registros`` row with ``status=0`` and that relative path is committed.

    Error handling:
      * ``PendingRollbackError``: the session is rolled back; nothing is
        recorded for this run.
      * any other ``SQLAlchemyError``: logged with traceback and re-raised.
      * any other ``Exception`` (HTTP error, timeout, filesystem error, ...):
        logged with traceback and a ``Registros`` row with ``status=100`` is
        committed for the intended path.

    Raises:
        sqlalchemy.exc.SQLAlchemyError: when storing the success record fails
            for a reason other than a pending rollback.
    """
    inicio = _now()
    stamp = inicio.strftime("%Y%m%d_%H%M_%S")
    final_dir = "static/ccp/{}".format(inicio.strftime("%Y/%m/%d"))
    final_name = "{}/ccp_gtfs_{}.proto".format(final_dir, stamp)
    try:
        response = requests.get(
            "{}?apikey={}".format(os.environ.get('GTFS_RT_ENDPOINT'),
                                  os.environ.get('GTFS_RT_KEY')),
            timeout=15,
        )
        response.raise_for_status()
        os.makedirs(os.path.abspath(final_dir), exist_ok=True)
        with NamedTemporaryFile() as tmp:
            tmp.write(response.content)
            # copyfile reopens the temp file by name, so Python's write
            # buffer must be flushed first or the copy may be truncated.
            tmp.flush()
            copyfile(tmp.name, os.path.abspath(final_name))
        db.add(Registros(filename=final_name, status=0))
        db.commit()
        log.info("Fetched GTFS-RT Record {} in {}s".format(
            stamp, _elapsed_seconds(inicio)))
    except PendingRollbackError:
        # SQLAlchemy raises this when an earlier error left the session's
        # transaction needing a rollback; reset it so later runs can proceed.
        log.warning("Session needed rollback while storing GTFS-RT Record {}"
                    .format(stamp))
        db.rollback()
    except SQLAlchemyError:
        log.error("Failed to fetch GTFS-RT Record {} in {}s".format(
            stamp, _elapsed_seconds(inicio)), exc_info=True)
        raise
    except Exception:
        # Network/HTTP/filesystem failures are recorded rather than raised so
        # the failed attempt is visible in the Registros table.
        log.error("Failed to fetch GTFS-RT Record {} in {}s".format(
            stamp, _elapsed_seconds(inicio)), exc_info=True)
        _record_failure(final_name)


sched = BlockingScheduler()
# Run load_proto every GTFS_RT_INTERVAL seconds. The variable must be set:
# int(None) raises TypeError when this module is imported.
sched.add_job(load_proto, 'interval',
              seconds=int(os.environ.get('GTFS_RT_INTERVAL')))
# NOTE(review): sched.start() is not called in the visible source; presumably
# the code that imports this module starts the scheduler. Confirm.