# Indietronica_Hardtech_Piton/jp_main.py
#
# Repository listing metadata: 272 lines, 7.9 KiB, Python.
#
# NOTE: the repository viewer flagged this file as containing ambiguous
# Unicode characters, i.e. characters that may be confused with others in
# the reader's locale. If their use is intentional and legitimate, the
# warning can be ignored; otherwise inspect the file with those characters
# escaped/highlighted.

import socket
import threading
import tkinter as tk
from tkinter import ttk, filedialog
import os
import time
# ============================================================
# UTILIDADES DE RED
# ============================================================
def obtener_ip_local():
    """Return the IPv4 address of the interface used for outbound traffic.

    A UDP socket is connected to a public address (8.8.8.8:80) and the
    address assigned to the local end of that socket is returned. This
    function never sends any data on the socket.

    Returns:
        The local IP as a dotted-quad string, or "0.0.0.0" when it cannot
        be determined (for example, when there is no route to the probe
        address).
    """
    try:
        # The context manager closes the socket on every path, including
        # the one where connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except OSError:
        # Best effort: callers only display/advertise this value.
        return "0.0.0.0"
def obtener_puerto_disponible(inicio=5000, fin=5100):
    """Return the first TCP port in ``inicio..fin`` that can be bound.

    Each candidate is probed by binding a throwaway socket on 0.0.0.0. The
    probe socket is closed before returning, so the port is only known to
    have been free at probe time.

    Args:
        inicio: First port to try (inclusive).
        fin: Last port to try (inclusive).

    Returns:
        The first port number that could be bound.

    Raises:
        RuntimeError: If no port in the range could be bound.
    """
    for puerto in range(inicio, fin + 1):
        try:
            # `with` closes the probe socket even when bind() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(("0.0.0.0", puerto))
        except OSError:
            continue  # port busy or not permitted: try the next one
        return puerto
    raise RuntimeError(f"No hay puertos disponibles entre {inicio}-{fin}")
# Fixed protocol settings.
BUFFER = 4096  # chunk size in bytes for file reads and socket receives
ANUNCIO_PUERTO = 6000  # UDP port used by the discovery broadcasts

# Values probed once, when the module is loaded.
MI_IP = obtener_ip_local()
MI_PUERTO = obtener_puerto_disponible()
# ============================================================
# SERVIDOR TCP RECEPCIÓN ARCHIVOS
# ============================================================
def _recibir_archivo(conn, carpeta):
    """Receive one file from *conn* and store it inside *carpeta*.

    The peer is expected to send a ``nombre|tam`` header followed by
    ``tam`` bytes of file content.

    Raises:
        ValueError: If the header is malformed or the file name is unusable.
        ConnectionError: If the peer closes before ``tam`` bytes arrive.
        OSError: On socket or file-system errors.
    """
    # NOTE(review): the header has no terminator, so this relies on it
    # arriving in a recv() of its own, which TCP does not guarantee. A
    # delimited or length-prefixed header would need a matching change on
    # the sending side.
    info = conn.recv(1024).decode()
    # Split on the LAST "|" so a file name containing "|" stays intact.
    nombre, tam = info.rsplit("|", 1)
    tam = int(tam)
    if tam < 0:
        raise ValueError(f"tamaño no válido: {tam}")
    # The name comes from the network: keep only its final path component
    # so a peer cannot write outside *carpeta* (e.g. "../../x").
    nombre = os.path.basename(nombre.replace("\\", "/"))
    if nombre in ("", ".", ".."):
        raise ValueError(f"nombre de archivo no válido: {info!r}")
    recibido = 0
    with open(os.path.join(carpeta, nombre), "wb") as f:
        while recibido < tam:
            # Never read past the announced size.
            datos = conn.recv(min(BUFFER, tam - recibido))
            if not datos:
                break  # peer closed the connection early
            f.write(datos)
            recibido += len(datos)
    if recibido != tam:
        raise ConnectionError(
            f"transferencia incompleta: {recibido} de {tam} bytes"
        )


def servidor_tcp():
    """Accept TCP connections forever and save each incoming file.

    Listens on 0.0.0.0:MI_PUERTO and stores files under ``recibidos/``
    (created if missing). Connections are handled one at a time. The peer
    receives ``b"OK"`` only after the whole announced file was written;
    on any failure the connection is closed without an acknowledgement
    and the error is printed.
    """
    os.makedirs("recibidos", exist_ok=True)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.bind(("0.0.0.0", MI_PUERTO))
        srv.listen(5)
        while True:
            try:
                conn, _ = srv.accept()
                # Close the connection on every path, including errors.
                with conn:
                    _recibir_archivo(conn, "recibidos")
                    conn.sendall(b"OK")
            except Exception as e:
                # Top-level boundary of a background thread: report the
                # failure and keep serving the next connection.
                print("Error en servidor:", e)
# ============================================================
# SERVIDOR DE ANUNCIOS UDP
# ============================================================
def servidor_anuncios():
    """Broadcast this device's address once per second, forever.

    Each datagram is ``DEVICE|<MI_IP>|<MI_PUERTO>`` sent to the broadcast
    address on UDP port ANUNCIO_PUERTO. A failed send is reported and
    retried on the next tick instead of ending the loop.
    """
    # MI_IP and MI_PUERTO are assigned once at module load, so the datagram
    # can be built a single time outside the loop.
    anuncio = f"DEVICE|{MI_IP}|{MI_PUERTO}".encode()
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        while True:
            time.sleep(1)
            try:
                s.sendto(anuncio, ("<broadcast>", ANUNCIO_PUERTO))
            except OSError as e:
                # e.g. network temporarily unreachable: keep announcing.
                print("Error en anuncios:", e)
# ============================================================
# ESCANEAR RED (recibir anuncios)
# ============================================================
def descubrir_dispositivos(timeout=2):
    """Listen for ``DEVICE|ip|puerto`` announcements and collect the senders.

    Args:
        timeout: Total listening time in seconds.

    Returns:
        A dict mapping each announced IP (other than MI_IP) to its
        announced TCP port as an int.

    Raises:
        OSError: If the UDP socket cannot be bound to ANUNCIO_PUERTO.
    """
    dispositivos = {}
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.bind(("0.0.0.0", ANUNCIO_PUERTO))
        # A monotonic deadline keeps the total wait at `timeout` even when
        # datagrams keep arriving or the wall clock is adjusted.
        limite = time.monotonic() + timeout
        while True:
            restante = limite - time.monotonic()
            if restante <= 0:
                break
            s.settimeout(restante)
            try:
                data, _ = s.recvfrom(1024)
            except socket.timeout:
                break  # deadline reached with nothing more to read
            except OSError:
                continue  # transient receive error: keep listening
            try:
                msg = data.decode()
                if not msg.startswith("DEVICE"):
                    continue
                _, ip, puerto = msg.split("|")
                puerto = int(puerto)
            except ValueError:
                # Undecodable bytes, wrong field count or non-numeric
                # port: ignore the malformed datagram.
                continue
            # Skip our own announcements.
            if ip != MI_IP:
                dispositivos[ip] = puerto
    return dispositivos
# ============================================================
# CLIENTE TCP
# ============================================================
def enviar_archivo(ruta, ip, puerto):
    """Send the file at *ruta* to the receiver listening on (*ip*, *puerto*).

    The wire format is a ``nombre|tam`` header (base name and size in
    bytes) followed by the raw file content; the function then waits for
    the peer's reply.

    Args:
        ruta: Path of the file to send.
        ip: Address of the receiving device.
        puerto: TCP port of the receiving device.

    Returns:
        True if the peer answered exactly ``b"OK"``, False otherwise.

    Raises:
        OSError: If the file cannot be read or the connection fails.
    """
    tam = os.path.getsize(ruta)
    nombre = os.path.basename(ruta)
    # `with` closes the socket on every path, including failures.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as cli:
        cli.connect((ip, puerto))
        # sendall() keeps sending until the whole buffer is transmitted;
        # a plain send() may transmit only part of it.
        cli.sendall(f"{nombre}|{tam}".encode())
        with open(ruta, "rb") as f:
            # Read fixed-size chunks until read() returns b"" at EOF.
            for datos in iter(lambda: f.read(BUFFER), b""):
                cli.sendall(datos)
        return cli.recv(1024) == b"OK"
# ============================================================
# INTERFAZ TKINTER
# ============================================================
class App(tk.Tk):
    """Main window: shows the local IP, lists discovered devices and sends
    a chosen file to the selected one.

    Only one screen (a ``tk.Frame``) is shown at a time; switching screens
    destroys the previous frame.
    """

    def __init__(self):
        super().__init__()
        self.title("Enviador de archivos")
        self.geometry("420x360")
        self.resizable(False, False)
        self.frame_actual = None  # frame currently displayed, if any
        self.mostrar_home()

    def reemplazar_pantalla(self, frame):
        """Destroy the current screen (if any) and display *frame* instead."""
        if self.frame_actual is not None:
            self.frame_actual.destroy()
        self.frame_actual = frame
        self.frame_actual.pack(fill="both", expand=True)

    # --------------------------------------------------------
    # HOME SCREEN
    # --------------------------------------------------------
    def mostrar_home(self):
        """Show the local IP and the button that starts a device search."""
        frame = tk.Frame(self)
        tk.Label(frame, text="Tu IP:", font=("Arial", 16)).pack(pady=10)
        tk.Label(frame, text=MI_IP, font=("Arial", 20, "bold")).pack(pady=10)
        ttk.Button(
            frame, text="Buscar equipos", command=self.mostrar_dispositivos
        ).pack(pady=20)
        self.reemplazar_pantalla(frame)

    # --------------------------------------------------------
    # DEVICE LIST
    # --------------------------------------------------------
    def mostrar_dispositivos(self):
        """Search for devices and show one button per device found.

        The search runs on the GUI thread, so a "searching" screen is
        drawn first and the window is unresponsive until it finishes.
        """
        frame = tk.Frame(self)
        tk.Label(frame, text="Buscando equipos...", font=("Arial", 16)).pack(
            pady=10
        )
        self.reemplazar_pantalla(frame)
        # Force a redraw now: the blocking search below would otherwise
        # delay it until the search is over.
        self.update()

        try:
            dispositivos = descubrir_dispositivos()
        except OSError as e:
            # The "searching" screen has no buttons; without this the user
            # would be stuck on it when the search cannot start.
            print("Error al buscar equipos:", e)
            dispositivos = {}

        frame = tk.Frame(self)
        tk.Label(frame, text="Equipos encontrados:", font=("Arial", 16)).pack(
            pady=10
        )
        if not dispositivos:
            tk.Label(
                frame,
                text="No se encontró ningún equipo.",
                font=("Arial", 14),
                fg="red",
            ).pack(pady=20)
            ttk.Button(frame, text="Volver", command=self.mostrar_home).pack()
            self.reemplazar_pantalla(frame)
            return

        for ip, puerto in dispositivos.items():
            ttk.Button(
                frame,
                text=f"{ip}:{puerto}",
                # Bind ip/puerto as defaults so each button keeps its own
                # values instead of the loop's last ones.
                command=lambda i=ip, p=puerto: self.seleccionar_archivo(i, p),
            ).pack(pady=5)
        ttk.Button(frame, text="Volver", command=self.mostrar_home).pack(
            pady=20
        )
        self.reemplazar_pantalla(frame)

    # --------------------------------------------------------
    # FILE SELECTION
    # --------------------------------------------------------
    def seleccionar_archivo(self, ip, puerto):
        """Ask for a file and send it to (*ip*, *puerto*), then show the
        success or error screen. Does nothing if no file is chosen.
        """
        ruta = filedialog.askopenfilename(title="Seleccionar archivo")
        if not ruta:
            return
        try:
            ok = enviar_archivo(ruta, ip, puerto)
        except OSError as e:
            # Connection or file errors must end on the error screen, not
            # as an unhandled exception inside the Tk callback.
            print("Error al enviar archivo:", e)
            ok = False
        if ok:
            self.mostrar_confirmacion()
        else:
            self.mostrar_error()

    # --------------------------------------------------------
    # RESULT SCREENS
    # --------------------------------------------------------
    def mostrar_confirmacion(self):
        """Show the "file sent" screen with a button back to home."""
        frame = tk.Frame(self)
        tk.Label(
            frame,
            text="Archivo enviado con éxito 🎉",
            font=("Arial", 16),
            fg="green",
        ).pack(pady=40)
        ttk.Button(
            frame, text="Volver al inicio", command=self.mostrar_home
        ).pack(pady=10)
        self.reemplazar_pantalla(frame)

    def mostrar_error(self):
        """Show the "send failed" screen with a button back to home."""
        frame = tk.Frame(self)
        tk.Label(
            frame, text="Error al enviar archivo", font=("Arial", 16), fg="red"
        ).pack(pady=40)
        ttk.Button(frame, text="Volver", command=self.mostrar_home).pack(
            pady=10
        )
        self.reemplazar_pantalla(frame)
# ============================================================
# MAIN
# ============================================================
if __name__ == "__main__":
    # Launch the file receiver and the announcer as daemon threads, in
    # that order, before handing control to the GUI event loop.
    for servicio in (servidor_tcp, servidor_anuncios):
        threading.Thread(target=servicio, daemon=True).start()
    App().mainloop()