# coding: utf-8
"""Outbound mail delivery and SMTP relay bootstrap.

Pending letters stored in the database are delivered to the mail servers
of each recipient domain, optionally DKIM-signed, and the local aiosmtpd
controller that accepts incoming relay traffic is created here.
"""
import asyncio
import time
import re
import sys
import os
import aiosmtplib
import traceback
import dkim
from email.parser import Parser
from email.policy import default
from aiosmtpd.controller import Controller
from sqlalchemy.future import select
from .model import db, update_mx, update_a, FQDN, MXRecord, ARecord, IPV4Addr, Direccion, Destinatario, Carta
from .registro import log
from .smtpd import ilabHandler
from .resolver import updateDNS

smtprelayport = '10025'
bindip = '0.0.0.0'
# Module-level fallback key; used when DKIM_PRIVKEY is unset or unreadable.
dkimKey = None
# cacerts = '/etc/ssl/certs/ca-certificates.crt'
# keyfile = '/var/run/secrets/keyfile'
# certfile = '/var/run/secrets/certfile'

# Hostname announced to remote servers; overridable through SMTP_HOSTNAME.
if not os.environ.get('SMTP_HOSTNAME'):
    banner_hostname = 'midominio.cl'
else:
    banner_hostname = os.environ.get('SMTP_HOSTNAME')


async def _leer_llave_dkim():
    """Return the DKIM private key as bytes, or the module default.

    The key is read from the file named by the DKIM_PRIVKEY environment
    variable. Any failure is logged and the module-level ``dkimKey``
    (``None`` unless something else set it) is returned instead.
    """
    ruta = os.environ.get('DKIM_PRIVKEY')
    if not ruta:
        return dkimKey
    try:
        await log.info('Leyendo llave: {}.'.format(ruta))
        with open(ruta, 'r') as archivo:
            return archivo.read().encode()
    except Exception:
        await log.warning('Error al abrir el archivo: {}.'.format(ruta))
        await log.warning('Traceback {}'.format(traceback.format_exc()))
    return dkimKey


async def _conectar_smtp(ipaddr):
    """Open an SMTP connection to *ipaddr*, or return ``None``.

    Tries, in order: STARTTLS on 25, implicit TLS on 465, STARTTLS on 587
    and finally a plain connection with an explicit HELO. Each failure is
    logged at debug level and the next strategy is attempted.
    """
    intentos = (
        (dict(source_address=banner_hostname, port=25, start_tls=True,
              use_tls=False, validate_certs=False, timeout=10),
         'Conectado a START_TLS: 25 {}',
         'Error al conectar al servidor start_tls 25: {}'),
        (dict(source_address=banner_hostname, port=465, use_tls=True,
              validate_certs=False, timeout=10),
         'Conectado a USE_TLS 456: {}',
         'Error al conectar al servidor start_tls 465: {}'),
        (dict(source_address=banner_hostname, port=587, start_tls=True,
              validate_certs=False, timeout=10),
         'Conectado a START_TLS 587: {}',
         'Error al conectar al servidor start_tls 587: {}'),
    )
    for opciones, msg_ok, msg_error in intentos:
        try:
            smtp = aiosmtplib.SMTP(hostname=str(ipaddr), **opciones)
            await smtp.connect()
            await log.debug(msg_ok.format(ipaddr))
            return smtp
        except Exception as e:
            await log.debug(msg_error.format(e))

    # Last resort: default client options and a manual HELO.
    try:
        smtp = aiosmtplib.SMTP(hostname=str(ipaddr), timeout=10)
        await smtp.connect()
        await smtp.helo(banner_hostname)
        await log.debug('Conectado a capela: {}'.format(ipaddr))
        return smtp
    except Exception as e:
        await log.debug('Error al conectar al servidor {}: {}'.format(ipaddr, e))
    return None


async def _cerrar_smtp(smtp):
    """Send QUIT on *smtp*, logging (not raising) any failure."""
    try:
        await smtp.quit()
    except Exception as e:
        await log.debug('Error al cerrar la conexion SMTP: {}'.format(e))


async def enviarCorreosDominio(dominioid):
    """Deliver the pending letters addressed to one domain.

    The domain's mail servers (A records behind still-valid MX records) are
    tried in ascending order of ``enviados + errores * 20``. On the first
    server that accepts a connection, every letter with unsent recipients
    in the domain is sent, up to roughly 30 recipients per connection.

    Args:
        dominioid: Database id of the recipient domain.

    Returns:
        ``True`` when a server was reached and the pending letters were
        processed; ``False`` when the per-connection recipient budget ran
        out, no server could be used, or an unexpected error occurred.
    """
    valido = int(time.time())
    # Kept in a local so a missing or unreadable key simply disables
    # signing instead of leaving the name unbound.
    llave_dkim = await _leer_llave_dkim()
    try:
        await log.debug('Enviando correos dominio {}'.format(dominioid))
        servidores = await db.execute(
            select(ARecord).where(ARecord.fqdnid.in_(
                select(MXRecord.fqdnmxid).where(
                    MXRecord.fqdnid == dominioid,
                    MXRecord.validohasta > valido))))
        # Errors weigh 20x a successful send when ranking servers.
        indices = [(a.enviados + a.errores * 20, a) for a in servidores.scalars()]
        # await log.debug('El dominio tiene un total de {} servidores'.format(len(indices)))
        for _, arecord in sorted(indices, key=lambda tup: tup[0]):
            ipresult = await db.execute(
                select(IPV4Addr).where(IPV4Addr.id == arecord.ipv4id))
            dbdireccion = ipresult.scalar_one_or_none()
            if dbdireccion is None:
                await log.warning('Registro A sin direccion IPv4: {}'.format(arecord.ipv4id))
                continue

            smtp = None
            try:
                smtp = await _conectar_smtp(dbdireccion.ipaddr)
                if smtp is None:
                    # Unreachable by every strategy: try the next server.
                    continue

                limit = 30
                await log.debug('Conectado a SMTP({}) '.format(dbdireccion.ipaddr))
                rcartas = await db.execute(
                    select(Carta).join(Destinatario).join(Direccion).where(
                        Direccion.dominioid == dominioid,
                        Destinatario.enviado == 0))
                for carta in rcartas.scalars():
                    # await log.debug('Componiendo Carta {} '.format(carta.id))
                    if limit < 3:
                        await log.info('Se mandaron {} correos a un servidor. Pedimos uno nuevo para seguir.'.format(30 - limit))
                        # Close politely instead of abandoning the socket.
                        await _cerrar_smtp(smtp)
                        return False

                    rteresult = await db.execute(
                        select(Direccion).where(Direccion.id == carta.remitenteid))
                    remitente = rteresult.scalar_one_or_none()

                    rcpt_to = []
                    rdest = await db.execute(
                        select(Destinatario).join(Direccion).where(
                            Direccion.dominioid == dominioid,
                            Destinatario.cartaid == carta.id,
                            Destinatario.enviado == 0))
                    for destinatario in rdest.scalars():
                        destresult = await db.execute(
                            select(Direccion).where(Direccion.id == destinatario.direccionid))
                        rcpt_to.append(str(destresult.scalar_one_or_none().direccion))
                        destinatario.intentos = destinatario.intentos + 1
                        limit -= 1
                        if limit <= 0:
                            break
                    if len(rcpt_to) == 0:
                        continue

                    # Persist the attempt counters before talking to the server.
                    await db.commit()
                    # Re-query after the commit to get fresh recipient rows
                    # for recording the outcome.
                    rdest = await db.execute(
                        select(Destinatario).join(Direccion).where(
                            Direccion.dominioid == dominioid,
                            Destinatario.cartaid == carta.id,
                            Destinatario.enviado == 0))
                    await log.info("Carta Rte '{}' => Destinatarios '{}' ".format(remitente.direccion, ', '.join(rcpt_to)))
                    try:
                        contenido = carta.contenido.encode('utf-8')
                        if llave_dkim is not None:
                            # dkim.sign works on bytes and returns the full
                            # "DKIM-Signature: ..." header line; prepending
                            # it keeps the signed bytes untouched.
                            firma = dkim.sign(
                                contenido, b'mayordomo', b'ilab.cl', llave_dkim,
                                include_headers=[b'from', b'to', b'subject', b'message-id'])
                            contenido = firma + contenido
                        await smtp.sendmail(remitente.direccion, rcpt_to, contenido)
                        for destinatario in rdest.scalars():
                            destinatario.enviado = 1
                        arecord.enviados = arecord.enviados + 1
                    except Exception as e:
                        await log.warning('Error al enviar el correo {}'.format(e))
                        await log.warning('Traceback {}'.format(traceback.format_exc()))
                        # Give up on a recipient after more than two attempts.
                        for destinatario in rdest.scalars():
                            if destinatario.intentos > 2:
                                destinatario.enviado = 2
                        arecord.errores = arecord.errores + 1
                    await db.commit()

                await smtp.quit()
                return True
            except Exception as e:
                await log.warning('Error en el servidor {}'.format(e))
                if smtp is not None:
                    await _cerrar_smtp(smtp)
                arecord.errores = arecord.errores + 1
                await db.commit()
    except Exception:
        await log.warning('Traceback {}'.format(traceback.format_exc()))
    return False


async def enviaCorreos():
    """Run a delivery pass for every domain that has unsent recipients.

    Any unexpected error is logged with its traceback and ends the pass.
    """
    try:
        rdestino = await db.execute(
            select(Destinatario).join(Direccion).join(FQDN).where(
                Destinatario.enviado == 0).distinct(FQDN.id))
        for destinatario in rdestino.scalars():
            result = await db.execute(
                select(Direccion).where(Direccion.id == destinatario.direccionid))
            dbemail = result.scalar_one_or_none()
            await enviarCorreosDominio(dbemail.dominioid)
    except Exception:
        await log.error('Traceback {}'.format(traceback.format_exc()))


def create_async_smtp_server():
    """Build the aiosmtpd controller bound to ``bindip``:``smtprelayport``.

    Returns:
        An unstarted ``Controller`` wrapping a fresh ``ilabHandler``.
    """
    handler = ilabHandler()
    controller = Controller(handler, hostname=bindip, port=smtprelayport)
    return controller


async def pre_process():
    """Refresh DNS data before a delivery pass.

    Returns:
        ``True`` when ``updateDNS()`` returns a truthy value, ``False``
        otherwise or when it raises (the failure is logged).
    """
    try:
        if await updateDNS():
            return True
    except Exception:
        # Best effort: report the failure instead of hiding it.
        await log.warning('Traceback {}'.format(traceback.format_exc()))
    return False


if __name__ == '__main__':
    mayordomo = create_async_smtp_server()
    mayordomo.start()
    input(u'SMTP server (Mayordomo) esta operativo')
    mayordomo.stop()