# coding: utf-8
"""Outbound delivery for the Mayordomo SMTP relay.

Pending letters (``Carta``) are read from the database, grouped by the
recipient's domain and pushed to one of the domain's MX servers. The module
also exposes the factory for the inbound aiosmtpd controller and a couple of
maintenance coroutines (DNS refresh, clean-up of old recipient rows).
"""
import asyncio
import time
import re
import sys
import os
import aiosmtplib
import traceback
import dkim

from email.parser import BytesParser
from email.policy import default

from aiosmtpd.controller import Controller
from sqlalchemy.future import select
from sqlalchemy import delete

from .model import db, update_mx, update_a, FQDN, MXRecord, ARecord, IPV4Addr, Direccion, Destinatario, Carta
from .registro import log
from .smtpd import ilabHandler
from .resolver import updateDNS

smtprelayport = '10025'
bindip = '0.0.0.0'
# Module-level fallback key; used when DKIM_PRIVKEY is unset or unreadable.
dkimKey = None
# cacerts = '/etc/ssl/certs/ca-certificates.crt'
# keyfile = '/var/run/secrets/keyfile'
# certfile = '/var/run/secrets/certfile'

# Name announced in HELO/EHLO; an unset or empty SMTP_HOSTNAME uses the default.
banner_hostname = os.environ.get('SMTP_HOSTNAME') or 'midominio.cl'

# Recipient budget for a single SMTP connection.
_MAX_POR_CONEXION = 30


async def _leer_llave_dkim():
    """Return the DKIM private key as bytes, or the module fallback.

    The key is read from the file named by the ``DKIM_PRIVKEY`` environment
    variable. If the variable is unset, or the file cannot be read, the
    module-level ``dkimKey`` (``None`` by default) is returned so callers can
    simply test ``is not None``.
    """
    ruta = os.environ.get('DKIM_PRIVKEY')
    if not ruta:
        return dkimKey
    try:
        await log.debug('Leyendo llave: {}.'.format(ruta))
        # Context manager so the descriptor is released even if read() fails.
        with open(ruta, 'r') as archivo:
            return archivo.read().encode()
    except Exception:
        await log.warning('Error al abrir el archivo: {}.'.format(ruta))
        await log.warning('Traceback {}'.format(traceback.format_exc()))
    return dkimKey


async def _conectar_smtp(ipaddr):
    """Open an SMTP session with ``ipaddr``, trying four strategies in order.

    Order: STARTTLS on 25, implicit TLS on 465, STARTTLS on 587 and finally a
    default (no TLS options) connection followed by an explicit HELO.
    Certificates are not validated on the TLS attempts.

    Returns the connected ``aiosmtplib.SMTP`` client, or ``None`` when every
    attempt failed. Failures are logged at debug level and never raised.
    """
    host = str(ipaddr)
    # (extra constructor options, success message, failure message)
    estrategias = (
        ({'port': 25, 'start_tls': True, 'use_tls': False},
         'Conectado a START_TLS: 25 {}',
         'Error al conectar al servidor start_tls 25: {}'),
        ({'port': 465, 'use_tls': True},
         'Conectado a USE_TLS 456: {}',
         'Error al conectar al servidor start_tls 465: {}'),
        ({'port': 587, 'start_tls': True},
         'Conectado a START_TLS 587: {}',
         'Error al conectar al servidor start_tls 587: {}'),
    )
    for opciones, msg_ok, msg_error in estrategias:
        try:
            smtp = aiosmtplib.SMTP(hostname=host, local_hostname=banner_hostname,
                                   validate_certs=False, timeout=10, **opciones)
            await smtp.connect()
            await log.debug(msg_ok.format(ipaddr))
            return smtp
        except Exception as e:
            await log.debug(msg_error.format(e))

    # Last resort: library defaults plus a manual HELO.
    try:
        smtp = aiosmtplib.SMTP(hostname=host, timeout=10)
        await smtp.connect()
        await smtp.helo(banner_hostname)
        await log.debug('Conectado a capela: {}'.format(ipaddr))
        return smtp
    except Exception as e:
        await log.debug('Error al conectar al servidor {}: {}'.format(ipaddr, e))
    return None


async def _cerrar_smtp(smtp):
    """Send QUIT on a best-effort basis; a failure is logged, not raised."""
    try:
        await smtp.quit()
    except Exception as e:
        await log.debug('Error al cerrar la conexion SMTP: {}'.format(e))


async def enviarCorreosDominio(dominioid) -> bool:
    """Deliver the pending letters addressed to one domain.

    The A records of the domain's MX hosts whose ``validohasta`` is still in
    the future are tried in ascending order of ``enviados + errores * 20``.
    The first server that accepts a connection is used to send every letter
    that still has recipients with ``enviado == 0`` in the domain.

    Args:
        dominioid: id of the recipient domain (``Direccion.dominioid``).

    Returns:
        ``True`` when a server was reached and the pending letters were
        processed (individual send failures are logged and counted, not
        raised). ``False`` when no server could be used, when the
        per-connection recipient budget ran out, or on an unexpected error.
    """
    valido = int(time.time())
    llave_dkim = await _leer_llave_dkim()
    try:
        await log.debug('Enviando correos dominio {}'.format(dominioid))
        mx_vigentes = select(MXRecord.fqdnmxid).where(
            MXRecord.fqdnid == dominioid, MXRecord.validohasta > valido)
        servidores = await db.execute(
            select(ARecord).where(ARecord.fqdnid.in_(mx_vigentes)))
        # Least-loaded server first; one error weighs as much as 20 deliveries.
        candidatos = sorted(servidores.scalars(),
                            key=lambda a: a.enviados + a.errores * 20)
        # await log.debug('El dominio tiene un total de {} servidores'.format(len(candidatos)))
        for arecord in candidatos:
            ipresult = await db.execute(
                select(IPV4Addr).where(IPV4Addr.id == arecord.ipv4id))
            dbdireccion = ipresult.scalar_one_or_none()
            smtp = None
            try:
                smtp = await _conectar_smtp(dbdireccion.ipaddr)
                if smtp is None:
                    arecord.errores = arecord.errores + 1
                    await db.commit()
                    continue

                limit = _MAX_POR_CONEXION
                await log.debug('Conectado a SMTP({}) '.format(dbdireccion.ipaddr))
                rcartas = await db.execute(
                    select(Carta).join(Destinatario).join(Direccion).where(
                        Direccion.dominioid == dominioid, Destinatario.enviado == 0))
                for carta in rcartas.scalars():
                    # await log.debug('Componiendo Carta {} '.format(carta.id))
                    if limit < 3:
                        await log.info('Se mandaron {} correos a un servidor. Pedimos uno nuevo para seguir.'.format(_MAX_POR_CONEXION - limit))
                        # Close the session instead of leaking the socket.
                        await _cerrar_smtp(smtp)
                        return False

                    rteresult = await db.execute(
                        select(Direccion).where(Direccion.id == carta.remitenteid))
                    remitente = rteresult.scalar_one_or_none()

                    rcpt_to = []
                    ids_destinatarios = []
                    rdest = await db.execute(
                        select(Destinatario).join(Direccion).where(
                            Direccion.dominioid == dominioid,
                            Destinatario.cartaid == carta.id,
                            Destinatario.enviado == 0))
                    for destinatario in rdest.scalars():
                        destresult = await db.execute(
                            select(Direccion).where(Direccion.id == destinatario.direccionid))
                        rcpt_to.append(str(destresult.scalar_one_or_none().direccion))
                        ids_destinatarios.append(destinatario.id)
                        destinatario.intentos = destinatario.intentos + 1
                        limit -= 1
                        if limit <= 0:
                            break
                    if not rcpt_to:
                        continue
                    await db.commit()

                    # Re-read only the rows actually placed in rcpt_to. When
                    # the budget cut the loop short, the remaining recipients
                    # must stay pending rather than be marked as sent.
                    rdest = await db.execute(
                        select(Destinatario).where(Destinatario.id.in_(ids_destinatarios)))
                    await log.info("Carta Rte '{}' => Destinatarios '{}' ".format(remitente.direccion, ', '.join(rcpt_to)))
                    try:
                        if llave_dkim is not None:
                            mail2send = BytesParser(policy=default).parsebytes(carta.contenido.encode('utf-8'))
                            # dkim.sign returns a full "DKIM-Signature: ..."
                            # header; unfold it and keep only the value.
                            signature = dkim.sign(mail2send.as_bytes(), b'mayordomo', b'ilab.cl', llave_dkim, include_headers=[b'from', b'to', b'subject', b'message-id']).decode().replace("\r\n", '')
                            mail2send['DKIM-Signature'] = signature[len("DKIM-Signature: "):]
                            await log.debug("Signature: {}".format(mail2send['DKIM-Signature']))
                            await smtp.sendmail(remitente.direccion, rcpt_to, mail2send.as_bytes())
                        else:
                            await smtp.sendmail(remitente.direccion, rcpt_to, carta.contenido.encode('utf-8'))
                        for destinatario in rdest.scalars():
                            destinatario.enviado = 1
                        arecord.enviados = arecord.enviados + 1
                    except Exception as e:
                        await log.warning('Error al enviar el correo {}'.format(e))
                        await log.warning('Traceback {}'.format(traceback.format_exc()))
                        # Give up (enviado = 2) after more than two attempts.
                        for destinatario in rdest.scalars():
                            if destinatario.intentos > 2:
                                destinatario.enviado = 2
                        arecord.errores = arecord.errores + 1
                    await db.commit()
                await smtp.quit()
                return True
            except Exception as e:
                await log.warning('Error en el servidor {}'.format(e))
                if smtp is not None:
                    await _cerrar_smtp(smtp)
                arecord.errores = arecord.errores + 1
                await db.commit()
    except Exception:
        # Not a bare except, so task cancellation still propagates.
        await log.warning('Traceback {}'.format(traceback.format_exc()))
    return False


async def enviaCorreos():
    """Run one delivery pass over every domain with pending recipients.

    One pending ``Destinatario`` per distinct ``FQDN.id`` is selected and its
    domain is handed to ``enviarCorreosDominio``. When that call returns
    ``False`` the selected recipient's ``intentos`` is incremented and, once
    it exceeds 2, the recipient is flagged with ``enviado = 2``.
    """
    try:
        rdestino = await db.execute(
            select(Destinatario).join(Direccion).join(FQDN)
            .where(Destinatario.enviado == 0).distinct(FQDN.id))
        for destinatario in rdestino.scalars():
            result = await db.execute(
                select(Direccion).where(Direccion.id == destinatario.direccionid))
            dbemail = result.scalar_one_or_none()
            if not await enviarCorreosDominio(dbemail.dominioid):
                destinatario.intentos = destinatario.intentos + 1
                if destinatario.intentos > 2:
                    destinatario.enviado = 2
            # Awaited like every other db call in this module; the original
            # bare `db.commit()` was never awaited.
            await db.commit()
    except Exception:
        await log.error('Traceback {}'.format(traceback.format_exc()))


async def cleanupSent() -> bool:
    """Delete at most one old recipient row and its letter if orphaned.

    Returns:
        ``True`` when no row matched (nothing left to clean), ``False`` when
        a row was deleted. Presumably the caller repeats the call until it
        gets ``True``; verify against the caller.
    """
    from datetime import datetime, timedelta

    returnvalue = True
    limite = datetime.now() - timedelta(days=60)
    # NOTE(review): the original filter reached us garbled as
    # `Destinatario.timestamp0` (the text between `<` and `>` was lost).
    # Reconstructed as "older than the cut-off and no longer pending";
    # confirm the second condition and the timestamp column type.
    dest = await db.execute(
        select(Destinatario)
        .where(Destinatario.timestamp < limite, Destinatario.enviado > 0)
        .limit(1))
    for destinatario in dest.scalars():
        await log.debug('Deleting Dest: {}'.format(destinatario.id))
        cartaid = destinatario.cartaid
        await db.execute(delete(Destinatario).where(Destinatario.id == destinatario.id))
        await db.commit()
        # Remove the letter once its last recipient row is gone.
        result = await db.execute(
            select(Destinatario).where(Destinatario.cartaid == cartaid).limit(1))
        if result.scalar_one_or_none() is None:
            await log.info('Deleting Carta: {}'.format(cartaid))
            await db.execute(delete(Carta).where(Carta.id == cartaid))
            await db.commit()
        returnvalue = False
    return returnvalue


def create_async_smtp_server():
    """Build the aiosmtpd ``Controller`` bound to ``bindip:smtprelayport``."""
    return Controller(ilabHandler(), hostname=bindip, port=smtprelayport)


async def pre_process() -> bool:
    """Refresh DNS data; ``True`` only if ``updateDNS`` returned a truthy value."""
    try:
        if await updateDNS():
            return True
    except Exception:
        # Still best-effort, but the failure is now logged, not swallowed.
        await log.warning('Traceback {}'.format(traceback.format_exc()))
    return False


if __name__ == '__main__':
    mayordomo = create_async_smtp_server()
    mayordomo.start()
    input(u'SMTP server (Mayordomo) esta operativo')
    mayordomo.stop()