Compare commits

...

1 Commits

Author SHA1 Message Date
ifiguero 169f92c3fb train_secuence 2025-07-21 01:08:48 -04:00
18 changed files with 1313 additions and 0 deletions

4 .gitignore vendored 100644
View File

@@ -0,0 +1,4 @@
out_folder/*
models/*
*zst
*png

View File

@@ -0,0 +1,35 @@
# Use an official PyTorch image with CUDA support (optional)
FROM pytorch/pytorch:2.2.0-cuda11.8-cudnn8-runtime
# Set working directory
WORKDIR /app
RUN apt-get update && apt-get install -y \
git \
ffmpeg \
libgl1 \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
# Use the Conda pip path explicitly
RUN /opt/conda/bin/pip install --upgrade pip && \
/opt/conda/bin/pip install --force-reinstall --no-cache-dir "numpy<2.0" \
transformers==4.39.0 \
accelerate==0.30.0 \
torch==2.2.0 \
torchvision==0.17.0 \
datasets \
matplotlib \
scikit-learn \
mediapy \
pillow
RUN /opt/conda/bin/conda install -y -c conda-forge opencv
RUN /opt/conda/bin/pip install --force-reinstall --no-cache-dir tqdm
# Copy project files into container
COPY evaluate.py /app/evaluate.py
# Default command (you can override with docker run)
CMD ["python", "/app/evaluate.py"]

View File

@@ -0,0 +1,126 @@
import os
import io
import time
import redis
import sqlite3
import numpy as np
from PIL import Image
from datetime import datetime
import torch
from transformers import AutoProcessor, AutoModelForVideoClassification
# -----------------------------
# ENVIRONMENT CONFIG
# -----------------------------
redis_host = os.getenv("redis_host", "localhost")
redis_port = int(os.getenv("redis_port", 6379))
stream_label = os.getenv("stream_label", "cropped_stream")
output_folder = os.getenv("out_folder", "/app/out_folder")
model_folder = os.getenv("model_folder", "base_model")
model_path = os.path.join(output_folder, model_folder)
threshold = float(os.getenv("threshold", "0.5"))
queue_label = f"{stream_label}_cubes"
sqlite_path = os.path.join(output_folder, f"{stream_label}_results.db")
stream_folder = os.path.join(output_folder, stream_label)
os.makedirs(output_folder, exist_ok=True)
os.makedirs(stream_folder, exist_ok=True)
# -----------------------------
# CONNECT TO REDIS
# -----------------------------
redis_conn = redis.Redis(host=redis_host, port=redis_port, db=0, decode_responses=False)
# -----------------------------
# LOAD MODEL + PROCESSOR
# -----------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForVideoClassification.from_pretrained(model_path).to(device)
processor = AutoProcessor.from_pretrained(model_path)
model.eval()
# Warm up
with torch.no_grad():
    dummy = torch.randn(1, 16, 3, 224, 224).to(device)  # (batch, num_frames, channels, H, W), matching the processor output
_ = model(pixel_values=dummy)
# -----------------------------
# SETUP SQLITE
# -----------------------------
conn = sqlite3.connect(sqlite_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS inference_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gif_name TEXT,
timestamp TEXT,
prediction INTEGER
)
""")
conn.commit()
# -----------------------------
# REDIS UTILS
# -----------------------------
def from_redis_list(queue_label):
retry = 0
while True:
compressed_data = redis_conn.lpop(queue_label)
if compressed_data:
buffer = io.BytesIO(compressed_data)
loaded_data = np.load(buffer)
            frames = [loaded_data[key] for key in loaded_data.files]  # .files keeps arr_0..arr_15 in save order; sorted() would put arr_10 before arr_2
return frames
else:
retry += 1
if retry % 50 == 0:
print(f"[WAIT] Queue {queue_label} empty for {retry/50:.1f} seconds...")
time.sleep(1/50.0)
if retry > 2000:
raise TimeoutError(f"Queue {queue_label} empty for over 40s")
# -----------------------------
# SAVE GIF
# -----------------------------
def save_gif(frames, gif_path):
images = [Image.fromarray(frame.astype(np.uint8)) for frame in frames]
images[0].save(gif_path, save_all=True, append_images=images[1:], duration=50, loop=0)
# -----------------------------
# MAIN LOOP
# -----------------------------
print(f"[INFO] Listening on Redis queue: {queue_label}")
while True:
try:
frames = from_redis_list(queue_label)
if not all(frame.shape == (224, 224, 3) for frame in frames):
print("[WARN] Skipped frame batch due to incorrect shape")
continue
timestamp = datetime.utcnow().strftime("%y%m%d_%H%M%S_%f")[:-3]
        gif_filename = f"{stream_label}/{stream_label}_{timestamp}.gif"
        gif_path = os.path.join(output_folder, gif_filename)  # gif_filename already includes the stream_label subfolder
# Save GIF
save_gif(frames, gif_path)
# Preprocess and predict
with torch.no_grad():
inputs = processor(images=[frame[:, :, ::-1] for frame in frames], return_tensors="pt") # convert BGR to RGB
            pixel_values = inputs["pixel_values"].to(device)  # [1, 16, 3, 224, 224]
logits = model(pixel_values=pixel_values).logits
prob = torch.softmax(logits, dim=-1)[0][1].item()
prediction = int(prob > threshold)
# Insert into SQLite
cursor.execute("INSERT INTO inference_results (gif_name, timestamp, prediction) VALUES (?, ?, ?)",
(gif_filename, timestamp, prediction))
conn.commit()
print(f"[INFO] Saved {gif_filename} | Class={prediction} | Prob={prob:.3f}")
except Exception as e:
print(f"[ERROR] {e}")
time.sleep(1)
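A minimal sketch of reading the results back out of the SQLite table created above, assuming the same inference_results schema (the helper name is illustrative, not part of the commit):

import sqlite3

def recent_predictions(sqlite_path, limit=10):
    # Query the newest rows written by the evaluate loop.
    conn = sqlite3.connect(sqlite_path)
    try:
        cursor = conn.execute(
            "SELECT gif_name, timestamp, prediction FROM inference_results "
            "ORDER BY id DESC LIMIT ?",
            (limit,),
        )
        return cursor.fetchall()
    finally:
        conn.close()

# e.g. recent_predictions("/app/out_folder/cropped_stream_results.db")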

View File

@@ -0,0 +1,7 @@
FROM python:3.10-slim
RUN pip install numpy redis pillow
COPY dataset.py /app/dataset.py
CMD ["python3", "/app/dataset.py"]

59 dataset/dataset.py 100644
View File

@@ -0,0 +1,59 @@
import os
import time
import redis
import io
import numpy as np
from PIL import Image
# Connect to Redis
redis_host = os.getenv("redis_host", "localhost")
redis_port = int(os.getenv("redis_port", 6379))
redis_conn = redis.Redis(host=redis_host, port=redis_port, db=0, decode_responses=False)
# Environment variables
stream_label = os.getenv("stream_label", "cropped_stream")
out_folder = os.getenv("out_folder", "/app/out_folder")
stream_label_queue = f"{stream_label}_cubes"
# Ensure output folder exists
os.makedirs(out_folder, exist_ok=True)
def fromRedisList(queue_label):
compressed_data = None
retry = 0
    while compressed_data is None:
compressed_data = redis_conn.lpop(queue_label)
if compressed_data:
retry = 0
print(f"Popped data from queue: {queue_label}")
buffer = io.BytesIO(compressed_data)
loaded_data = np.load(buffer)
data_list = [loaded_data[key] for key in loaded_data.files]
return data_list
else:
retry += 1
if retry % 50 == 0:
print(f"Queue {queue_label} empty for {retry/50} seconds")
time.sleep(1/50.0)
            if retry > 1000:
                raise TimeoutError(f"Queue {queue_label} empty for over 20s")
def save_frames_as_gif(frames, output_filename, duration=1):
images = [Image.fromarray(frame[:, :, ::-1]) for frame in frames]
images[0].save(output_filename, save_all=True, append_images=images[1:], duration=duration, loop=0)
if __name__ == "__main__":
counter = 0
print(f"[INFO] Starting GIF writer for queue: {stream_label}")
while True:
frames = fromRedisList(stream_label_queue)
        if frames:
            npz_path = os.path.join(out_folder, f"{stream_label}_{counter:05}.npz")
            filename = os.path.join(out_folder, f"{stream_label}_{counter:05}.gif")
            save_frames_as_gif(frames, filename)
            np.savez_compressed(npz_path, *frames)  # unpack the frame list as positional arrays
            print(f"[INFO] Saved as npz: {npz_path}")
            counter += 1
else:
time.sleep(0.1)
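The queue payload used throughout the pipeline is a plain np.savez archive pushed onto a Redis list; a self-contained round-trip sketch of that format, assuming nothing beyond what toRedisList/fromRedisList do:

import io
import numpy as np

def encode_frames(frames):
    # Serialize a list of arrays into one npz blob, as toRedisList does.
    buffer = io.BytesIO()
    np.savez(buffer, *frames)
    return buffer.getvalue()

def decode_frames(blob):
    # Mirror fromRedisList: .files preserves the arr_0..arr_N save order.
    archive = np.load(io.BytesIO(blob))
    return [archive[key] for key in archive.files]

frames = [np.full((224, 224, 3), i, dtype=np.uint8) for i in range(16)]
restored = decode_frames(encode_frames(frames))
assert all((a == b).all() for a, b in zip(frames, restored))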

View File

@@ -0,0 +1,21 @@
services:
evaluate:
build:
dockerfile: Dockerfile
context: evaluate
container_name: evaluate
# deploy:
# resources:
# reservations:
# devices:
# - driver: nvidia
# count: all
# capabilities: [gpu]
environment:
model_folder: "tuned"
dataset_folder: "/app/dataset_folder"
output_folder: "/app/output_folder"
volumes:
- ./out_folder:/app/dataset_folder
- ./models:/app/output_folder

View File

@@ -0,0 +1,20 @@
services:
transfer:
build:
dockerfile: Dockerfile
context: transfer
container_name: transfer
# deploy:
# resources:
# reservations:
# devices:
# - driver: nvidia
# count: all
# capabilities: [gpu]
environment:
dataset_folder: "/app/dataset_folder"
output_folder: "/app/output_folder"
volumes:
- ./out_folder:/app/dataset_folder
- ./models:/app/output_folder

58 docker-compose.yml 100644
View File

@@ -0,0 +1,58 @@
services:
redis:
image: redis:7
container_name: redis_server
ports:
- "6379:6379"
stream_reader:
build:
dockerfile: Dockerfile
context: reader
container_name: stream_reader
depends_on:
- redis
environment:
# stream_url: "rtsp://admin:labvision2019@10.1.8.182:554"
stream_url: "/videos/video.mp4"
stream_label: "pelea3"
redis_host: "redis"
redis_port: "6379"
redis_db: "0"
restart: unless-stopped
volumes:
- /home/ifiguero/DIA/dia3/DIA0205 - VISIÓN COMPUTACIONAL APLICADA/Profesor Cristian Aguilera/dataset/pelea3.mp4:/videos/video.mp4:ro
stream_preprocess:
build:
dockerfile: Dockerfile
context: preprocess
container_name: stream_preprocess
depends_on:
- redis
- stream_reader
environment:
stream_label: "pelea3"
redis_host: "redis"
redis_port: "6379"
redis_db: "0"
restart: unless-stopped
dataset_builder:
build:
dockerfile: Dockerfile
context: dataset
container_name: dataset_builder
depends_on:
- redis
- stream_preprocess
environment:
out_folder: "/app/out_folder"
stream_label: "pelea3"
redis_host: "redis"
redis_port: "6379"
redis_db: "0"
restart: unless-stopped
volumes:
- ./out_folder:/app/out_folder

View File

@@ -0,0 +1,35 @@
# Use an official PyTorch image with CUDA support (optional)
FROM pytorch/pytorch:2.2.0-cuda11.8-cudnn8-runtime
# Set working directory
WORKDIR /app
RUN apt-get update && apt-get install -y \
git \
ffmpeg \
libgl1 \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
# Use the Conda pip path explicitly
RUN /opt/conda/bin/pip install --upgrade pip && \
/opt/conda/bin/pip install --force-reinstall --no-cache-dir "numpy<2.0" \
transformers==4.39.0 \
accelerate==0.30.0 \
torch==2.2.0 \
torchvision==0.17.0 \
datasets \
matplotlib \
scikit-learn \
mediapy \
pillow
RUN /opt/conda/bin/conda install -y -c conda-forge opencv
RUN /opt/conda/bin/pip install --force-reinstall --no-cache-dir tqdm
# Copy project files into container
COPY evaluate.py /app/evaluate.py
# Default command (you can override with docker run)
CMD ["python", "/app/evaluate.py"]

View File

@@ -0,0 +1,125 @@
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import cv2
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, confusion_matrix
from transformers import AutoProcessor, AutoModelForVideoClassification
from tqdm import tqdm
from PIL import Image
# -----------------------------
# Paths
# -----------------------------
dataset_folder = os.getenv("dataset_folder", "/app/dataset_folder")
output_folder = os.getenv("output_folder", "/app/output_folder")
model_folder = os.getenv("model_folder", "base_model")
model_path = f"{output_folder}/{model_folder}"
# -----------------------------
# Load Model + Processor
# -----------------------------
model = AutoModelForVideoClassification.from_pretrained(model_path)
processor = AutoProcessor.from_pretrained(model_path)
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# -----------------------------
# Dataset class
# -----------------------------
class VideoNPZDataset(Dataset):
def __init__(self, files, labels, processor):
self.files = files
self.labels = labels
self.processor = processor
def __len__(self):
return len(self.files)
def __getitem__(self, idx):
file = self.files[idx]
label = self.labels[idx]
data = np.load(file)
        frames = [data[key] for key in data.files]  # .files keeps arr_0..arr_15 in save order; sorted() would put arr_10 before arr_2
frames_rgb = [Image.fromarray(frame[:, :, ::-1]) for frame in frames]
# frames_rgb = [cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) for frame in frames]
pixel_values = self.processor(images=frames_rgb, return_tensors="pt")["pixel_values"][0]
label_tensor = torch.tensor(label, dtype=torch.long)
return {
"pixel_values": pixel_values,
"labels": label_tensor,
}
# -----------------------------
# Collate Function
# -----------------------------
def collate_fn(batch):
pixel_values = [item["pixel_values"] for item in batch]
labels = [item["labels"] for item in batch]
    pixel_values = torch.stack(pixel_values)  # [B, 16, 3, 224, 224]
labels = torch.tensor(labels, dtype=torch.long) # [B]
return {
"pixel_values": pixel_values,
"labels": labels,
}
# -----------------------------
# Load Dataset
# -----------------------------
def load_dataset_from_npz(root_dir):
files = []
labels = []
label_map = {}
for i, class_name in enumerate(sorted(os.listdir(root_dir))):
label_map[i] = class_name
class_dir = os.path.join(root_dir, class_name)
for file in os.listdir(class_dir):
if file.endswith(".npz"):
files.append(os.path.join(class_dir, file))
labels.append(i)
return files, labels, label_map
files, labels, label_map = load_dataset_from_npz(dataset_folder)
dataset = VideoNPZDataset(files, labels, processor)
dataloader = DataLoader(dataset, batch_size=2, shuffle=False, collate_fn=collate_fn)
# -----------------------------
# Evaluation Loop
# -----------------------------
all_preds = []
all_labels = []
all_probs = []
with torch.no_grad():
for batch in tqdm(dataloader, desc="Evaluating"):
inputs = batch["pixel_values"].to(device)
labels = batch["labels"].to(device)
outputs = model(pixel_values=inputs)
logits = outputs.logits
probs = torch.softmax(logits, dim=-1)
preds = torch.argmax(probs, dim=-1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
all_probs.extend(probs[:, 1].cpu().numpy()) # Class 1 probabilities
# -----------------------------
# Metrics
# -----------------------------
accuracy = accuracy_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds, average="binary")
roc_auc = roc_auc_score(all_labels, all_probs)
conf_matrix = confusion_matrix(all_labels, all_preds)
print("\n=== Evaluation Metrics ===")
print(f"Accuracy : {accuracy:.4f}")
print(f"F1 Score : {f1:.4f}")
print(f"ROC AUC : {roc_auc:.4f}")
print(f"Confusion Matrix:\n{conf_matrix}")

View File

@@ -0,0 +1,63 @@
FROM python:3.10-slim
# Install system dependencies for OpenCV and GStreamer
RUN apt-get update && apt-get install -y \
build-essential \
cmake \
git \
libgtk2.0-dev \
pkg-config \
libavcodec-dev \
libavformat-dev \
libswscale-dev \
libv4l-dev \
libxvidcore-dev \
libx264-dev \
libjpeg-dev \
libpng-dev \
libtiff-dev \
gfortran \
openexr \
libatlas-base-dev \
python3-dev \
libgstreamer1.0-dev \
libgstreamer-plugins-base1.0-dev \
gstreamer1.0-tools \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly \
gstreamer1.0-libav \
&& rm -rf /var/lib/apt/lists/*
# Install numpy separately to avoid recompilation during opencv build
RUN pip install numpy
# Clone OpenCV and OpenCV contrib
WORKDIR /opt
RUN git clone --branch 4.8.0 https://github.com/opencv/opencv.git && \
git clone --branch 4.8.0 https://github.com/opencv/opencv_contrib.git
# Build OpenCV with GStreamer support
WORKDIR /opt/opencv/build
RUN cmake -D CMAKE_BUILD_TYPE=Release \
-D CMAKE_INSTALL_PREFIX=/usr/local \
-D OPENCV_EXTRA_MODULES_PATH=/opt/opencv_contrib/modules \
-D WITH_GSTREAMER=ON \
-D WITH_V4L=ON \
-D WITH_LIBV4L=ON \
-D BUILD_opencv_python3=ON \
-D BUILD_opencv_python2=OFF \
-D BUILD_EXAMPLES=OFF \
-D BUILD_TESTS=OFF \
-D BUILD_PERF_TESTS=OFF \
-D PYTHON_EXECUTABLE=$(which python3) \
.. && \
make -j"$(nproc)" && \
make install && \
ldconfig
RUN pip install redis mediapipe==0.10.9
COPY preprocess.py /app/preprocess.py
COPY efficientdet_lite2.tflite /app/efficientdet_lite2.tflite
CMD ["python3", "/app/preprocess.py"]

Binary file not shown.

View File

@@ -0,0 +1,205 @@
import os
import io
import time
import redis
import cv2
import numpy as np
import struct
import mediapipe as mp
# Environment variables
stream_label = os.getenv("stream_label", "default_stream")
stream_label_queue = f"{stream_label}_cubes"
redis_host = os.getenv("redis_host", "localhost")
redis_port = int(os.getenv("redis_port", "6379"))
# Connect to Redis
redis_conn = redis.Redis(host=redis_host, port=redis_port, db=0, decode_responses=False)
redis_conn.delete(stream_label_queue)
def toRedis(queue_label, data):
print(f"Pushed data to queue: {queue_label}")
buffer = io.BytesIO()
np.savez(buffer, data=data)
compressed_data = buffer.getvalue()
return redis_conn.rpush(queue_label, compressed_data)
def fromRedis(queue_label):
compressed_data = None
retry = 0
    while compressed_data is None:
compressed_data = redis_conn.lpop(queue_label)
if compressed_data:
retry = 0
print(f"Popped data from queue: {queue_label}")
buffer = io.BytesIO(compressed_data)
loaded_data = np.load(buffer)
return loaded_data['data']
else:
retry += 1
if retry % 50 == 0:
print(f"Queue {queue_label} empty for {retry/50} seconds")
time.sleep(1/50.0)
            if retry > 1000:
                raise TimeoutError(f"Queue {queue_label} empty for over 20s")
def toRedisList(queue_label, data_list):
print(f"Pushed data to queue: {queue_label}")
buffer = io.BytesIO()
np.savez(buffer, *data_list) # Use *data_list to unpack the list into arguments for savez_compressed
compressed_data = buffer.getvalue()
redis_conn.rpush(queue_label, compressed_data)
def fromRedisList(queue_label):
compressed_data = None
retry = 0
    while compressed_data is None:
compressed_data = redis_conn.lpop(queue_label)
if compressed_data:
retry = 0
print(f"Popped data from queue: {queue_label}")
buffer = io.BytesIO(compressed_data)
loaded_data = np.load(buffer)
data_list = [loaded_data[key] for key in loaded_data.files]
return data_list
else:
retry += 1
if retry % 50 == 0:
print(f"Queue {queue_label} empty for {retry/50} seconds")
time.sleep(1/50.0)
            if retry > 1000:
                raise TimeoutError(f"Queue {queue_label} empty for over 20s")
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
options = vision.ObjectDetectorOptions(
base_options=python.BaseOptions(model_asset_path='/app/efficientdet_lite2.tflite'), # You might need to download a model
category_allowlist=['person'],
score_threshold=0.5
)
# Create ObjectDetector
detector = vision.ObjectDetector.create_from_options(options)
def detect_person_bbox(image_frame: np.ndarray):  # generator: yields square (left, top, width, height) boxes
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image_frame)
detection_result = detector.detect(mp_image)
# Extract bounding boxes for detected persons
if detection_result and detection_result.detections:
        h, w, _ = image_frame.shape
for detection in detection_result.detections:
if 'person' in [category.category_name for category in detection.categories]:
bbox = detection.bounding_box
if bbox.width != bbox.height:
center_x = bbox.origin_x + bbox.width / 2
center_y = bbox.origin_y + bbox.height / 2
size = max(bbox.width, bbox.height)
left = center_x - size / 2
top = center_y - size / 2
width = height = size
                else:
                    left, top = bbox.origin_x, bbox.origin_y
                    width, height = bbox.width, bbox.height
padding = int(max(10, width//10))
width += 2 * padding
height += 2 * padding
left = max(0, left-padding)
top = max(0, top-padding)
if (right := (left+width) ) > w:
left -= right - w
if (bottom := (top+height) ) > h:
top -= bottom - h
yield int(left), int(top), int(width), int(height)
def get_enclosing_box(frame):
mp_pose = mp.solutions.pose
with mp_pose.Pose(min_detection_confidence=0.25, min_tracking_confidence=0) as pose:
results = pose.process(frame)
h, w, _ = frame.shape
if results.pose_landmarks:
x_coords = [landmark.x * w for landmark in results.pose_landmarks.landmark]
y_coords = [landmark.y * h for landmark in results.pose_landmarks.landmark]
min_x, max_x = min(x_coords), max(x_coords)
min_y, max_y = min(y_coords), max(y_coords)
padding = 10
min_x = max(0, min_x - padding)
min_y = max(0, min_y - padding)
max_x = min(w, max_x + padding)
max_y = min(h, max_y + padding)
width = max_x - min_x
height = max_y - min_y
# Make square
if width != height:
center_x = min_x + width / 2
center_y = min_y + height / 2
size = max(width, height)
min_x = center_x - size / 2
min_y = center_y - size / 2
width = height = size
yield int(min_x), int(min_y), int(width), int(height)
# return None
def crop_and_resize_frames(frames, box, target_size=224):
x, y, w, h = box
cropped = []
for frame in frames:
crop = frame[y:y+h, x:x+w]
if crop.shape[0] != target_size or crop.shape[1] != target_size:
crop = cv2.resize(crop, (target_size, target_size))
cropped.append(crop)
return cropped
if __name__ == "__main__":
frame_list = []
frame_count = 0
last_hits = 0
frame_hits = 0
frame_lag = 0
start_time = time.time()
lap_time = start_time
print(f"[INFO] Starting consumer for stream: {stream_label}")
while frame_lag < 100:
frame = fromRedis(stream_label)
if frame is None:
frame_lag += 1
wait_time = frame_lag * 0.01
time.sleep(wait_time)
continue
frame_lag = 0
frame_list.append(frame)
frame_count += 1
if len(frame_list) == 16:
for box in detect_person_bbox(frame_list[0]):
frame_hits += 1
cropped_frames = crop_and_resize_frames(frame_list, box)
toRedisList(stream_label_queue, cropped_frames)
frame_list.pop(0)
if frame_count % 15 == 0:
current_hits = frame_hits - last_hits
last_hits = frame_hits
last_lap = lap_time
lap_time = time.time()
elapsed = lap_time - last_lap
print(f"[INFO] {frame_count} frames, {frame_hits} hits. {current_hits} in {elapsed:.2f} seconds.")
total_time = time.time() - start_time
print(f"[INFO] Finished. Total frames: {frame_count}. Total Hits {frame_hits}. Total time: {total_time:.2f} seconds.")
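The box-squaring geometry in detect_person_bbox is easy to verify in isolation; a pure-function sketch of the same steps (hypothetical helper, same padding and clamping rules):

def square_box(origin_x, origin_y, width, height, frame_w, frame_h):
    # Grow the detection to a padded square, then shift it back inside the frame.
    size = max(width, height)
    left = origin_x + width / 2 - size / 2
    top = origin_y + height / 2 - size / 2
    padding = max(10, size // 10)
    size += 2 * padding
    left = max(0, left - padding)
    top = max(0, top - padding)
    left -= max(0, left + size - frame_w)   # clamp right edge
    top -= max(0, top + size - frame_h)     # clamp bottom edge
    return int(left), int(top), int(size), int(size)

# A 100x200 person near the right edge of a 640x480 frame:
print(square_box(500, 100, 100, 200, 640, 480))  # -> (400, 80, 240, 240)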

62 reader/Dockerfile 100644
View File

@@ -0,0 +1,62 @@
FROM python:3.10-slim
# Install system dependencies for OpenCV and GStreamer
RUN apt-get update && apt-get install -y \
build-essential \
cmake \
git \
libgtk2.0-dev \
pkg-config \
libavcodec-dev \
libavformat-dev \
libswscale-dev \
libv4l-dev \
libxvidcore-dev \
libx264-dev \
libjpeg-dev \
libpng-dev \
libtiff-dev \
gfortran \
openexr \
libatlas-base-dev \
python3-dev \
libgstreamer1.0-dev \
libgstreamer-plugins-base1.0-dev \
gstreamer1.0-tools \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly \
gstreamer1.0-libav \
&& rm -rf /var/lib/apt/lists/*
# Install numpy separately to avoid recompilation during opencv build
RUN pip install numpy
# Clone OpenCV and OpenCV contrib
WORKDIR /opt
RUN git clone --branch 4.8.0 https://github.com/opencv/opencv.git && \
git clone --branch 4.8.0 https://github.com/opencv/opencv_contrib.git
# Build OpenCV with GStreamer support
WORKDIR /opt/opencv/build
RUN cmake -D CMAKE_BUILD_TYPE=Release \
-D CMAKE_INSTALL_PREFIX=/usr/local \
-D OPENCV_EXTRA_MODULES_PATH=/opt/opencv_contrib/modules \
-D WITH_GSTREAMER=ON \
-D WITH_V4L=ON \
-D WITH_LIBV4L=ON \
-D BUILD_opencv_python3=ON \
-D BUILD_opencv_python2=OFF \
-D BUILD_EXAMPLES=OFF \
-D BUILD_TESTS=OFF \
-D BUILD_PERF_TESTS=OFF \
-D PYTHON_EXECUTABLE=$(which python3) \
.. && \
make -j"$(nproc)" && \
make install && \
ldconfig
RUN pip install redis
COPY reader.py /app/reader.py
CMD ["python3", "/app/reader.py"]

111
reader/reader.py 100644
View File

@@ -0,0 +1,111 @@
import os
import io
import time
import redis
import cv2
import numpy as np
import struct
# Read environment variables
stream_url = os.getenv("stream_url")
stream_label = os.getenv("stream_label", "default_stream")
# Connect to Redis
redis_host = os.getenv("redis_host", "localhost")
redis_port = int(os.getenv("redis_port", "6379"))
redis_db = int(os.getenv("redis_db", "0"))
redis_conn = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=False)
# Clean the Redis queue
redis_conn.delete(stream_label)
print(f"[INFO] Cleared Redis queue for label: {stream_label}")
def toRedis(queue_label, data):
print(f"Pushed data to queue: {queue_label}")
buffer = io.BytesIO()
np.savez(buffer, data=data)
compressed_data = buffer.getvalue()
return redis_conn.rpush(queue_label, compressed_data)
def fromRedis(queue_label):
compressed_data = None
    while compressed_data is None:
compressed_data = redis_conn.lpop(queue_label)
if compressed_data:
print(f"Popped data from queue: {queue_label}")
buffer = io.BytesIO(compressed_data)
loaded_data = np.load(buffer)
return loaded_data['data']
else:
time.sleep(1/50.0)
def toRedisList(queue_label, data_list):
print(f"Pushed data to queue: {queue_label}")
buffer = io.BytesIO()
np.savez(buffer, *data_list) # Use *data_list to unpack the list into arguments for savez_compressed
compressed_data = buffer.getvalue()
redis_conn.rpush(queue_label, compressed_data)
def fromRedisList(queue_label):
compressed_data = None
    while compressed_data is None:
compressed_data = redis_conn.lpop(queue_label)
if compressed_data:
print(f"Popped data from queue: {queue_label}")
buffer = io.BytesIO(compressed_data)
loaded_data = np.load(buffer)
data_list = [loaded_data[key] for key in loaded_data.files]
return data_list
else:
time.sleep(1/50.0)
# Main frame processing function
def stream_frames(video_url, label):
if video_url.startswith("rtsp://") or video_url.startswith("rtmp://"):
cap = cv2.VideoCapture(f"rtspsrc location={video_url} latency=0 ! decodebin ! videoconvert ! appsink", cv2.CAP_GSTREAMER)
else:
cap = cv2.VideoCapture(video_url)
if not cap.isOpened():
print(f"[ERROR] Could not open stream: {video_url}")
return
print(f"[INFO] Starting capture from: {video_url}")
frame_count = 0
start_time = time.time()
lap_time = start_time
while cap.isOpened():
ret, frame = cap.read()
if not ret:
print("[INFO] Stream ended or cannot read frame.")
break
toRedis(label,frame)
frame_count += 1
time.sleep(1/30.0)
        # Log every 100 frames
if frame_count % 100 == 0:
last_lap = lap_time
lap_time = time.time()
elapsed = lap_time - last_lap
print(f"[INFO] 100 frames processed in {elapsed:.2f} seconds.")
# if frame_count % 16 and (redis_len := redis_conn.llen(label)) > 75:
# print(f"[WARN] {redis_len} items in queue. Pruning to 50 latest.")
# redis_conn.ltrim(label, -50, -1)
cap.release()
total_time = time.time() - start_time
print(f"[INFO] Finished. Total frames: {frame_count}. Total time: {total_time:.2f} seconds.")
# Run on container start
if __name__ == "__main__":
if not stream_url:
print("[ERROR] Missing required environment variable: stream_url")
else:
stream_frames(stream_url, stream_label)
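Containers hand frames to each other through two Redis lists: raw frames on the stream_label key and 16-frame crops on stream_label_cubes. A quick depth check from the host (queue names taken from docker-compose.yml):

import redis

r = redis.Redis(host="localhost", port=6379, db=0)
for queue in ("pelea3", "pelea3_cubes"):
    print(queue, r.llen(queue))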

169 transfer.py 100644
View File

@@ -0,0 +1,169 @@
import os
import cv2
import torch
import numpy as np
import random
import matplotlib.pyplot as plt
from torch import nn
from transformers import AutoModelForVideoClassification, AutoProcessor, TrainingArguments, Trainer
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
dataset_folder = os.getenv("dataset_folder", "/app/dataset_folder")
output_folder = os.getenv("output_folder", "/app/output_folder")
os.makedirs(dataset_folder, exist_ok=True)
os.makedirs(f"{output_folder}/base_model", exist_ok=True)
os.makedirs(f"{output_folder}/tuned", exist_ok=True)
model = AutoModelForVideoClassification.from_pretrained(f"{output_folder}/base_model")
processor = AutoProcessor.from_pretrained(f"{output_folder}/base_model")
for param in model.base_model.parameters():
param.requires_grad = False
# Replace classifier with binary output
model.classifier = nn.Linear(model.classifier.in_features, 2)
class VideoNPZDataset(Dataset):
def __init__(self, files, labels, processor):
self.files = files
self.labels = labels
self.processor = processor
def __len__(self):
return len(self.files)
def __getitem__(self, idx):
file = self.files[idx]
label = self.labels[idx]
data = np.load(file)
        frames = [data[key] for key in data.files]  # .files keeps arr_0..arr_15 in save order; sorted() would put arr_10 before arr_2
# Debug print
# for i, frame in enumerate(frames):
# print(f" Frame {i} shape: {frame.shape}, dtype: {frame.dtype}")
# Convert to RGB (assumes frames are in BGR format)
frames_rgb = [cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) for frame in frames]
# for i, frame in enumerate(frames_rgb):
# print(f" Frame {i} post-RGB shape: {frame.shape}")
# Process the video frames
pixel_values = self.processor(images=frames_rgb, return_tensors="pt")["pixel_values"][0] # shape: [16, 3, 224, 224]
label_values = torch.tensor(label, dtype=torch.long)
# print(f"[{file}]")
# print(f" Processor output shape (before permute): {pixel_values.shape}")
# print(f" Processor label shape (before permute): {label_values.shape}")
return {
"pixel_values": pixel_values, # [3, 16, 224, 224]
"labels": label_values # scalar tensor
}
class myTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
print(f"[DEBUG] pixel_values: {inputs['pixel_values'].shape}")
print(f"[DEBUG] labels: {inputs['labels'].shape}")
return super().compute_loss(model, inputs, return_outputs)
def collate_fn(batch):
pixel_values = []
labels = []
for item in batch:
video = item["pixel_values"] # shape: (16, 3, 224, 224)
label = item["labels"]
pixel_values.append(video)
labels.append(label)
    pixel_values = torch.stack(pixel_values)  # (batch_size, 16, 3, 224, 224)
    labels_values = torch.tensor(labels, dtype=torch.long)  # (batch_size,)
return {
"pixel_values": pixel_values,
"labels": labels_values,
}
def load_dataset_from_npz(root_dir):
files = []
labels = []
label_map = {} # for inference class map
for i, class_name in enumerate(sorted(os.listdir(root_dir))):
label_map[i] = class_name
class_dir = os.path.join(root_dir, class_name)
for file in os.listdir(class_dir):
if file.endswith(".npz"):
files.append(os.path.join(class_dir, file))
labels.append(i)
return files, labels, label_map
files, labels, label_map = load_dataset_from_npz(dataset_folder)
# for file, label in zip(files, labels):
# print(f"{file} {label} {label_map[label]}")
print(f" files: {len(files)}")
print(f" labels: {len(labels)}")
print("CUDA available:", torch.cuda.is_available())
print("Device count:", torch.cuda.device_count())
print("Current device:", torch.cuda.current_device())
print("Device name:", torch.cuda.get_device_name(0))
train_files, val_files, train_labels, val_labels = train_test_split(files, labels, test_size=0.2, stratify=labels, random_state=random.randint(1,5000))
train_dataset = VideoNPZDataset(train_files, train_labels, processor)
val_dataset = VideoNPZDataset(val_files, val_labels, processor)
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)  # 2-class logits: argmax, not per-logit sigmoid
    accuracy = (preds == labels).mean()
    return {"accuracy": accuracy}
training_args = TrainingArguments(
output_dir="./results",
evaluation_strategy="epoch",
save_strategy="epoch",
per_device_train_batch_size=2,
per_device_eval_batch_size=2,
num_train_epochs=10,
logging_dir="./logs",
logging_steps=10,
save_total_limit=2,
load_best_model_at_end=True,
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
data_collator=collate_fn,
)
trainer.train()
logs = trainer.state.log_history
train_loss = [x["loss"] for x in logs if "loss" in x]
eval_loss = [x["eval_loss"] for x in logs if "eval_loss" in x]
plt.plot(train_loss, label="Train Loss")
plt.plot(eval_loss, label="Eval Loss")
plt.xlabel("Log Steps")
plt.ylabel("Loss")
plt.legend()
plt.title("Loss Curve")
plt.savefig(f"{output_folder}/tuned/loss_curve.png")
trainer.save_model(f"{output_folder}/tuned")
processor.save_pretrained(f"{output_folder}/tuned")

View File

@@ -0,0 +1,34 @@
# Use an official PyTorch image with CUDA support (optional)
FROM pytorch/pytorch:2.2.0-cuda11.8-cudnn8-runtime
# Set working directory
WORKDIR /app
RUN apt-get update && apt-get install -y \
git \
ffmpeg \
libgl1 \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
# Use the Conda pip path explicitly
RUN /opt/conda/bin/pip install --upgrade pip && \
/opt/conda/bin/pip install --force-reinstall --no-cache-dir "numpy<2.0" \
transformers==4.39.0 \
accelerate==0.30.0 \
torch==2.2.0 \
torchvision==0.17.0 \
datasets \
matplotlib \
scikit-learn \
mediapy \
pillow
RUN /opt/conda/bin/conda install -y -c conda-forge opencv
# Copy project files into container
COPY transfer.py /app/transfer.py
# Default command (you can override with docker run)
CMD ["python", "/app/transfer.py"]

View File

@@ -0,0 +1,179 @@
import os
import cv2
import torch
import numpy as np
import random
import matplotlib.pyplot as plt
from torch import nn
from transformers import AutoModelForVideoClassification, AutoProcessor, TrainingArguments, Trainer
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
from PIL import Image
dataset_folder = os.getenv("dataset_folder", "/app/dataset_folder")
output_folder = os.getenv("output_folder", "/app/output_folder")
os.makedirs(dataset_folder, exist_ok=True)
os.makedirs(f"{output_folder}/base_model", exist_ok=True)
os.makedirs(f"{output_folder}/tuned", exist_ok=True)
model = AutoModelForVideoClassification.from_pretrained(f"{output_folder}/base_model")
processor = AutoProcessor.from_pretrained(f"{output_folder}/base_model")
# for param in model.base_model.parameters():
# param.requires_grad = False
# Replace classifier with binary output
model.classifier = nn.Linear(model.classifier.in_features, 2)
class VideoNPZDataset(Dataset):
def __init__(self, files, labels, processor):
self.files = files
self.labels = labels
self.processor = processor
def __len__(self):
return len(self.files)
def __getitem__(self, idx):
file = self.files[idx]
label = self.labels[idx]
data = np.load(file)
        frames = [data[key] for key in data.files]  # .files keeps arr_0..arr_15 in save order; sorted() would put arr_10 before arr_2
# Debug print
# for i, frame in enumerate(frames):
# print(f" Frame {i} shape: {frame.shape}, dtype: {frame.dtype}")
# Convert to RGB (assumes frames are in BGR format)
frames_rgb = [Image.fromarray(frame[:, :, ::-1]) for frame in frames]
# for i, frame in enumerate(frames_rgb):
# print(f" Frame {i} post-RGB shape: {frame.shape}")
# Process the video frames
pixel_values = self.processor(images=frames_rgb, return_tensors="pt")["pixel_values"][0] # shape: [16, 3, 224, 224]
label_values = torch.tensor(label, dtype=torch.long)
# print(f"[{file}]")
# print(f" Processor output shape (before permute): {pixel_values.shape}")
# print(f" Processor label shape (before permute): {label_values.shape}")
return {
"pixel_values": pixel_values, # [3, 16, 224, 224]
"labels": label_values # scalar tensor
}
class myTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
print(f"[DEBUG] pixel_values: {inputs['pixel_values'].shape}")
print(f"[DEBUG] labels: {inputs['labels'].shape}")
return super().compute_loss(model, inputs, return_outputs)
def collate_fn(batch):
pixel_values = []
labels = []
for item in batch:
video = item["pixel_values"] # shape: (16, 3, 224, 224)
label = item["labels"]
pixel_values.append(video)
labels.append(label)
    pixel_values = torch.stack(pixel_values)  # (batch_size, 16, 3, 224, 224)
    labels_values = torch.tensor(labels, dtype=torch.long)  # (batch_size,)
return {
"pixel_values": pixel_values,
"labels": labels_values,
}
def load_dataset_from_npz(root_dir):
files = []
labels = []
groups = []
label_map = {} # for inference class map
group_map = {}
group_counter = 0
for i, class_name in enumerate(sorted(os.listdir(root_dir))):
label_map[i] = class_name
class_dir = os.path.join(root_dir, class_name)
for file in os.listdir(class_dir):
if file.endswith(".npz"):
group_id = f"{file.split('_')[0]}_{i}"
if group_id not in group_map:
group_counter += 1
group_map[group_id] = group_counter
groups.append(group_map[group_id])
files.append(os.path.join(class_dir, file))
labels.append(i)
return files, labels, label_map, groups
files, labels, label_map, groups = load_dataset_from_npz(dataset_folder)
# for file, label in zip(files, labels):
# print(f"{file} {label} {label_map[label]}")
print(f" files: {len(files)}")
print(f" labels: {len(labels)}")
# import torch
# print("CUDA available:", torch.cuda.is_available())
# print("Device count:", torch.cuda.device_count())
# print("Current device:", torch.cuda.current_device())
# print("Device name:", torch.cuda.get_device_name(0))
train_files, val_files, train_labels, val_labels = train_test_split(files, labels, test_size=0.2, stratify=groups, random_state=random.randint(1,5000))
train_dataset = VideoNPZDataset(train_files, train_labels, processor)
val_dataset = VideoNPZDataset(val_files, val_labels, processor)
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)  # 2-class logits: argmax, not per-logit sigmoid
    accuracy = (preds == labels).mean()
    return {"accuracy": accuracy}
training_args = TrainingArguments(
output_dir="./results",
evaluation_strategy="epoch",
save_strategy="epoch",
per_device_train_batch_size=8,
per_device_eval_batch_size=8,
num_train_epochs=3,
logging_dir="./logs",
logging_steps=2,
save_total_limit=2,
load_best_model_at_end=True,
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
data_collator=collate_fn,
)
trainer.train()
logs = trainer.state.log_history
train_loss = [x["loss"] for x in logs if "loss" in x]
eval_loss = [x["eval_loss"] for x in logs if "eval_loss" in x]
plt.plot(train_loss, label="Train Loss")
plt.plot(eval_loss, label="Eval Loss")
plt.xlabel("Log Steps")
plt.ylabel("Loss")
plt.legend()
plt.title("Loss Curve")
plt.savefig(f"{output_folder}/tuned/loss_curve.png")
trainer.save_model(f"{output_folder}/tuned")
processor.save_pretrained(f"{output_folder}/tuned")
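After training, the tuned checkpoint can be smoke-tested on a single saved clip; a sketch that mirrors the dataset code above, with a hypothetical clip path (class folders depend on how dataset_folder was organized):

import numpy as np
import torch
from PIL import Image
from transformers import AutoModelForVideoClassification, AutoProcessor

model = AutoModelForVideoClassification.from_pretrained("/app/output_folder/tuned")
processor = AutoProcessor.from_pretrained("/app/output_folder/tuned")
model.eval()

data = np.load("/app/dataset_folder/fight/pelea3_00000.npz")  # hypothetical path
frames = [Image.fromarray(data[key][:, :, ::-1]) for key in data.files]  # BGR -> RGB
with torch.no_grad():
    pixel_values = processor(images=frames, return_tensors="pt")["pixel_values"]
    probs = torch.softmax(model(pixel_values=pixel_values).logits, dim=-1)[0]
print(f"P(class 1) = {probs[1].item():.3f}")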