Tiresias/reader/reader.py

112 lines
3.6 KiB
Python

import os
import io
import time
import redis
import cv2
import numpy as np
import struct
# Read environment variables
stream_url = os.getenv("stream_url")
stream_label = os.getenv("stream_label", "default_stream")
# Connect to Redis
redis_host = os.getenv("redis_host", "localhost")
redis_port = int(os.getenv("redis_port", "6379"))
redis_db = int(os.getenv("redis_db", "0"))
redis_conn = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=False)
# Clean the Redis queue
redis_conn.delete(stream_label)
print(f"[INFO] Cleared Redis queue for label: {stream_label}")
def toRedis(queue_label, data):
print(f"Pushed data to queue: {queue_label}")
buffer = io.BytesIO()
np.savez(buffer, data=data)
compressed_data = buffer.getvalue()
return redis_conn.rpush(queue_label, compressed_data)
def fromRedis(queue_label):
compressed_data = None
while compressed_data == None:
compressed_data = redis_conn.lpop(queue_label)
if compressed_data:
print(f"Popped data from queue: {queue_label}")
buffer = io.BytesIO(compressed_data)
loaded_data = np.load(buffer)
return loaded_data['data']
else:
time.sleep(1/50.0)
def toRedisList(queue_label, data_list):
print(f"Pushed data to queue: {queue_label}")
buffer = io.BytesIO()
np.savez(buffer, *data_list) # Use *data_list to unpack the list into arguments for savez_compressed
compressed_data = buffer.getvalue()
redis_conn.rpush(queue_label, compressed_data)
def fromRedisList(queue_label):
compressed_data = None
while compressed_data == None:
compressed_data = redis_conn.lpop(queue_label)
if compressed_data:
print(f"Popped data from queue: {queue_label}")
buffer = io.BytesIO(compressed_data)
loaded_data = np.load(buffer)
data_list = [loaded_data[key] for key in loaded_data.files]
return data_list
else:
time.sleep(1/50.0)
# Main frame processing function
def stream_frames(video_url, label):
if video_url.startswith("rtsp://") or video_url.startswith("rtmp://"):
cap = cv2.VideoCapture(f"rtspsrc location={video_url} latency=0 ! decodebin ! videoconvert ! appsink", cv2.CAP_GSTREAMER)
else:
cap = cv2.VideoCapture(video_url)
if not cap.isOpened():
print(f"[ERROR] Could not open stream: {video_url}")
return
print(f"[INFO] Starting capture from: {video_url}")
frame_count = 0
start_time = time.time()
lap_time = start_time
while cap.isOpened():
ret, frame = cap.read()
if not ret:
print("[INFO] Stream ended or cannot read frame.")
break
toRedis(label,frame)
frame_count += 1
time.sleep(1/30.0)
# Log every 500 frames
if frame_count % 100 == 0:
last_lap = lap_time
lap_time = time.time()
elapsed = lap_time - last_lap
print(f"[INFO] 100 frames processed in {elapsed:.2f} seconds.")
# if frame_count % 16 and (redis_len := redis_conn.llen(label)) > 75:
# print(f"[WARN] {redis_len} items in queue. Pruning to 50 latest.")
# redis_conn.ltrim(label, -50, -1)
cap.release()
total_time = time.time() - start_time
print(f"[INFO] Finished. Total frames: {frame_count}. Total time: {total_time:.2f} seconds.")
# Run on container start
if __name__ == "__main__":
if not stream_url:
print("[ERROR] Missing required environment variable: stream_url")
else:
stream_frames(stream_url, stream_label)