-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreamer.py
192 lines (160 loc) · 7.06 KB
/
streamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import sqlite3
import base64
import time
import cv2
import numpy as np
import imutils
from typing import Union, Any, Mapping
from fastapi import UploadFile, File, BackgroundTasks
from starlette.responses import StreamingResponse
import asyncio
# SQLite URI naming an in-memory database ("mode=memory") opened in
# shared-cache mode ("cache=shared"). Every connection in this module is
# opened with this same string and ``uri=True``.
SQLLITE_CONN_STR = "file:framestreamerdb1?mode=memory&cache=shared"
class FrameStreamer:
    """The FrameStreamer class allows you to send frames and visualize them as a stream.

    Frames are kept as base64 strings in the ``images`` table of the SQLite
    database named by ``SQLLITE_CONN_STR``, one row per stream ID. Sending a
    frame replaces the row previously stored under the same ID.
    """

    def __init__(self):
        # This connection is intentionally kept open for the lifetime of the
        # instance: SQLite drops a shared-cache in-memory database when its
        # last connection closes, and every other method only uses
        # short-lived connections.
        self.conn = sqlite3.connect(SQLLITE_CONN_STR, uri=True)
        self._initialize_db()

    def _initialize_db(self):
        """Create the ``images`` table if it does not exist yet."""
        with self.conn:
            self.conn.execute('''CREATE TABLE IF NOT EXISTS images (
                id TEXT PRIMARY KEY,
                image TEXT
            )''')

    @staticmethod
    def _sleep_interval(rate: float) -> float:
        """Convert a loop rate (iterations per second) to a sleep duration.

        Args:
            rate (float): Loop frequency; must be greater than zero.

        Returns:
            float: Seconds to sleep between iterations.

        Raises:
            ValueError: If ``rate`` is zero or negative.
        """
        if rate <= 0:
            raise ValueError(f"rate must be a positive number, got {rate!r}")
        return 1.0 / rate

    def _get_image(self, img_id: str) -> Union[str, None]:
        """Get an image from the SQLite DB.

        Args:
            img_id (str): ID (primary key) of the image to be retrieved in the DB

        Returns:
            Union[str, None]: Image (in base64), or None if it is not found or
                the lookup failed (the error is printed, not raised).
        """
        try:
            conn = sqlite3.connect(SQLLITE_CONN_STR, uri=True)
            try:
                row = conn.execute(
                    "SELECT image FROM images WHERE id = ?", (img_id,)
                ).fetchone()
            finally:
                # ``with connection:`` only commits/rolls back; it never
                # closes, so close explicitly to avoid leaking a connection
                # on every call.
                conn.close()
            return row[0] if row else None
        except sqlite3.Error as e:
            print(f"Database error: {e}")
            return None
        except Exception as e:
            print(f"Unexpected error: {e}")
            return None

    async def _store_image_str(self, img_id: str, img_str_b64: str) -> None:
        """Store an image string (in base64) to the DB.

        Any existing row with the same ID is replaced. Errors are printed and
        swallowed so that a failed write does not interrupt the caller.

        Args:
            img_id (str): ID (primary key) of the image.
            img_str_b64 (str): Image string (in base64)
        """
        try:
            conn = sqlite3.connect(SQLLITE_CONN_STR, uri=True)
            try:
                # The connection context manager commits on success and rolls
                # back if the statement raises.
                with conn:
                    conn.execute(
                        "INSERT OR REPLACE INTO images (id, image) VALUES (?, ?)",
                        (img_id, img_str_b64),
                    )
            finally:
                conn.close()
        except sqlite3.Error as e:
            print(f"Database error: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")

    # NOTE(review): the ``File(...)`` default is kept only so the signature
    # stays unchanged; it looks like a FastAPI route marker and is presumably
    # never relied on here, since ``send_frame`` always passes ``file``.
    async def _image_file_to_base64(self, file: UploadFile = File(...)) -> str:
        """Convert a loaded image to Base64

        Args:
            file (UploadFile, optional): Image file to be converted.

        Returns:
            str: Image converted (in Base64)
        """
        image_file = await file.read()
        return base64.b64encode(image_file).decode("utf-8")

    async def send_frame(self, stream_id: str, frame: Union[str, UploadFile, bytes]) -> None:
        """Send a frame to be streamed.

        Args:
            stream_id (str): ID (primary key) of the frame
            frame (Union[str, UploadFile, bytes]): Frame (image) to be streamed.
                A ``str`` is stored as-is and is expected to already be
                base64; an ``UploadFile`` or a bytes-like object is base64
                encoded first. Any other type is reported and ignored.
        """
        if isinstance(frame, str):
            img_str = frame
        elif isinstance(frame, UploadFile):
            img_str = await self._image_file_to_base64(frame)
        elif isinstance(frame, (bytes, bytearray, memoryview)):
            img_str = base64.b64encode(frame).decode("utf-8")
        else:
            # Previously an unsupported type was dropped without any trace.
            print(f"Unsupported frame type: {type(frame).__name__}")
            return
        await self._store_image_str(stream_id, img_str)

    def _readb64(self, encoded_img: str) -> Any:
        """Decode an image (in base64) to an OpenCV image

        Args:
            encoded_img (str): Image (in base64)

        Returns:
            Any: Image decoded by OpenCV, or None when ``encoded_img`` is None.
        """
        if encoded_img is None:
            return None
        nparr = np.frombuffer(base64.b64decode(encoded_img), np.uint8)
        return cv2.imdecode(nparr, cv2.IMREAD_COLOR)

    def _start_stream(self, img_id: str, freq: int = 30):
        """Continuous loop to stream the frame from SQLite as multipart JPEG parts

        Each iteration sleeps, reads the latest stored frame, resizes it to a
        width of 680 and yields it as one ``--frame`` part. Iterations where
        no frame is available, or where decoding/encoding fails, are skipped.

        Args:
            img_id (str): ID (primary key) of the image in the DB
            freq (int, optional): Loop frequency. Defaults to 30.

        Yields:
            bytes: One multipart chunk (boundary line, ``Content-Type:
                image/jpeg`` header and the JPEG bytes).

        Raises:
            ValueError: On first iteration, if ``freq`` is not positive.
        """
        sleep_duration = self._sleep_interval(freq)
        while True:
            time.sleep(sleep_duration)
            try:
                frame = self._readb64(self._get_image(img_id))
                if frame is None:
                    continue
                frame = imutils.resize(frame, width=680)
                flag, encoded = cv2.imencode(".jpg", frame)
                if not flag:
                    continue
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n'
                       + encoded.tobytes()
                       + b'\r\n')
            except Exception as e:
                # Best-effort stream: report the bad frame and keep going.
                print(f"Error during streaming: {e}")

    def get_stream(self, stream_id: str, freq: int = 30, status_code: int = 206,
                   headers: Union[Mapping[str, str], None] = None,
                   background: Union[BackgroundTasks, None] = None) -> StreamingResponse:
        """Get a stream of frames

        Args:
            stream_id (str): ID (primary key) of the stream to be retrieved
            freq (int, optional): Frequency of the continuous loop retrieval (in Hz). Defaults to 30.
            status_code (int, optional): HTTP response status code. Defaults to 206.
            headers (Union[Mapping[str, str], None], optional): HTTP headers. Defaults to None.
            background (Union[BackgroundTasks, None], optional): FastAPI background. Defaults to None.

        Returns:
            StreamingResponse: FastAPI StreamingResponse

        Raises:
            ValueError: If ``freq`` is not positive.
        """
        # The generator body only runs once the response is being sent, so
        # validate here to fail before any response is started.
        self._sleep_interval(freq)
        return StreamingResponse(self._start_stream(stream_id, freq),
                                 media_type="multipart/x-mixed-replace;boundary=frame",
                                 status_code=status_code,
                                 headers=headers,
                                 background=background)

    async def base64_mix_generator(self, bots_names, fps=15):
        """Yield, at a fixed rate, the latest stored frames of several streams.

        Args:
            bots_names: Iterable of mappings; each item's ``'name'`` entry is
                used as the stream ID to look up.
            fps (int, optional): Iterations per second. Defaults to 15.

        Yields:
            str: Space-separated ``"<stream_id>:<base64 frame>"`` entries for
                every stream that currently has a decodable frame; an empty
                string when none has.

        Raises:
            ValueError: On first iteration, if ``fps`` is not positive.
        """
        sleep_duration = self._sleep_interval(fps)
        while True:
            await asyncio.sleep(sleep_duration)
            entries = []
            for bot in bots_names:
                stream_id = bot['name']
                try:
                    # Fetch the base64 image, then decode it.
                    base64_frame = self._get_image(stream_id)
                    frame = self._readb64(base64_frame)
                except (ValueError, cv2.error) as e:
                    print(f"Error procesando la imagen de {stream_id}: {e}")
                    continue
                if frame is None:
                    continue
                # NOTE(review): the resized/re-encoded JPEG is only used as a
                # validity check; the stored base64 string is what gets
                # emitted. Confirm whether the resized image was meant to be
                # sent instead.
                output_frame = imutils.resize(frame, width=680)
                if output_frame is None:
                    continue
                flag, _ = cv2.imencode(".jpg", output_frame)
                if not flag:
                    continue
                entries.append(f"{stream_id}:{base64_frame}")
            yield " ".join(entries)