pysmtp/mayordomo/model.py

230 lines
7.3 KiB
Python

# coding: utf-8
import asyncio
import time
import re
import sys
import os
from .registro import log
from sqlalchemy.future import select
from sqlalchemy import MetaData, Column, ForeignKey, Enum, UniqueConstraint, Integer, String, Text, DateTime, PickleType
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship as Relationship, sessionmaker
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
if not os.environ.get('SQLALCHEMY_DATABASE_URI'):
database_uri = 'postgresql+asyncpg://docker:docker@db/docker'
else:
database_uri = os.environ.get('SQLALCHEMY_DATABASE_URI')
engine = create_async_engine(database_uri, echo=False)
async_session = sessionmaker(
engine, expire_on_commit=False, class_=AsyncSession
)
db = async_session()
Base = declarative_base()
async def get_origen(ip, hostname):
result = await db.execute(select(FQDN).where(FQDN.fqdn==hostname))
fqdn = result.scalar_one_or_none()
if fqdn == None:
fqdn = FQDN(fqdn=hostname)
db.add(fqdn)
await db.commit()
result = await db.execute(select(IPV4Addr).where(IPV4Addr.ipaddr==ip))
ipv4addr = result.scalar_one_or_none()
if ipv4addr == None:
ipv4addr = IPV4Addr(ipaddr=ip)
db.add(ipv4addr)
await db.commit()
result = await db.execute(select(ARecord).where(ARecord.ipv4id==ipv4addr.id, ARecord.fqdnid==fqdn.id))
arecord = result.scalar_one_or_none()
if arecord == None:
arecord = ARecord(ipv4id=ipv4addr.id, fqdnid=fqdn.id, validohasta=int(time.time()), recibidos=0, enviados=0, errores=0)
db.add(arecord)
await db.commit()
return arecord
async def validate_direccion(email):
await log.info(u"validate '{}'".format(email))
if len(email) > 7:
if re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", email) != None:
direccion = email.strip()
result = await db.execute(select(Direccion).where(Direccion.direccion==direccion))
dbemail = result.scalar_one_or_none()
if dbemail is None:
local, domain = email.split('@')
result = await db.execute(select(FQDN).where(FQDN.fqdn==domain))
dbdomain = result.scalar_one_or_none()
if dbdomain is None:
dbdomain = FQDN(fqdn=domain)
db.add(dbdomain)
await db.commit()
dbemail = Direccion(direccion=direccion, dominioid=dbdomain.id)
db.add(dbemail)
await db.commit()
await log.debug(u"Agregando la dirección: '{}'".format(direccion))
return dbemail
return None
async def update_a(fqdnid, ipv4, ttl):
result = await db.execute(select(FQDN).where(FQDN.id==fqdnid))
dbfqdn = result.scalar_one_or_none()
result = await db.execute(select(IPV4Addr).where(IPV4Addr.ipaddr==ipv4))
dbip = result.scalar_one_or_none()
if dbip is None:
dbip = IPV4Addr(ipaddr=ipv4)
db.add(dbip)
await db.commit()
result = await db.execute(select(ARecord).where(ARecord.ipv4id==dbip.id, ARecord.fqdnid==dbfqdn.id))
dbarecord = result.scalar_one_or_none()
if dbarecord is None:
dbarecord = ARecord(ipv4id=dbip.id, fqdnid=dbfqdn.id, recibidos=0, enviados=0, errores=0,sesiones=0, validohasta=ttl)
db.add(dbarecord)
else:
dbarecord.validohasta=ttl
await db.commit()
return dbarecord
async def update_mx(fqdnid, mxfqdn, prioridad):
result = await db.execute(select(FQDN).where(FQDN.id==fqdnid))
dbdomain = result.scalar_one_or_none()
result = await db.execute(select(FQDN).where(FQDN.fqdn==mxfqdn))
dbmxdomain = result.scalar_one_or_none()
if dbmxdomain is None:
dbmxdomain = FQDN(fqdn=mxfqdn)
db.add(dbmxdomain)
await db.commit()
result = await db.execute(select(MXRecord).where(MXRecord.fqdnid==dbdomain.id, MXRecord.fqdnmxid==dbmxdomain.id))
dbmxrecord = result.scalar_one_or_none()
if dbmxrecord is None:
dbmxrecord = MXRecord(fqdnid=dbdomain.id, fqdnmxid=dbmxdomain.id, prioridad=prioridad)
db.add(dbmxrecord)
else:
dbmxrecord.prioridad=prioridad
await db.commit()
return dbmxrecord
class Direccion(Base):
__tablename__ = 'direcciones'
__table_args__ = { 'schema': 'correos' }
id = Column(Integer, primary_key=True)
direccion = Column(String, nullable=False, unique=True)
dominioid = Column(Integer, ForeignKey('correos.fqdns.id'), nullable=False)
nombre = Column(String)
dominio = Relationship("FQDN")
class Destinatario(Base):
__tablename__ = 'destinatarios'
__table_args__ = { 'schema': 'correos' }
id = Column(Integer, primary_key=True)
direccionid = Column(Integer, ForeignKey('correos.direcciones.id'), nullable=False)
cartaid = Column(Integer, ForeignKey('correos.cartas.id'), nullable=False)
enviado = Column(Integer, default=0)
intentos = Column(Integer, default=0)
timestamp = Column(DateTime(timezone=True), default=func.now(), onupdate=func.now())
correo = Relationship("Direccion")
carta = Relationship("Carta", back_populates='destinatarios')
___table_args__ = (UniqueConstraint('direccionid', 'cartaid', name='_direccion_carta_uc'),{ 'schema': 'correos' })
class Carta(Base):
__tablename__ = 'cartas'
__table_args__ = { 'schema': 'correos' }
id = Column(Integer, primary_key=True)
remitenteid = Column(Integer, ForeignKey('correos.direcciones.id'), nullable=False)
contenido = Column(Text)
recibido = Column(DateTime(timezone=True), default=func.now())
remitente = Relationship("Direccion")
destinatarios = Relationship("Destinatario")
class FQDN(Base):
__tablename__ = 'fqdns'
__table_args__ = { 'schema': 'correos' }
id = Column(Integer, primary_key=True)
fqdn = Column(String, unique=True)
aRecords = Relationship("ARecord")
mxRecords = Relationship("MXRecord")
class MXRecord(Base):
__tablename__ = 'mxrecords'
__table_args__ = { 'schema': 'correos' }
id = Column(Integer, primary_key=True)
fqdnid = Column(Integer, ForeignKey('correos.fqdns.id'), nullable=False)
fqdnmxid = Column(Integer, nullable=False)
prioridad = Column(Integer, default=10000)
validohasta = Column(Integer, default=0)
dominio = Relationship("FQDN", back_populates='mxRecords')
# servidorMX = Relationship("FQDN", foreign_keys=[fqdnmxid])
# aRecords = Relationship("ARecord")
___table_args__ = (UniqueConstraint('dominioid', 'mxid', name='_dominioid_mxid_uc'),{ 'schema': 'correos' })
class ARecord(Base):
__tablename__ = 'arecords'
__table_args__ = { 'schema': 'correos' }
id = Column(Integer, primary_key=True)
fqdnid = Column(Integer, ForeignKey('correos.fqdns.id'), nullable=False)
ipv4id = Column(Integer, ForeignKey('correos.ipv4addrs.id'), nullable=False)
sesiones = Column(Integer, default=0)
recibidos = Column(Integer, default=0)
enviados = Column(Integer, default=0)
errores = Column(Integer, default=0)
validohasta = Column(Integer, default=0)
fqdn = Relationship("FQDN", back_populates='aRecords')
ipv4s = Relationship("IPV4Addr", back_populates='aRecords')
___table_args__ = (UniqueConstraint('fqdnid', 'ipv4id', name='_fqdnid_ipv4id_uc'),{ 'schema': 'correos' })
class IPV4Addr(Base):
__tablename__ = 'ipv4addrs'
__table_args__ = { 'schema': 'correos' }
id = Column(Integer, primary_key=True)
ipaddr = Column(String(15).with_variant(postgresql.INET(), 'postgresql'), nullable=False, unique=True)
aRecords = Relationship("ARecord", back_populates='ipv4s')