# coding: utf-8
"""Async SQLAlchemy models and lookup helpers for the ``correos`` schema.

The module owns a single module-level ``AsyncSession`` (``db``) that every
helper below uses.  The helpers implement a "select, and insert if missing"
pattern for host names, IPv4 addresses, A/MX records and e-mail addresses.
"""
import asyncio
import time
import re
import sys
import os

from .registro import log

from sqlalchemy.future import select
from sqlalchemy import (
    MetaData,
    Column,
    ForeignKey,
    Enum,
    UniqueConstraint,
    Integer,
    String,
    Text,
    DateTime,
    PickleType,
)
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship as Relationship, sessionmaker
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

# Fall back to the docker-compose database when the environment variable is
# unset or empty.
database_uri = (
    os.environ.get('SQLALCHEMY_DATABASE_URI')
    or 'postgresql+asyncpg://docker:docker@db/docker'
)

engine = create_async_engine(database_uri, echo=False)
async_session = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)
# One shared session for the whole module; expire_on_commit=False keeps
# attributes (e.g. generated ids) readable after each commit.
db = async_session()
Base = declarative_base()

# Raw string so that ``\.`` reaches the regex engine as an escaped dot instead
# of being an invalid string escape.  The pattern itself is unchanged.
_EMAIL_RE = re.compile(r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)")


async def _fetch_one(model, **filters):
    """Return the single ``model`` row whose columns equal ``filters``, or None."""
    criteria = [getattr(model, name) == value for name, value in filters.items()]
    result = await db.execute(select(model).where(*criteria))
    return result.scalar_one_or_none()


async def _get_or_create(model, defaults=None, **filters):
    """Fetch the row matching ``filters``; insert and commit it when missing.

    ``defaults`` holds extra column values used only when a new row is built.
    Returns ``(instance, created)`` where ``created`` tells whether an insert
    happened.
    """
    instance = await _fetch_one(model, **filters)
    if instance is not None:
        return instance, False
    instance = model(**filters, **(defaults or {}))
    db.add(instance)
    await db.commit()
    return instance, True


async def get_origen(ip, hostname) -> "ARecord":
    """Return the ARecord linking ``hostname`` to ``ip``, creating rows as needed.

    The FQDN row, the IPV4Addr row and the ARecord row are each looked up and
    inserted (with a commit) when absent.  A newly created ARecord starts with
    zeroed counters and ``validohasta`` set to the current epoch second.
    """
    fqdn, _ = await _get_or_create(FQDN, fqdn=hostname)
    ipv4addr, _ = await _get_or_create(IPV4Addr, ipaddr=ip)
    arecord, _ = await _get_or_create(
        ARecord,
        defaults={
            'validohasta': int(time.time()),
            'recibidos': 0,
            'enviados': 0,
            'errores': 0,
        },
        ipv4id=ipv4addr.id,
        fqdnid=fqdn.id,
    )
    return arecord


async def validate_direccion(email) -> "Direccion | None":
    """Validate ``email`` and return its Direccion row, creating it if missing.

    Returns None when ``email`` is 7 characters or shorter or does not match
    the address pattern.  Otherwise the stripped address is looked up; when it
    is not stored yet, its domain's FQDN row is fetched or created and a new
    Direccion row is inserted.
    """
    await log.info(u"validate '{}'".format(email))
    if len(email) <= 7:
        return None
    if _EMAIL_RE.match(email) is None:
        return None

    # ``$`` also matches just before a trailing newline, so strip before
    # using the value as a key.
    direccion = email.strip()
    dbemail = await _fetch_one(Direccion, direccion=direccion)
    if dbemail is not None:
        return dbemail

    # Split the stripped address (not the raw input) so the stored domain
    # never carries trailing whitespace.  The pattern guarantees one '@'.
    domain = direccion.split('@')[1]
    dbdomain, _ = await _get_or_create(FQDN, fqdn=domain)
    dbemail = Direccion(direccion=direccion, dominioid=dbdomain.id)
    db.add(dbemail)
    await db.commit()
    await log.debug(u"Agregando la dirección: '{}'".format(direccion))
    return dbemail


async def update_a(fqdnid, ipv4, ttl) -> "ARecord":
    """Create or refresh the ARecord between FQDN ``fqdnid`` and address ``ipv4``.

    The IPV4Addr row is created when missing.  A new ARecord gets zeroed
    counters and ``validohasta=ttl``; an existing one only has ``validohasta``
    overwritten with ``ttl``.

    Raises:
        AttributeError: if no FQDN row has id ``fqdnid`` (the lookup result is
            dereferenced without a check).
    """
    dbfqdn = await _fetch_one(FQDN, id=fqdnid)
    dbip, _ = await _get_or_create(IPV4Addr, ipaddr=ipv4)
    dbarecord, created = await _get_or_create(
        ARecord,
        defaults={
            'recibidos': 0,
            'enviados': 0,
            'errores': 0,
            'sesiones': 0,
            'validohasta': ttl,
        },
        ipv4id=dbip.id,
        fqdnid=dbfqdn.id,
    )
    if not created:
        dbarecord.validohasta = ttl
        await db.commit()
    return dbarecord


async def update_mx(fqdnid, mxfqdn, prioridad) -> "MXRecord":
    """Create or refresh the MXRecord from FQDN ``fqdnid`` to host ``mxfqdn``.

    The FQDN row for ``mxfqdn`` is created when missing.  A new MXRecord is
    stored with ``prioridad``; an existing one has its ``prioridad``
    overwritten.

    Raises:
        AttributeError: if no FQDN row has id ``fqdnid`` (the lookup result is
            dereferenced without a check).
    """
    dbdomain = await _fetch_one(FQDN, id=fqdnid)
    dbmxdomain, _ = await _get_or_create(FQDN, fqdn=mxfqdn)
    dbmxrecord, created = await _get_or_create(
        MXRecord,
        defaults={'prioridad': prioridad},
        fqdnid=dbdomain.id,
        fqdnmxid=dbmxdomain.id,
    )
    if not created:
        dbmxrecord.prioridad = prioridad
        await db.commit()
    return dbmxrecord


class Direccion(Base):
    """An e-mail address (table ``correos.direcciones``)."""
    __tablename__ = 'direcciones'
    __table_args__ = {'schema': 'correos'}

    id = Column(Integer, primary_key=True)
    # Full address text; unique across the table.
    direccion = Column(String, nullable=False, unique=True)
    # FQDN row of the address's domain part.
    dominioid = Column(Integer, ForeignKey('correos.fqdns.id'), nullable=False)
    nombre = Column(String)

    dominio = Relationship("FQDN")


class Destinatario(Base):
    """Link between a Carta and one recipient Direccion (``correos.destinatarios``)."""
    __tablename__ = 'destinatarios'
    # The constraint was previously declared under a misspelled attribute
    # (three leading underscores) and therefore never reached the table.
    __table_args__ = (
        UniqueConstraint('direccionid', 'cartaid', name='_direccion_carta_uc'),
        {'schema': 'correos'},
    )

    id = Column(Integer, primary_key=True)
    direccionid = Column(Integer, ForeignKey('correos.direcciones.id'), nullable=False)
    cartaid = Column(Integer, ForeignKey('correos.cartas.id'), nullable=False)
    enviado = Column(Integer, default=0)
    intentos = Column(Integer, default=0)
    # Set on insert and refreshed on every update.
    timestamp = Column(DateTime(timezone=True), default=func.now(), onupdate=func.now())

    correo = Relationship("Direccion")
    carta = Relationship("Carta", back_populates='destinatarios')


class Carta(Base):
    """A stored message with its sender and recipients (``correos.cartas``)."""
    __tablename__ = 'cartas'
    __table_args__ = {'schema': 'correos'}

    id = Column(Integer, primary_key=True)
    remitenteid = Column(Integer, ForeignKey('correos.direcciones.id'), nullable=False)
    contenido = Column(Text)
    recibido = Column(DateTime(timezone=True), default=func.now())

    remitente = Relationship("Direccion")
    destinatarios = Relationship("Destinatario")


class FQDN(Base):
    """A fully qualified domain name (``correos.fqdns``)."""
    __tablename__ = 'fqdns'
    __table_args__ = {'schema': 'correos'}

    id = Column(Integer, primary_key=True)
    fqdn = Column(String, unique=True)

    aRecords = Relationship("ARecord")
    mxRecords = Relationship("MXRecord")


class MXRecord(Base):
    """An MX entry pointing domain ``fqdnid`` at mail host ``fqdnmxid``."""
    __tablename__ = 'mxrecords'
    # Previously declared under a misspelled attribute and with column names
    # ('dominioid', 'mxid') that do not exist on this table.
    __table_args__ = (
        UniqueConstraint('fqdnid', 'fqdnmxid', name='_dominioid_mxid_uc'),
        {'schema': 'correos'},
    )

    id = Column(Integer, primary_key=True)
    fqdnid = Column(Integer, ForeignKey('correos.fqdns.id'), nullable=False)
    # Holds an FQDN id (see update_mx) but is not declared as a ForeignKey.
    fqdnmxid = Column(Integer, nullable=False)
    prioridad = Column(Integer, default=10000)
    validohasta = Column(Integer, default=0)

    dominio = Relationship("FQDN", back_populates='mxRecords')
    # Relationships towards the MX host are currently disabled:
    # servidorMX = Relationship("FQDN", foreign_keys=[fqdnmxid])
    # aRecords = Relationship("ARecord")


class ARecord(Base):
    """An A entry linking an FQDN to an IPv4 address, with traffic counters."""
    __tablename__ = 'arecords'
    # The constraint was previously declared under a misspelled attribute
    # (three leading underscores) and therefore never reached the table.
    __table_args__ = (
        UniqueConstraint('fqdnid', 'ipv4id', name='_fqdnid_ipv4id_uc'),
        {'schema': 'correos'},
    )

    id = Column(Integer, primary_key=True)
    fqdnid = Column(Integer, ForeignKey('correos.fqdns.id'), nullable=False)
    ipv4id = Column(Integer, ForeignKey('correos.ipv4addrs.id'), nullable=False)
    sesiones = Column(Integer, default=0)
    recibidos = Column(Integer, default=0)
    enviados = Column(Integer, default=0)
    errores = Column(Integer, default=0)
    # Integer stamp; get_origen seeds it with the current epoch second and
    # update_a overwrites it with the supplied ttl.
    validohasta = Column(Integer, default=0)

    fqdn = Relationship("FQDN", back_populates='aRecords')
    ipv4s = Relationship("IPV4Addr", back_populates='aRecords')


class IPV4Addr(Base):
    """An IPv4 address (``correos.ipv4addrs``)."""
    __tablename__ = 'ipv4addrs'
    __table_args__ = {'schema': 'correos'}

    id = Column(Integer, primary_key=True)
    # Stored as INET on PostgreSQL and as a 15-character string elsewhere.
    ipaddr = Column(
        String(15).with_variant(postgresql.INET(), 'postgresql'),
        nullable=False,
        unique=True,
    )

    aRecords = Relationship("ARecord", back_populates='ipv4s')