diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8162494 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +out_folder/* +models/* +*zst +*png diff --git a/Inference/Dockerfile b/Inference/Dockerfile new file mode 100644 index 0000000..ec68654 --- /dev/null +++ b/Inference/Dockerfile @@ -0,0 +1,35 @@ +# Use an official PyTorch image with CUDA support (optional) +FROM pytorch/pytorch:2.2.0-cuda11.8-cudnn8-runtime + +# Set working directory +WORKDIR /app + +RUN apt-get update && apt-get install -y \ + git \ + ffmpeg \ + libgl1 \ + libglib2.0-0 \ + && rm -rf /var/lib/apt/lists/* + + +# Use the Conda pip path explicitly +RUN /opt/conda/bin/pip install --upgrade pip && \ + /opt/conda/bin/pip install --force-reinstall --no-cache-dir "numpy<2.0" \ + transformers==4.39.0 \ + accelerate==0.30.0 \ + torch==2.2.0 \ + torchvision==0.17.0 \ + datasets \ + matplotlib \ + scikit-learn \ + mediapy \ + pillow + +RUN /opt/conda/bin/conda install -y -c conda-forge opencv +RUN /opt/conda/bin/pip install --force-reinstall --no-cache-dir tqdm + +# Copy project files into container +COPY evaluate.py /app/evaluate.py + +# Default command (you can override with docker run) +CMD ["python", "/app/evaluate.py"] diff --git a/Inference/infrence.py b/Inference/infrence.py new file mode 100644 index 0000000..17b64a9 --- /dev/null +++ b/Inference/infrence.py @@ -0,0 +1,126 @@ +import os +import io +import time +import redis +import sqlite3 +import numpy as np +from PIL import Image +from datetime import datetime + +import torch +from transformers import AutoProcessor, AutoModelForVideoClassification + +# ----------------------------- +# ENVIRONMENT CONFIG +# ----------------------------- +redis_host = os.getenv("redis_host", "localhost") +redis_port = int(os.getenv("redis_port", 6379)) +stream_label = os.getenv("stream_label", "cropped_stream") +output_folder = os.getenv("out_folder", "/app/out_folder") +model_folder = os.getenv("model_folder", "base_model") +model_path = os.path.join(output_folder, model_folder) +threshold = float(os.getenv("threshold", "0.5")) +queue_label = f"{stream_label}_cubes" + +sqlite_path = os.path.join(output_folder, f"{stream_label}_results.db") +stream_folder = os.path.join(output_folder, stream_label) + +os.makedirs(output_folder, exist_ok=True) +os.makedirs(stream_folder, exist_ok=True) + +# ----------------------------- +# CONNECT TO REDIS +# ----------------------------- +redis_conn = redis.Redis(host=redis_host, port=redis_port, db=0, decode_responses=False) + +# ----------------------------- +# LOAD MODEL + PROCESSOR +# ----------------------------- +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") +model = AutoModelForVideoClassification.from_pretrained(model_path).to(device) +processor = AutoProcessor.from_pretrained(model_path) +model.eval() + +# Warm up +with torch.no_grad(): + dummy = torch.randn(1, 3, 16, 224, 224).to(device) + _ = model(pixel_values=dummy) + +# ----------------------------- +# SETUP SQLITE +# ----------------------------- +conn = sqlite3.connect(sqlite_path) +cursor = conn.cursor() +cursor.execute(""" +CREATE TABLE IF NOT EXISTS inference_results ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + gif_name TEXT, + timestamp TEXT, + prediction INTEGER +) +""") +conn.commit() + +# ----------------------------- +# REDIS UTILS +# ----------------------------- +def from_redis_list(queue_label): + retry = 0 + while True: + compressed_data = redis_conn.lpop(queue_label) + if compressed_data: + buffer = io.BytesIO(compressed_data) + 
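+            # Payload contract: preprocess.py pushes each 16-frame clip with
+            # np.savez(buffer, *frames), so the archive holds BGR uint8 crops of
+            # shape (224, 224, 3) under the auto-generated keys arr_0 ... arr_15.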
+            loaded_data = np.load(buffer)
+            # Sort by the numeric suffix: plain sorted() would place arr_10 before arr_2
+            frames = [loaded_data[key] for key in sorted(loaded_data.files, key=lambda k: int(k.split("_")[-1]))]
+            return frames
+        else:
+            retry += 1
+            if retry % 50 == 0:
+                print(f"[WAIT] Queue {queue_label} empty for {retry/50:.1f} seconds...")
+            time.sleep(1/50.0)
+            if retry > 2000:
+                raise TimeoutError(f"Queue {queue_label} empty for over 40s")
+
+# -----------------------------
+# SAVE GIF
+# -----------------------------
+def save_gif(frames, gif_path):
+    images = [Image.fromarray(frame.astype(np.uint8)) for frame in frames]
+    images[0].save(gif_path, save_all=True, append_images=images[1:], duration=50, loop=0)
+
+# -----------------------------
+# MAIN LOOP
+# -----------------------------
+print(f"[INFO] Listening on Redis queue: {queue_label}")
+while True:
+    try:
+        frames = from_redis_list(queue_label)
+        if not all(frame.shape == (224, 224, 3) for frame in frames):
+            print("[WARN] Skipped frame batch due to incorrect shape")
+            continue
+
+        timestamp = datetime.utcnow().strftime("%y%m%d_%H%M%S_%f")[:-3]
+        gif_filename = f"{stream_label}/{stream_label}_{timestamp}.gif"
+        # gif_filename is relative to output_folder; stream_folder already ends in
+        # stream_label, so join with output_folder to avoid a doubled subdirectory
+        gif_path = os.path.join(output_folder, gif_filename)
+
+        # Save GIF
+        save_gif(frames, gif_path)
+
+        # Preprocess and predict
+        with torch.no_grad():
+            inputs = processor(images=[frame[:, :, ::-1] for frame in frames], return_tensors="pt")  # convert BGR to RGB
+            pixel_values = inputs["pixel_values"].to(device)
+            logits = model(pixel_values=pixel_values).logits
+            prob = torch.softmax(logits, dim=-1)[0][1].item()
+            prediction = int(prob > threshold)
+
+        # Insert into SQLite
+        cursor.execute("INSERT INTO inference_results (gif_name, timestamp, prediction) VALUES (?, ?, ?)",
+                       (gif_filename, timestamp, prediction))
+        conn.commit()
+
+        print(f"[INFO] Saved {gif_filename} | Class={prediction} | Prob={prob:.3f}")
+
+    except Exception as e:
+        print(f"[ERROR] {e}")
+        time.sleep(1)
diff --git a/dataset/Dockerfile b/dataset/Dockerfile
new file mode 100644
index 0000000..998ad21
--- /dev/null
+++ b/dataset/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.10-slim
+
+RUN pip install numpy redis pillow
+
+COPY dataset.py /app/dataset.py
+
+CMD ["python3", "/app/dataset.py"]
diff --git a/dataset/dataset.py b/dataset/dataset.py
new file mode 100644
index 0000000..50d21e5
--- /dev/null
+++ b/dataset/dataset.py
@@ -0,0 +1,59 @@
+import os
+import time
+import redis
+import io
+import numpy as np
+from PIL import Image
+
+# Connect to Redis
+redis_host = os.getenv("redis_host", "localhost")
+redis_port = int(os.getenv("redis_port", 6379))
+redis_conn = redis.Redis(host=redis_host, port=redis_port, db=0, decode_responses=False)
+
+# Environment variables
+stream_label = os.getenv("stream_label", "cropped_stream")
+out_folder = os.getenv("out_folder", "/app/out_folder")
+stream_label_queue = f"{stream_label}_cubes"
+
+# Ensure output folder exists
+os.makedirs(out_folder, exist_ok=True)
+
+def fromRedisList(queue_label):
+    compressed_data = None
+    retry = 0
+    while compressed_data is None:
+        compressed_data = redis_conn.lpop(queue_label)
+        if compressed_data:
+            retry = 0
+            print(f"Popped data from queue: {queue_label}")
+            buffer = io.BytesIO(compressed_data)
+            loaded_data = np.load(buffer)
+            data_list = [loaded_data[key] for key in loaded_data.files]
+            return data_list
+        else:
+            retry += 1
+            if retry % 50 == 0:
+                print(f"Queue {queue_label} empty for {retry/50:.1f} seconds")
+            time.sleep(1/50.0)
+            if retry > 1000:
+                raise TimeoutError(f"Queue {queue_label} empty for over 20s")
+
+def save_frames_as_gif(frames, output_filename, duration=1):
+    images =
[Image.fromarray(frame[:, :, ::-1]) for frame in frames] + images[0].save(output_filename, save_all=True, append_images=images[1:], duration=duration, loop=0) + +if __name__ == "__main__": + counter = 0 + print(f"[INFO] Starting GIF writer for queue: {stream_label}") + while True: + frames = fromRedisList(stream_label_queue) + if frames: + buffer = os.path.join(out_folder, f"{stream_label}_{counter:05}.npz") + filename = os.path.join(out_folder, f"{stream_label}_{counter:05}.gif") + save_frames_as_gif(frames, filename) + np.savez_compressed(buffer, *frames) # Use *data_list to unpack the list into arguments for savez_compressed + + print(f"[INFO] Saved as npz: {buffer}") + counter += 1 + else: + time.sleep(0.1) diff --git a/docker-compose.eval.yml b/docker-compose.eval.yml new file mode 100644 index 0000000..f0e44e9 --- /dev/null +++ b/docker-compose.eval.yml @@ -0,0 +1,21 @@ + +services: + evaluate: + build: + dockerfile: Dockerfile + context: evaluate + container_name: evaluate + # deploy: + # resources: + # reservations: + # devices: + # - driver: nvidia + # count: all + # capabilities: [gpu] + environment: + model_folder: "tuned" + dataset_folder: "/app/dataset_folder" + output_folder: "/app/output_folder" + volumes: + - ./out_folder:/app/dataset_folder + - ./models:/app/output_folder diff --git a/docker-compose.trainer.yml b/docker-compose.trainer.yml new file mode 100644 index 0000000..b59a040 --- /dev/null +++ b/docker-compose.trainer.yml @@ -0,0 +1,20 @@ + +services: + transfer: + build: + dockerfile: Dockerfile + context: transfer + container_name: transfer + # deploy: + # resources: + # reservations: + # devices: + # - driver: nvidia + # count: all + # capabilities: [gpu] + environment: + dataset_folder: "/app/dataset_folder" + output_folder: "/app/output_folder" + volumes: + - ./out_folder:/app/dataset_folder + - ./models:/app/output_folder diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..9331acf --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,58 @@ + +services: + redis: + image: redis:7 + container_name: redis_server + ports: + - "6379:6379" + + stream_reader: + build: + dockerfile: Dockerfile + context: reader + container_name: stream_reader + depends_on: + - redis + environment: +# stream_url: "rtsp://admin:labvision2019@10.1.8.182:554" + stream_url: "/videos/video.mp4" + stream_label: "pelea3" + redis_host: "redis" + redis_port: "6379" + redis_db: "0" + restart: unless-stopped + volumes: + - /home/ifiguero/DIA/dia3/DIA0205 - VISIÓN COMPUTACIONAL APLICADA/Profesor Cristian Aguilera/dataset/pelea3.mp4:/videos/video.mp4:ro + + stream_preprocess: + build: + dockerfile: Dockerfile + context: preprocess + container_name: stream_preprocess + depends_on: + - redis + - stream_reader + environment: + stream_label: "pelea3" + redis_host: "redis" + redis_port: "6379" + redis_db: "0" + restart: unless-stopped + + dataset_builder: + build: + dockerfile: Dockerfile + context: dataset + container_name: dataset_builder + depends_on: + - redis + - stream_preprocess + environment: + out_folder: "/app/out_folder" + stream_label: "pelea3" + redis_host: "redis" + redis_port: "6379" + redis_db: "0" + restart: unless-stopped + volumes: + - ./out_folder:/app/out_folder diff --git a/evaluate/Dockerfile b/evaluate/Dockerfile new file mode 100644 index 0000000..ec68654 --- /dev/null +++ b/evaluate/Dockerfile @@ -0,0 +1,35 @@ +# Use an official PyTorch image with CUDA support (optional) +FROM pytorch/pytorch:2.2.0-cuda11.8-cudnn8-runtime + +# Set working 
directory +WORKDIR /app + +RUN apt-get update && apt-get install -y \ + git \ + ffmpeg \ + libgl1 \ + libglib2.0-0 \ + && rm -rf /var/lib/apt/lists/* + + +# Use the Conda pip path explicitly +RUN /opt/conda/bin/pip install --upgrade pip && \ + /opt/conda/bin/pip install --force-reinstall --no-cache-dir "numpy<2.0" \ + transformers==4.39.0 \ + accelerate==0.30.0 \ + torch==2.2.0 \ + torchvision==0.17.0 \ + datasets \ + matplotlib \ + scikit-learn \ + mediapy \ + pillow + +RUN /opt/conda/bin/conda install -y -c conda-forge opencv +RUN /opt/conda/bin/pip install --force-reinstall --no-cache-dir tqdm + +# Copy project files into container +COPY evaluate.py /app/evaluate.py + +# Default command (you can override with docker run) +CMD ["python", "/app/evaluate.py"] diff --git a/evaluate/evaluate.py b/evaluate/evaluate.py new file mode 100644 index 0000000..72bb741 --- /dev/null +++ b/evaluate/evaluate.py @@ -0,0 +1,125 @@ +import os +import numpy as np +import torch +from torch.utils.data import Dataset, DataLoader +import cv2 +from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, confusion_matrix +from transformers import AutoProcessor, AutoModelForVideoClassification +from tqdm import tqdm +from PIL import Image + +# ----------------------------- +# Paths +# ----------------------------- +dataset_folder = os.getenv("dataset_folder", "/app/dataset_folder") +output_folder = os.getenv("output_folder", "/app/output_folder") +model_folder = os.getenv("model_folder", "base_model") +model_path = f"{output_folder}/{model_folder}" + +# ----------------------------- +# Load Model + Processor +# ----------------------------- +model = AutoModelForVideoClassification.from_pretrained(model_path) +processor = AutoProcessor.from_pretrained(model_path) +model.eval() + +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") +model.to(device) + +# ----------------------------- +# Dataset class +# ----------------------------- +class VideoNPZDataset(Dataset): + def __init__(self, files, labels, processor): + self.files = files + self.labels = labels + self.processor = processor + + def __len__(self): + return len(self.files) + + def __getitem__(self, idx): + file = self.files[idx] + label = self.labels[idx] + + data = np.load(file) + frames = [data[key] for key in sorted(data.files)] + frames_rgb = [Image.fromarray(frame[:, :, ::-1]) for frame in frames] +# frames_rgb = [cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) for frame in frames] + pixel_values = self.processor(images=frames_rgb, return_tensors="pt")["pixel_values"][0] + label_tensor = torch.tensor(label, dtype=torch.long) + return { + "pixel_values": pixel_values, + "labels": label_tensor, + } + +# ----------------------------- +# Collate Function +# ----------------------------- +def collate_fn(batch): + pixel_values = [item["pixel_values"] for item in batch] + labels = [item["labels"] for item in batch] + + pixel_values = torch.stack(pixel_values) # [B, 3, 16, 224, 224] + labels = torch.tensor(labels, dtype=torch.long) # [B] + + return { + "pixel_values": pixel_values, + "labels": labels, + } + +# ----------------------------- +# Load Dataset +# ----------------------------- +def load_dataset_from_npz(root_dir): + files = [] + labels = [] + label_map = {} + for i, class_name in enumerate(sorted(os.listdir(root_dir))): + label_map[i] = class_name + class_dir = os.path.join(root_dir, class_name) + for file in os.listdir(class_dir): + if file.endswith(".npz"): + files.append(os.path.join(class_dir, file)) + labels.append(i) + return 
files, labels, label_map + +files, labels, label_map = load_dataset_from_npz(dataset_folder) +dataset = VideoNPZDataset(files, labels, processor) +dataloader = DataLoader(dataset, batch_size=2, shuffle=False, collate_fn=collate_fn) + +# ----------------------------- +# Evaluation Loop +# ----------------------------- +all_preds = [] +all_labels = [] +all_probs = [] + +with torch.no_grad(): + for batch in tqdm(dataloader, desc="Evaluating"): + inputs = batch["pixel_values"].to(device) + labels = batch["labels"].to(device) + + outputs = model(pixel_values=inputs) + logits = outputs.logits + probs = torch.softmax(logits, dim=-1) + + preds = torch.argmax(probs, dim=-1) + + all_preds.extend(preds.cpu().numpy()) + all_labels.extend(labels.cpu().numpy()) + all_probs.extend(probs[:, 1].cpu().numpy()) # Class 1 probabilities + +# ----------------------------- +# Metrics +# ----------------------------- +accuracy = accuracy_score(all_labels, all_preds) +f1 = f1_score(all_labels, all_preds, average="binary") +roc_auc = roc_auc_score(all_labels, all_probs) +conf_matrix = confusion_matrix(all_labels, all_preds) + +print("\n=== Evaluation Metrics ===") +print(f"Accuracy : {accuracy:.4f}") +print(f"F1 Score : {f1:.4f}") +print(f"ROC AUC : {roc_auc:.4f}") +print(f"Confusion Matrix:\n{conf_matrix}") diff --git a/preprocess/Dockerfile b/preprocess/Dockerfile new file mode 100644 index 0000000..ba87b5e --- /dev/null +++ b/preprocess/Dockerfile @@ -0,0 +1,63 @@ +FROM python:3.10-slim + +# Install system dependencies for OpenCV and GStreamer +RUN apt-get update && apt-get install -y \ + build-essential \ + cmake \ + git \ + libgtk2.0-dev \ + pkg-config \ + libavcodec-dev \ + libavformat-dev \ + libswscale-dev \ + libv4l-dev \ + libxvidcore-dev \ + libx264-dev \ + libjpeg-dev \ + libpng-dev \ + libtiff-dev \ + gfortran \ + openexr \ + libatlas-base-dev \ + python3-dev \ + libgstreamer1.0-dev \ + libgstreamer-plugins-base1.0-dev \ + gstreamer1.0-tools \ + gstreamer1.0-plugins-good \ + gstreamer1.0-plugins-bad \ + gstreamer1.0-plugins-ugly \ + gstreamer1.0-libav \ + && rm -rf /var/lib/apt/lists/* + +# Install numpy separately to avoid recompilation during opencv build +RUN pip install numpy + +# Clone OpenCV and OpenCV contrib +WORKDIR /opt +RUN git clone --branch 4.8.0 https://github.com/opencv/opencv.git && \ + git clone --branch 4.8.0 https://github.com/opencv/opencv_contrib.git + +# Build OpenCV with GStreamer support +WORKDIR /opt/opencv/build +RUN cmake -D CMAKE_BUILD_TYPE=Release \ + -D CMAKE_INSTALL_PREFIX=/usr/local \ + -D OPENCV_EXTRA_MODULES_PATH=/opt/opencv_contrib/modules \ + -D WITH_GSTREAMER=ON \ + -D WITH_V4L=ON \ + -D WITH_LIBV4L=ON \ + -D BUILD_opencv_python3=ON \ + -D BUILD_opencv_python2=OFF \ + -D BUILD_EXAMPLES=OFF \ + -D BUILD_TESTS=OFF \ + -D BUILD_PERF_TESTS=OFF \ + -D PYTHON_EXECUTABLE=$(which python3) \ + .. 
&& \
+    make -j"$(nproc)" && \
+    make install && \
+    ldconfig
+
+RUN pip install redis mediapipe==0.10.9
+COPY preprocess.py /app/preprocess.py
+COPY efficientdet_lite2.tflite /app/efficientdet_lite2.tflite
+
+CMD ["python3", "/app/preprocess.py"]
diff --git a/preprocess/efficientdet_lite2.tflite b/preprocess/efficientdet_lite2.tflite
new file mode 100644
index 0000000..fffedaa
Binary files /dev/null and b/preprocess/efficientdet_lite2.tflite differ
diff --git a/preprocess/preprocess.py b/preprocess/preprocess.py
new file mode 100644
index 0000000..aa90b90
--- /dev/null
+++ b/preprocess/preprocess.py
@@ -0,0 +1,205 @@
+import os
+import io
+import time
+import redis
+import cv2
+import numpy as np
+import struct
+import mediapipe as mp
+from mediapipe.tasks import python
+from mediapipe.tasks.python import vision
+
+# Environment variables
+stream_label = os.getenv("stream_label", "default_stream")
+stream_label_queue = f"{stream_label}_cubes"
+redis_host = os.getenv("redis_host", "localhost")
+redis_port = int(os.getenv("redis_port", "6379"))
+
+# Connect to Redis
+redis_conn = redis.Redis(host=redis_host, port=redis_port, db=0, decode_responses=False)
+redis_conn.delete(stream_label_queue)
+
+def toRedis(queue_label, data):
+    print(f"Pushed data to queue: {queue_label}")
+    buffer = io.BytesIO()
+    np.savez(buffer, data=data)
+    compressed_data = buffer.getvalue()
+    return redis_conn.rpush(queue_label, compressed_data)
+
+def fromRedis(queue_label):
+    compressed_data = None
+    retry = 0
+    while compressed_data is None:
+        compressed_data = redis_conn.lpop(queue_label)
+        if compressed_data:
+            retry = 0
+            print(f"Popped data from queue: {queue_label}")
+            buffer = io.BytesIO(compressed_data)
+            loaded_data = np.load(buffer)
+            return loaded_data['data']
+        else:
+            retry += 1
+            if retry % 50 == 0:
+                print(f"Queue {queue_label} empty for {retry/50:.1f} seconds")
+            time.sleep(1/50.0)
+            if retry > 1000:
+                raise TimeoutError(f"Queue {queue_label} empty for over 20s")
+
+def toRedisList(queue_label, data_list):
+    print(f"Pushed data to queue: {queue_label}")
+    buffer = io.BytesIO()
+    np.savez(buffer, *data_list)  # unpack the list so np.savez stores arr_0 ... arr_N
+    compressed_data = buffer.getvalue()
+    redis_conn.rpush(queue_label, compressed_data)
+
+def fromRedisList(queue_label):
+    compressed_data = None
+    retry = 0
+    while compressed_data is None:
+        compressed_data = redis_conn.lpop(queue_label)
+        if compressed_data:
+            retry = 0
+            print(f"Popped data from queue: {queue_label}")
+            buffer = io.BytesIO(compressed_data)
+            loaded_data = np.load(buffer)
+            data_list = [loaded_data[key] for key in loaded_data.files]
+            return data_list
+        else:
+            retry += 1
+            if retry % 50 == 0:
+                print(f"Queue {queue_label} empty for {retry/50:.1f} seconds")
+            time.sleep(1/50.0)
+            if retry > 1000:
+                raise TimeoutError(f"Queue {queue_label} empty for over 20s")
+
+
+options = vision.ObjectDetectorOptions(
+    base_options=python.BaseOptions(model_asset_path='/app/efficientdet_lite2.tflite'),  # model file is copied into the image
+    category_allowlist=['person'],
+    score_threshold=0.5
+)
+
+# Create ObjectDetector
+detector = vision.ObjectDetector.create_from_options(options)
+
+def detect_person_bbox(image_frame: np.ndarray):
+    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image_frame)
+    detection_result = detector.detect(mp_image)
+
+    # Yield square, padded bounding boxes for detected persons
+    if detection_result and detection_result.detections:
+        h, w, _ = image_frame.shape
+        for detection in detection_result.detections:
+            if 'person' in [category.category_name for category in detection.categories]:
+                bbox = detection.bounding_box
+                if bbox.width != bbox.height:
+                    # Expand the shorter side so the crop is square
+                    center_x = bbox.origin_x + bbox.width / 2
+                    center_y = bbox.origin_y + bbox.height / 2
+                    size = max(bbox.width, bbox.height)
+                    left = center_x - size / 2
+                    top = center_y - size / 2
+                    width = height = size
+                else:
+                    left, top = bbox.origin_x, bbox.origin_y
+                    width, height = bbox.width, bbox.height
+
+                padding = int(max(10, width // 10))
+
+                width += 2 * padding
+                height += 2 * padding
+                left = max(0, left - padding)
+                top = max(0, top - padding)
+
+                # Shift the box back inside the frame if it overflows
+                if (right := (left + width)) > w:
+                    left -= right - w
+                if (bottom := (top + height)) > h:
+                    top -= bottom - h
+
+                yield int(left), int(top), int(width), int(height)
+
+def get_enclosing_box(frame):
+    mp_pose = mp.solutions.pose
+    with mp_pose.Pose(min_detection_confidence=0.25, min_tracking_confidence=0) as pose:
+        results = pose.process(frame)
+        h, w, _ = frame.shape
+
+        if results.pose_landmarks:
+            x_coords = [landmark.x * w for landmark in results.pose_landmarks.landmark]
+            y_coords = [landmark.y * h for landmark in results.pose_landmarks.landmark]
+
+            min_x, max_x = min(x_coords), max(x_coords)
+            min_y, max_y = min(y_coords), max(y_coords)
+
+            padding = 10
+            min_x = max(0, min_x - padding)
+            min_y = max(0, min_y - padding)
+            max_x = min(w, max_x + padding)
+            max_y = min(h, max_y + padding)
+
+            width = max_x - min_x
+            height = max_y - min_y
+
+            # Make square
+            if width != height:
+                center_x = min_x + width / 2
+                center_y = min_y + height / 2
+                size = max(width, height)
+                min_x = center_x - size / 2
+                min_y = center_y - size / 2
+                width = height = size
+
+            yield int(min_x), int(min_y), int(width), int(height)
+
+def crop_and_resize_frames(frames, box, target_size=224):
+    x, y, w, h = box
+    cropped = []
+    for frame in frames:
+        crop = frame[y:y+h, x:x+w]
+        if crop.shape[0] != target_size or crop.shape[1] != target_size:
+            crop = cv2.resize(crop, (target_size, target_size))
+        cropped.append(crop)
+    return cropped
+
+if __name__ == "__main__":
+    frame_list = []
+    frame_count = 0
+    last_hits = 0
+    frame_hits = 0
+    start_time = time.time()
+    lap_time = start_time
+
+    print(f"[INFO] Starting consumer for stream: {stream_label}")
+    while True:
+        try:
+            frame = fromRedis(stream_label)
+        except TimeoutError as err:
+            # The reader stopped feeding the queue; shut this consumer down too.
+            print(f"[INFO] {err}")
+            break
+
+        frame_list.append(frame)
+        frame_count += 1
+
+        if len(frame_list) == 16:
+            for box in detect_person_bbox(frame_list[0]):
+                frame_hits += 1
+                cropped_frames = crop_and_resize_frames(frame_list, box)
+                toRedisList(stream_label_queue, cropped_frames)
+
+            frame_list.pop(0)
+
+        if frame_count % 15 == 0:
+            current_hits = frame_hits - last_hits
+            last_hits = frame_hits
+            last_lap = lap_time
+            lap_time = time.time()
+            elapsed = lap_time - last_lap
+            print(f"[INFO] {frame_count} frames, {frame_hits} hits. {current_hits} in {elapsed:.2f} seconds.")
+
+    total_time = time.time() - start_time
+    print(f"[INFO] Finished. Total frames: {frame_count}. Total Hits {frame_hits}.
Total time: {total_time:.2f} seconds.") diff --git a/reader/Dockerfile b/reader/Dockerfile new file mode 100644 index 0000000..1813e85 --- /dev/null +++ b/reader/Dockerfile @@ -0,0 +1,62 @@ +FROM python:3.10-slim + +# Install system dependencies for OpenCV and GStreamer +RUN apt-get update && apt-get install -y \ + build-essential \ + cmake \ + git \ + libgtk2.0-dev \ + pkg-config \ + libavcodec-dev \ + libavformat-dev \ + libswscale-dev \ + libv4l-dev \ + libxvidcore-dev \ + libx264-dev \ + libjpeg-dev \ + libpng-dev \ + libtiff-dev \ + gfortran \ + openexr \ + libatlas-base-dev \ + python3-dev \ + libgstreamer1.0-dev \ + libgstreamer-plugins-base1.0-dev \ + gstreamer1.0-tools \ + gstreamer1.0-plugins-good \ + gstreamer1.0-plugins-bad \ + gstreamer1.0-plugins-ugly \ + gstreamer1.0-libav \ + && rm -rf /var/lib/apt/lists/* + +# Install numpy separately to avoid recompilation during opencv build +RUN pip install numpy + +# Clone OpenCV and OpenCV contrib +WORKDIR /opt +RUN git clone --branch 4.8.0 https://github.com/opencv/opencv.git && \ + git clone --branch 4.8.0 https://github.com/opencv/opencv_contrib.git + +# Build OpenCV with GStreamer support +WORKDIR /opt/opencv/build +RUN cmake -D CMAKE_BUILD_TYPE=Release \ + -D CMAKE_INSTALL_PREFIX=/usr/local \ + -D OPENCV_EXTRA_MODULES_PATH=/opt/opencv_contrib/modules \ + -D WITH_GSTREAMER=ON \ + -D WITH_V4L=ON \ + -D WITH_LIBV4L=ON \ + -D BUILD_opencv_python3=ON \ + -D BUILD_opencv_python2=OFF \ + -D BUILD_EXAMPLES=OFF \ + -D BUILD_TESTS=OFF \ + -D BUILD_PERF_TESTS=OFF \ + -D PYTHON_EXECUTABLE=$(which python3) \ + .. && \ + make -j"$(nproc)" && \ + make install && \ + ldconfig + +RUN pip install redis +COPY reader.py /app/reader.py + +CMD ["python3", "/app/reader.py"] diff --git a/reader/reader.py b/reader/reader.py new file mode 100644 index 0000000..ff75efb --- /dev/null +++ b/reader/reader.py @@ -0,0 +1,111 @@ +import os +import io +import time +import redis +import cv2 +import numpy as np +import struct + + +# Read environment variables +stream_url = os.getenv("stream_url") +stream_label = os.getenv("stream_label", "default_stream") + + +# Connect to Redis +redis_host = os.getenv("redis_host", "localhost") +redis_port = int(os.getenv("redis_port", "6379")) +redis_db = int(os.getenv("redis_db", "0")) +redis_conn = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=False) + +# Clean the Redis queue +redis_conn.delete(stream_label) +print(f"[INFO] Cleared Redis queue for label: {stream_label}") + +def toRedis(queue_label, data): + print(f"Pushed data to queue: {queue_label}") + buffer = io.BytesIO() + np.savez(buffer, data=data) + compressed_data = buffer.getvalue() + return redis_conn.rpush(queue_label, compressed_data) + +def fromRedis(queue_label): + compressed_data = None + while compressed_data == None: + compressed_data = redis_conn.lpop(queue_label) + if compressed_data: + print(f"Popped data from queue: {queue_label}") + buffer = io.BytesIO(compressed_data) + loaded_data = np.load(buffer) + return loaded_data['data'] + else: + time.sleep(1/50.0) + +def toRedisList(queue_label, data_list): + print(f"Pushed data to queue: {queue_label}") + buffer = io.BytesIO() + np.savez(buffer, *data_list) # Use *data_list to unpack the list into arguments for savez_compressed + compressed_data = buffer.getvalue() + redis_conn.rpush(queue_label, compressed_data) + +def fromRedisList(queue_label): + compressed_data = None + while compressed_data == None: + compressed_data = redis_conn.lpop(queue_label) + if compressed_data: + 
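+            # Each queue entry is a whole np.savez archive; NpzFile.files preserves
+            # the order the arrays were written, so no extra sorting is needed here.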
print(f"Popped data from queue: {queue_label}") + buffer = io.BytesIO(compressed_data) + loaded_data = np.load(buffer) + data_list = [loaded_data[key] for key in loaded_data.files] + return data_list + else: + time.sleep(1/50.0) + + +# Main frame processing function +def stream_frames(video_url, label): + if video_url.startswith("rtsp://") or video_url.startswith("rtmp://"): + cap = cv2.VideoCapture(f"rtspsrc location={video_url} latency=0 ! decodebin ! videoconvert ! appsink", cv2.CAP_GSTREAMER) + else: + cap = cv2.VideoCapture(video_url) + + if not cap.isOpened(): + print(f"[ERROR] Could not open stream: {video_url}") + return + + print(f"[INFO] Starting capture from: {video_url}") + frame_count = 0 + start_time = time.time() + lap_time = start_time + + while cap.isOpened(): + ret, frame = cap.read() + if not ret: + print("[INFO] Stream ended or cannot read frame.") + break + + toRedis(label,frame) + frame_count += 1 + time.sleep(1/30.0) + + # Log every 500 frames + if frame_count % 100 == 0: + last_lap = lap_time + lap_time = time.time() + elapsed = lap_time - last_lap + print(f"[INFO] 100 frames processed in {elapsed:.2f} seconds.") + + # if frame_count % 16 and (redis_len := redis_conn.llen(label)) > 75: + # print(f"[WARN] {redis_len} items in queue. Pruning to 50 latest.") + # redis_conn.ltrim(label, -50, -1) + + cap.release() + total_time = time.time() - start_time + print(f"[INFO] Finished. Total frames: {frame_count}. Total time: {total_time:.2f} seconds.") + +# Run on container start +if __name__ == "__main__": + if not stream_url: + print("[ERROR] Missing required environment variable: stream_url") + else: + stream_frames(stream_url, stream_label) diff --git a/transfer.py b/transfer.py new file mode 100644 index 0000000..737a504 --- /dev/null +++ b/transfer.py @@ -0,0 +1,169 @@ +import os +import cv2 +import torch +import numpy as np +import random +import matplotlib.pyplot as plt +from torch import nn +from transformers import AutoModelForVideoClassification, AutoProcessor, TrainingArguments, Trainer +from sklearn.model_selection import train_test_split +from torch.utils.data import Dataset + +dataset_folder = os.getenv("dataset_folder", "/app/dataset_folder") +output_folder = os.getenv("output_folder", "/app/output_folder") +os.makedirs(dataset_folder, exist_ok=True) +os.makedirs(f"{output_folder}/base_model", exist_ok=True) +os.makedirs(f"{output_folder}/tuned", exist_ok=True) + + +model = AutoModelForVideoClassification.from_pretrained(f"{output_folder}/base_model") +processor = AutoProcessor.from_pretrained(f"{output_folder}/base_model") + +for param in model.base_model.parameters(): + param.requires_grad = False + +# Replace classifier with binary output +model.classifier = nn.Linear(model.classifier.in_features, 2) + +class VideoNPZDataset(Dataset): + def __init__(self, files, labels, processor): + self.files = files + self.labels = labels + self.processor = processor + + def __len__(self): + return len(self.files) + + def __getitem__(self, idx): + file = self.files[idx] + label = self.labels[idx] + + + data = np.load(file) + frames = [data[key] for key in sorted(data.files)] + + # Debug print + # for i, frame in enumerate(frames): + # print(f" Frame {i} shape: {frame.shape}, dtype: {frame.dtype}") + + # Convert to RGB (assumes frames are in BGR format) + frames_rgb = [cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) for frame in frames] + + # for i, frame in enumerate(frames_rgb): + # print(f" Frame {i} post-RGB shape: {frame.shape}") + + # Process the video frames + pixel_values 
= self.processor(images=frames_rgb, return_tensors="pt")["pixel_values"][0] # shape: [16, 3, 224, 224] + label_values = torch.tensor(label, dtype=torch.long) + # print(f"[{file}]") + # print(f" Processor output shape (before permute): {pixel_values.shape}") + # print(f" Processor label shape (before permute): {label_values.shape}") + + return { + "pixel_values": pixel_values, # [3, 16, 224, 224] + "labels": label_values # scalar tensor + } + +class myTrainer(Trainer): + def compute_loss(self, model, inputs, return_outputs=False): + print(f"[DEBUG] pixel_values: {inputs['pixel_values'].shape}") + print(f"[DEBUG] labels: {inputs['labels'].shape}") + + return super().compute_loss(model, inputs, return_outputs) + +def collate_fn(batch): + pixel_values = [] + labels = [] + + for item in batch: + video = item["pixel_values"] # shape: (16, 3, 224, 224) + label = item["labels"] + + pixel_values.append(video) + labels.append(label) + + pixel_values = torch.stack(pixel_values) # (batch_size, 3, 16, 224, 224) + labels_values = torch.tensor(labels, dtype=torch.long) # (batch_size, 3, 16, 224, 224) + + return { + "pixel_values": pixel_values, + "labels": labels_values, + } + + +def load_dataset_from_npz(root_dir): + files = [] + labels = [] + label_map = {} # for inference class map + for i, class_name in enumerate(sorted(os.listdir(root_dir))): + label_map[i] = class_name + class_dir = os.path.join(root_dir, class_name) + for file in os.listdir(class_dir): + if file.endswith(".npz"): + files.append(os.path.join(class_dir, file)) + labels.append(i) + return files, labels, label_map + +files, labels, label_map = load_dataset_from_npz(dataset_folder) + +# for file, label in zip(files, labels): +# print(f"{file} {label} {label_map[label]}") + +print(f" files: {len(files)}") +print(f" labels: {len(labels)}") + +import torch +print("CUDA available:", torch.cuda.is_available()) +print("Device count:", torch.cuda.device_count()) +print("Current device:", torch.cuda.current_device()) +print("Device name:", torch.cuda.get_device_name(0)) + +train_files, val_files, train_labels, val_labels = train_test_split(files, labels, test_size=0.2, stratify=labels, random_state=random.randint(1,5000)) + +train_dataset = VideoNPZDataset(train_files, train_labels, processor) +val_dataset = VideoNPZDataset(val_files, val_labels, processor) + +def compute_metrics(eval_pred): + logits, labels = eval_pred + preds = torch.sigmoid(torch.tensor(logits)).numpy() > 0.5 + accuracy = (preds.flatten() == labels).mean() + return {"accuracy": accuracy} + +training_args = TrainingArguments( + output_dir="./results", + evaluation_strategy="epoch", + save_strategy="epoch", + per_device_train_batch_size=2, + per_device_eval_batch_size=2, + num_train_epochs=10, + logging_dir="./logs", + logging_steps=10, + save_total_limit=2, + load_best_model_at_end=True, +) + +trainer = Trainer( + model=model, + args=training_args, + train_dataset=train_dataset, + eval_dataset=val_dataset, + data_collator=collate_fn, +) + + +trainer.train() + +logs = trainer.state.log_history +train_loss = [x["loss"] for x in logs if "loss" in x] +eval_loss = [x["eval_loss"] for x in logs if "eval_loss" in x] + +plt.plot(train_loss, label="Train Loss") +plt.plot(eval_loss, label="Eval Loss") +plt.xlabel("Log Steps") +plt.ylabel("Loss") +plt.legend() +plt.title("Loss Curve") +plt.savefig(f"{output_folder}/tuned/loss_curve.png") + +trainer.save_model(f"{output_folder}/tuned") +processor.save_pretrained(f"{output_folder}/tuned") diff --git a/transfer/Dockerfile 
b/transfer/Dockerfile new file mode 100644 index 0000000..2c01b03 --- /dev/null +++ b/transfer/Dockerfile @@ -0,0 +1,34 @@ +# Use an official PyTorch image with CUDA support (optional) +FROM pytorch/pytorch:2.2.0-cuda11.8-cudnn8-runtime + +# Set working directory +WORKDIR /app + +RUN apt-get update && apt-get install -y \ + git \ + ffmpeg \ + libgl1 \ + libglib2.0-0 \ + && rm -rf /var/lib/apt/lists/* + + +# Use the Conda pip path explicitly +RUN /opt/conda/bin/pip install --upgrade pip && \ + /opt/conda/bin/pip install --force-reinstall --no-cache-dir "numpy<2.0" \ + transformers==4.39.0 \ + accelerate==0.30.0 \ + torch==2.2.0 \ + torchvision==0.17.0 \ + datasets \ + matplotlib \ + scikit-learn \ + mediapy \ + pillow + +RUN /opt/conda/bin/conda install -y -c conda-forge opencv + +# Copy project files into container +COPY transfer.py /app/transfer.py + +# Default command (you can override with docker run) +CMD ["python", "/app/transfer.py"] diff --git a/transfer/transfer.py b/transfer/transfer.py new file mode 100644 index 0000000..ec43e67 --- /dev/null +++ b/transfer/transfer.py @@ -0,0 +1,179 @@ +import os +import cv2 +import torch +import numpy as np +import random +import matplotlib.pyplot as plt +from torch import nn +from transformers import AutoModelForVideoClassification, AutoProcessor, TrainingArguments, Trainer +from sklearn.model_selection import train_test_split +from torch.utils.data import Dataset +from PIL import Image + +dataset_folder = os.getenv("dataset_folder", "/app/dataset_folder") +output_folder = os.getenv("output_folder", "/app/output_folder") +os.makedirs(dataset_folder, exist_ok=True) +os.makedirs(f"{output_folder}/base_model", exist_ok=True) +os.makedirs(f"{output_folder}/tuned", exist_ok=True) + + +model = AutoModelForVideoClassification.from_pretrained(f"{output_folder}/base_model") +processor = AutoProcessor.from_pretrained(f"{output_folder}/base_model") + +# for param in model.base_model.parameters(): +# param.requires_grad = False + +# Replace classifier with binary output +model.classifier = nn.Linear(model.classifier.in_features, 2) + +class VideoNPZDataset(Dataset): + def __init__(self, files, labels, processor): + self.files = files + self.labels = labels + self.processor = processor + + def __len__(self): + return len(self.files) + + def __getitem__(self, idx): + file = self.files[idx] + label = self.labels[idx] + + + data = np.load(file) + frames = [data[key] for key in sorted(data.files)] + + # Debug print + # for i, frame in enumerate(frames): + # print(f" Frame {i} shape: {frame.shape}, dtype: {frame.dtype}") + + # Convert to RGB (assumes frames are in BGR format) + frames_rgb = [Image.fromarray(frame[:, :, ::-1]) for frame in frames] + + # for i, frame in enumerate(frames_rgb): + # print(f" Frame {i} post-RGB shape: {frame.shape}") + + # Process the video frames + pixel_values = self.processor(images=frames_rgb, return_tensors="pt")["pixel_values"][0] # shape: [16, 3, 224, 224] + label_values = torch.tensor(label, dtype=torch.long) + # print(f"[{file}]") + # print(f" Processor output shape (before permute): {pixel_values.shape}") + # print(f" Processor label shape (before permute): {label_values.shape}") + + return { + "pixel_values": pixel_values, # [3, 16, 224, 224] + "labels": label_values # scalar tensor + } + +class myTrainer(Trainer): + def compute_loss(self, model, inputs, return_outputs=False): + print(f"[DEBUG] pixel_values: {inputs['pixel_values'].shape}") + print(f"[DEBUG] labels: {inputs['labels'].shape}") + + return 
super().compute_loss(model, inputs, return_outputs) + +def collate_fn(batch): + pixel_values = [] + labels = [] + + for item in batch: + video = item["pixel_values"] # shape: (16, 3, 224, 224) + label = item["labels"] + + pixel_values.append(video) + labels.append(label) + + pixel_values = torch.stack(pixel_values) # (batch_size, 3, 16, 224, 224) + labels_values = torch.tensor(labels, dtype=torch.long) # (batch_size, 3, 16, 224, 224) + + return { + "pixel_values": pixel_values, + "labels": labels_values, + } + + +def load_dataset_from_npz(root_dir): + files = [] + labels = [] + groups = [] + label_map = {} # for inference class map + group_map = {} + group_counter = 0 + for i, class_name in enumerate(sorted(os.listdir(root_dir))): + label_map[i] = class_name + class_dir = os.path.join(root_dir, class_name) + for file in os.listdir(class_dir): + if file.endswith(".npz"): + group_id = f"{file.split('_')[0]}_{i}" + if group_id not in group_map: + group_counter += 1 + group_map[group_id] = group_counter + + groups.append(group_map[group_id]) + files.append(os.path.join(class_dir, file)) + labels.append(i) + return files, labels, label_map, groups + +files, labels, label_map, groups = load_dataset_from_npz(dataset_folder) + +# for file, label in zip(files, labels): +# print(f"{file} {label} {label_map[label]}") + +print(f" files: {len(files)}") +print(f" labels: {len(labels)}") + +# import torch +# print("CUDA available:", torch.cuda.is_available()) +# print("Device count:", torch.cuda.device_count()) +# print("Current device:", torch.cuda.current_device()) +# print("Device name:", torch.cuda.get_device_name(0)) + +train_files, val_files, train_labels, val_labels = train_test_split(files, labels, test_size=0.2, stratify=groups, random_state=random.randint(1,5000)) + +train_dataset = VideoNPZDataset(train_files, train_labels, processor) +val_dataset = VideoNPZDataset(val_files, val_labels, processor) + +def compute_metrics(eval_pred): + logits, labels = eval_pred + preds = torch.sigmoid(torch.tensor(logits)).numpy() > 0.5 + accuracy = (preds.flatten() == labels).mean() + return {"accuracy": accuracy} + +training_args = TrainingArguments( + output_dir="./results", + evaluation_strategy="epoch", + save_strategy="epoch", + per_device_train_batch_size=8, + per_device_eval_batch_size=8, + num_train_epochs=3, + logging_dir="./logs", + logging_steps=2, + save_total_limit=2, + load_best_model_at_end=True, +) + +trainer = Trainer( + model=model, + args=training_args, + train_dataset=train_dataset, + eval_dataset=val_dataset, + data_collator=collate_fn, +) + + +trainer.train() + +logs = trainer.state.log_history +train_loss = [x["loss"] for x in logs if "loss" in x] +eval_loss = [x["eval_loss"] for x in logs if "eval_loss" in x] + +plt.plot(train_loss, label="Train Loss") +plt.plot(eval_loss, label="Eval Loss") +plt.xlabel("Log Steps") +plt.ylabel("Loss") +plt.legend() +plt.title("Loss Curve") +plt.savefig(f"{output_folder}/tuned/loss_curve.png") + +trainer.save_model(f"{output_folder}/tuned") +processor.save_pretrained(f"{output_folder}/tuned")
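
Note on bootstrapping the checkpoint: transfer.py fine-tunes a model it expects under {output_folder}/base_model (mounted as ./models/base_model by docker-compose.trainer.yml), and evaluate.py reads the same layout, but nothing in this change creates that folder. A minimal sketch for seeding it, assuming a 16-frame video classifier from the Hugging Face Hub such as MCG-NJU/videomae-base-finetuned-kinetics — the checkpoint name and the seed_base_model.py filename are assumptions, not part of this change:

# seed_base_model.py - one-off helper, run once from the repo root before training
from transformers import AutoModelForVideoClassification, AutoProcessor

checkpoint = "MCG-NJU/videomae-base-finetuned-kinetics"  # assumed; any 16-frame AutoModelForVideoClassification checkpoint should work
model = AutoModelForVideoClassification.from_pretrained(checkpoint)
processor = AutoProcessor.from_pretrained(checkpoint)

# transfer.py replaces the classifier head with a 2-class nn.Linear,
# so the original label count of the checkpoint does not matter here.
model.save_pretrained("models/base_model")
processor.save_pretrained("models/base_model")

After running it once (python seed_base_model.py), ./models/base_model is visible to the trainer and evaluator containers through their ./models:/app/output_folder volume mounts.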