Limitar correos mandados a un solo servidor cada envío. Corregir la logica de actualización de direcciones. Corregir logica de envío de una misma carta a varios destinatarios en un mismo dominio.
parent
e08694f6c8
commit
83bfa0e8f3
|
@ -70,10 +70,14 @@ async def enviarCorreosDominio(dominioid):
|
||||||
await log.debug('Error al conectar al servidor {}: {}'.format(dbdireccion.ipaddr, e))
|
await log.debug('Error al conectar al servidor {}: {}'.format(dbdireccion.ipaddr, e))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
limit = 30
|
||||||
await log.debug('Conectado a SMTP({}) '.format(dbdireccion.ipaddr))
|
await log.debug('Conectado a SMTP({}) '.format(dbdireccion.ipaddr))
|
||||||
rcartas = await db.execute(select(Carta).join(Destinatario).join(Direccion).where(Direccion.dominioid==dominioid, Destinatario.enviado==0))
|
rcartas = await db.execute(select(Carta).join(Destinatario).join(Direccion).where(Direccion.dominioid==dominioid, Destinatario.enviado==0))
|
||||||
for carta in rcartas.scalars():
|
for carta in rcartas.scalars():
|
||||||
# await log.debug('Componiendo Carta {} '.format(carta.id))
|
# await log.debug('Componiendo Carta {} '.format(carta.id))
|
||||||
|
if limit < 3:
|
||||||
|
await log.info('Se mandaron {} correos a un servidor. Pedimos uno nuevo para seguir.'.format(30-limit))
|
||||||
|
return False
|
||||||
|
|
||||||
rteresult = await db.execute(select(Direccion).where(Direccion.id==carta.remitenteid))
|
rteresult = await db.execute(select(Direccion).where(Direccion.id==carta.remitenteid))
|
||||||
remitente = rteresult.scalar_one_or_none()
|
remitente = rteresult.scalar_one_or_none()
|
||||||
|
@ -84,6 +88,13 @@ async def enviarCorreosDominio(dominioid):
|
||||||
destresult = await db.execute(select(Direccion).where(Direccion.id==destinatario.direccionid))
|
destresult = await db.execute(select(Direccion).where(Direccion.id==destinatario.direccionid))
|
||||||
rcpt_to.append( str(destresult.scalar_one_or_none().direccion) )
|
rcpt_to.append( str(destresult.scalar_one_or_none().direccion) )
|
||||||
destinatario.intentos = destinatario.intentos + 1
|
destinatario.intentos = destinatario.intentos + 1
|
||||||
|
limit -= 1
|
||||||
|
if limit <= 0:
|
||||||
|
break
|
||||||
|
|
||||||
|
if len(rcpt_to) == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
await db.commit()
|
await db.commit()
|
||||||
rdest = await db.execute(select(Destinatario).join(Direccion).where(Direccion.dominioid==dominioid, Destinatario.cartaid==carta.id, Destinatario.enviado==0))
|
rdest = await db.execute(select(Destinatario).join(Direccion).where(Direccion.dominioid==dominioid, Destinatario.cartaid==carta.id, Destinatario.enviado==0))
|
||||||
|
|
||||||
|
|
|
@ -17,37 +17,48 @@ else:
|
||||||
dns_relay = os.environ.get('DNS_RELAY')
|
dns_relay = os.environ.get('DNS_RELAY')
|
||||||
|
|
||||||
async def updateDNS():
|
async def updateDNS():
|
||||||
updateExitoso = True
|
|
||||||
pendientes = 0
|
pendientes = 0
|
||||||
|
destinos = 0
|
||||||
# await log.debug('Updatedns')
|
# await log.debug('Updatedns')
|
||||||
try:
|
try:
|
||||||
rdestino = await db.execute(select(Destinatario).join(Direccion).join(FQDN).where(Destinatario.enviado==0).distinct(FQDN.id))
|
rdestino = await db.execute(select(Destinatario).join(Direccion).join(FQDN).where(Destinatario.enviado==0).distinct(FQDN.id))
|
||||||
for destinatario in rdestino.scalars():
|
for destinatario in rdestino.scalars():
|
||||||
# await log.info('Destinatario {}'.format(destinatario.direccionid))
|
|
||||||
pendientes = pendientes + 1
|
|
||||||
result = await db.execute(select(Direccion).where(Direccion.id==destinatario.direccionid))
|
result = await db.execute(select(Direccion).where(Direccion.id==destinatario.direccionid))
|
||||||
dbemail = result.scalar_one_or_none()
|
dbemail = result.scalar_one_or_none()
|
||||||
|
destinos += 1
|
||||||
|
|
||||||
tieneMXValido = False
|
tieneMXValido = False
|
||||||
valido = int(time.time())
|
valido = int(time.time())
|
||||||
|
|
||||||
|
retries = 3
|
||||||
|
while tiemeMXValido is False and retries > 0:
|
||||||
|
retries -= 1
|
||||||
dbmx = await db.execute(select(MXRecord).where(MXRecord.fqdnid==dbemail.dominioid))
|
dbmx = await db.execute(select(MXRecord).where(MXRecord.fqdnid==dbemail.dominioid))
|
||||||
for mx_record in dbmx.scalars():
|
for mx_record in dbmx.scalars():
|
||||||
# await log.debug('mxrecord: {}'.format(mx_record.id))
|
|
||||||
if mx_record.validohasta > valido:
|
if mx_record.validohasta > valido:
|
||||||
tieneMXValido = True
|
tieneMXValido = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if tiemeMXValido is False:
|
||||||
|
updateExitoso = updateExitoso and await servidores_correo(dbemail.dominioid)
|
||||||
|
|
||||||
|
if retries <= 0:
|
||||||
|
await log.error('Error al actualizar los servidores MX de {}'.format(dbemail.direccion))
|
||||||
|
pendientes += 1
|
||||||
|
|
||||||
|
|
||||||
if tieneMXValido is not True:
|
|
||||||
await servidores_correo(dbemail.dominioid)
|
|
||||||
updateExitoso = False
|
|
||||||
await log.info('Actualización Incompleta')
|
|
||||||
except:
|
except:
|
||||||
await log.error('Traceback {}'.format(traceback.format_exc()))
|
await log.error('Traceback {}'.format(traceback.format_exc()))
|
||||||
|
|
||||||
if tieneMXValido is True and pendientes > 0:
|
if destinos>0 and pendientes == 0:
|
||||||
await log.info('Actualización exitosa')
|
await log.info('Todos los servidores MX estan disponibles para enviar los correos')
|
||||||
|
elif destinos>0 and pendientes>0:
|
||||||
|
await log.info('Algunos servidores MX no pudieron resolverse')
|
||||||
|
|
||||||
return updateExitoso
|
if destinos > pendientes:
|
||||||
|
return True #hay correos que mandar
|
||||||
|
else:
|
||||||
|
return False #nada que hacer
|
||||||
|
|
||||||
async def servidores_correo(dominioid):
|
async def servidores_correo(dominioid):
|
||||||
|
|
||||||
|
@ -57,8 +68,8 @@ async def servidores_correo(dominioid):
|
||||||
servidores = await resolver(types.MX, dominio.fqdn)
|
servidores = await resolver(types.MX, dominio.fqdn)
|
||||||
except:
|
except:
|
||||||
# await log.error('Traceback {}'.format(traceback.format_exc()))
|
# await log.error('Traceback {}'.format(traceback.format_exc()))
|
||||||
await log.warning("No se pudo resolver los servidores de correo de '{}'".format(dominio.fqdn))
|
await log.error("No se pudo resolver los servidores de correo de '{}'".format(dominio.fqdn))
|
||||||
return
|
return False
|
||||||
|
|
||||||
await log.info("Actualizando los servidores de correo de: {}".format(dominio.fqdn))
|
await log.info("Actualizando los servidores de correo de: {}".format(dominio.fqdn))
|
||||||
|
|
||||||
|
@ -66,7 +77,7 @@ async def servidores_correo(dominioid):
|
||||||
typename = host.data.type_name
|
typename = host.data.type_name
|
||||||
prioridad, fqdn = host.data.data
|
prioridad, fqdn = host.data.data
|
||||||
|
|
||||||
tiempo = int(time.time())+600
|
tiempo = int(time.time())+10 #add the 10, because of the sleep delay
|
||||||
validohasta = tiempo
|
validohasta = tiempo
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -94,6 +105,7 @@ async def servidores_correo(dominioid):
|
||||||
|
|
||||||
dbmx.validohasta = validohasta
|
dbmx.validohasta = validohasta
|
||||||
await db.commit()
|
await db.commit()
|
||||||
|
return True
|
||||||
# await log.debug("Servidor {}: '{}' prioridad {} validohasta: {}".format(typename, fqdn, prioridad, dbmx.validohasta))
|
# await log.debug("Servidor {}: '{}' prioridad {} validohasta: {}".format(typename, fqdn, prioridad, dbmx.validohasta))
|
||||||
|
|
||||||
async def resolver(tipo=types.A, fqdn='ilab.cl'):
|
async def resolver(tipo=types.A, fqdn='ilab.cl'):
|
||||||
|
@ -111,11 +123,10 @@ async def resolver(tipo=types.A, fqdn='ilab.cl'):
|
||||||
# else:
|
# else:
|
||||||
# await log.debug('data => {}'.format(valor.data))
|
# await log.debug('data => {}'.format(valor.data))
|
||||||
# prioridad, fqdn = valor.data.data
|
# prioridad, fqdn = valor.data.data
|
||||||
|
# return res
|
||||||
return res
|
|
||||||
except asyncio.exceptions.TimeoutError:
|
except asyncio.exceptions.TimeoutError:
|
||||||
retry = retry - 1
|
retry = retry - 1
|
||||||
log.info(u'Quedan {} intentos'.format(retry))
|
log.warning(u'Quedan {} intentos'.format(retry))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
raise asyncio.exceptions.TimeoutError('Intentos excedidos')
|
raise asyncio.exceptions.TimeoutError('Intentos excedidos')
|
||||||
|
|
Loading…
Reference in New Issue