pysmtp/mayordomo/__init__.py

233 lines
8.2 KiB
Python

# coding: utf-8
import asyncio
import time
import re
import sys
import os
import aiosmtplib
import traceback
import dkim
from email.parser import BytesParser
from email.policy import default
from aiosmtpd.controller import Controller
from sqlalchemy.future import select
from sqlalchemy import delete
from .model import db, update_mx, update_a, FQDN, MXRecord, ARecord, IPV4Addr, Direccion, Destinatario, Carta
from .registro import log
from .smtpd import ilabHandler
from .registro import log
from .resolver import updateDNS
smtprelayport = '10025'
bindip = '0.0.0.0'
dkimKey = None
# cacerts = '/etc/ssl/certs/ca-certificates.crt'
# keyfile = '/var/run/secrets/keyfile'
# certfile = '/var/run/secrets/certfile'
if not os.environ.get('SMTP_HOSTNAME'):
banner_hostname = 'midominio.cl'
else:
banner_hostname = os.environ.get('SMTP_HOSTNAME')
async def enviarCorreosDominio(dominioid):
valido = int(time.time())
if os.environ.get('DKIM_PRIVKEY'):
try:
await log.debug('Leyendo llave: {}.'.format(os.environ.get('DKIM_PRIVKEY')))
dkimKey = open(os.environ.get('DKIM_PRIVKEY'), 'r').read().encode()
except:
await log.warning('Error al abrir el archivo: {}.'.format(os.environ.get('DKIM_PRIVKEY')))
await log.warning('Traceback {}'.format(traceback.format_exc()))
try:
await log.debug('Enviando correos dominio {}'.format(dominioid))
indices = []
servidores = await db.execute(select(ARecord).where(ARecord.fqdnid.in_(select(MXRecord.fqdnmxid).where(MXRecord.fqdnid==dominioid, MXRecord.validohasta>valido))))
for arecordx in servidores.scalars():
indice = arecordx.enviados + arecordx.errores*20
indices.append((indice, arecordx))
# await log.debug('El dominio tiene un total de {} servidores'.format(len(indices)))
for _, arecord in sorted(indices, key=lambda tup: tup[0]):
ipresult = await db.execute(select(IPV4Addr).where(IPV4Addr.id==arecord.ipv4id))
dbdireccion = ipresult.scalar_one_or_none()
try:
conectado = False
try:
smtp = aiosmtplib.SMTP(hostname=str(dbdireccion.ipaddr), local_hostname=banner_hostname, port=25, start_tls=True, use_tls=False, validate_certs=False, timeout=10)
await smtp.connect()
conectado = True
await log.debug('Conectado a START_TLS: 25 {}'.format(dbdireccion.ipaddr))
except Exception as e:
conectado = False
await log.debug('Error al conectar al servidor start_tls 25: {}'.format(e))
if conectado == False:
try:
smtp = aiosmtplib.SMTP(hostname=str(dbdireccion.ipaddr), local_hostname=banner_hostname, port=465, use_tls=True, validate_certs=False, timeout=10)
await smtp.connect()
conectado = True
await log.debug('Conectado a USE_TLS 456: {}'.format(dbdireccion.ipaddr))
except Exception as e:
conectado = False
await log.debug('Error al conectar al servidor start_tls 465: {}'.format(e))
if conectado == False:
try:
smtp = aiosmtplib.SMTP(hostname=str(dbdireccion.ipaddr), local_hostname=banner_hostname, port=587, start_tls=True, validate_certs=False, timeout=10)
await smtp.connect()
conectado = True
await log.debug('Conectado a START_TLS 587: {}'.format(dbdireccion.ipaddr))
except Exception as e:
conectado = False
await log.debug('Error al conectar al servidor start_tls 587: {}'.format(e))
if conectado == False:
try:
smtp = aiosmtplib.SMTP(hostname=str(dbdireccion.ipaddr), timeout=10)
await smtp.connect()
await smtp.helo(banner_hostname)
conectado = True
await log.debug('Conectado a capela: {}'.format(dbdireccion.ipaddr))
except Exception as e:
conectado = False
await log.debug('Error al conectar al servidor {}: {}'.format(dbdireccion.ipaddr, e))
arecord.errores = arecord.errores + 1
await db.commit()
continue
limit = 30
await log.debug('Conectado a SMTP({}) '.format(dbdireccion.ipaddr))
rcartas = await db.execute(select(Carta).join(Destinatario).join(Direccion).where(Direccion.dominioid==dominioid, Destinatario.enviado==0))
for carta in rcartas.scalars():
# await log.debug('Componiendo Carta {} '.format(carta.id))
if limit < 3:
await log.info('Se mandaron {} correos a un servidor. Pedimos uno nuevo para seguir.'.format(30-limit))
return False
rteresult = await db.execute(select(Direccion).where(Direccion.id==carta.remitenteid))
remitente = rteresult.scalar_one_or_none()
rcpt_to = []
rdest = await db.execute(select(Destinatario).join(Direccion).where(Direccion.dominioid==dominioid, Destinatario.cartaid==carta.id, Destinatario.enviado==0))
for destinatario in rdest.scalars():
destresult = await db.execute(select(Direccion).where(Direccion.id==destinatario.direccionid))
rcpt_to.append( str(destresult.scalar_one_or_none().direccion) )
destinatario.intentos = destinatario.intentos + 1
limit -= 1
if limit <= 0:
break
if len(rcpt_to) == 0:
continue
await db.commit()
rdest = await db.execute(select(Destinatario).join(Direccion).where(Direccion.dominioid==dominioid, Destinatario.cartaid==carta.id, Destinatario.enviado==0))
await log.info("Carta Rte '{}' => Destinatarios '{}' ".format(remitente.direccion, ', '.join(rcpt_to)))
try:
if dkimKey is not None:
mail2send = BytesParser(policy=default).parsebytes(carta.contenido.encode('utf-8'))
signature = dkim.sign(mail2send.as_bytes(), b'mayordomo', b'ilab.cl', dkimKey, include_headers=[b'from', b'to', b'subject', b'message-id']).decode().replace("\r\n",'')
mail2send['DKIM-Signature'] = signature[len("DKIM-Signature: "):]
await log.debug("Signature: {}".format(mail2send['DKIM-Signature']))
await smtp.sendmail(remitente.direccion, rcpt_to, mail2send.as_bytes())
else:
await smtp.sendmail(remitente.direccion, rcpt_to, carta.contenido.encode('utf-8'))
for destinatario in rdest.scalars():
destinatario.enviado = 1
arecord.enviados = arecord.enviados + 1
except Exception as e:
await log.warning('Error al enviar el correo {}'.format(e))
await log.warning('Traceback {}'.format(traceback.format_exc()))
for destinatario in rdest.scalars():
if destinatario.intentos > 2:
destinatario.enviado = 2
arecord.errores = arecord.errores + 1
await db.commit()
await smtp.quit()
return True
except Exception as e:
await log.warning('Error en el servidor {}'.format(e))
arecord.errores = arecord.errores + 1
await db.commit()
except:
await log.warning('Traceback {}'.format(traceback.format_exc()))
return False
async def enviaCorreos():
try:
rdestino = await db.execute(select(Destinatario).join(Direccion).join(FQDN).where(Destinatario.enviado==0).distinct(FQDN.id))
for destinatario in rdestino.scalars():
result = await db.execute(select(Direccion).where(Direccion.id==destinatario.direccionid))
dbemail = result.scalar_one_or_none()
await enviarCorreosDominio(dbemail.dominioid)
except:
await log.error('Traceback {}'.format(traceback.format_exc()))
async def cleanupSent():
from datetime import datetime, timedelta
returnvalue = True
oneMonth = datetime.now() - timedelta(days = 60)
dest = await db.execute(select(Destinatario).where(Destinatario.timestamp<oneMonth, Destinatario.enviado>0).limit(1))
for destinatario in dest.scalars():
await log.debug('Deleting Dest: {}'.format(destinatario.id))
cartaid = destinatario.cartaid
await db.execute(delete(Destinatario).where(Destinatario.id==destinatario.id))
await db.commit()
result = await db.execute(select(Destinatario).where(Destinatario.cartaid==cartaid).limit(1))
more = result.scalar_one_or_none()
if more is None:
await log.info('Deleting Carta: {}'.format(cartaid))
await db.execute(delete(Carta).where(Carta.id==cartaid))
await db.commit()
returnvalue = False
return returnvalue
def create_async_smtp_server():
return Controller(ilabHandler(), hostname=bindip, port=smtprelayport)
async def pre_process():
try:
if await updateDNS():
return True
except:
pass
return False
if __name__ == '__main__':
mayordomo = create_async_smtp_server()
mayordomo.start()
input(u'SMTP server (Mayordomo) esta operativo')
mayordomo.stop()