-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfinal.py
209 lines (173 loc) · 7.14 KB
/
final.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# final working code
import cv2
import json
import asyncio
import websockets
import base64
from ultralytics import YOLO
from collections import deque
from threading import Thread
import time
import winsound # Use for playing sound on Windows
# Load models
weapon_model = YOLO(r'M:/mask/detect/threat_train/weights/best.pt')
fire_smoke_model = YOLO(r'detect/fire_smoke_train/weights/best.pt')
# Object classes
weapon_class_names = ["violence", "gun", "knife"]
fire_smoke_class_names = ["fire", "smoke"]
# Store clients and threat status
clients = {
"local_master": None,
"regional_master": None
}
threat_detected = False
threat_acknowledged = False
threat_detection_window = deque(maxlen=30) # Store last 30 frames for threat analysis
# Define thresholds
THREAT_CONFIDENCE_THRESHOLD = 0.50
FIRE_CONFIDENCE_THRESHOLD = 0.50
SMOKE_CONFIDENCE_THRESHOLD = 0.90
ALARM_FILE = 'alarm.wav' # Replace with path to your alarm sound file
# Function to play an alarm sound
def play_alarm():
print("Playing alarm")
# winsound.Beep(1000, 1000) # Frequency, duration in milliseconds
# winsound.PlaySound(ALARM_FILE, winsound.SND_FILENAME)
def stop_alarm():
# winsound.PlaySound(None, winsound.SND_PURGE)
print("Stopping alarm")
# Define threat detection logic
def is_threat(detections):
global threat_detection_window, threat_detected
threat_detection_window.append(detections)
if len(threat_detection_window) < threat_detection_window.maxlen:
return False # Not enough data to make a decision
# If a threat is already detected, do not generate it again
if threat_detected:
return True
threat_count = 0
for frame_detections in threat_detection_window:
if any(d['class'] in ["gun", "knife", "fire", "smoke"] and d['confidence'] > THREAT_CONFIDENCE_THRESHOLD for d in frame_detections):
threat_count += 1
return threat_count > (threat_detection_window.maxlen / 2)
def detect_objects(frame):
results = []
# Detect weapons
weapon_results = weapon_model(frame, stream=True)
for r in weapon_results:
for box in r.boxes:
confidence = float(box.conf[0])
cls = int(box.cls[0])
class_name = weapon_class_names[cls]
x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
if confidence > THREAT_CONFIDENCE_THRESHOLD:
results.append({
"class": class_name,
"confidence": confidence,
"box": [x1, y1, x2, y2],
"color": (0, 0, 255)
})
# Detect fire and smoke
fire_results = fire_smoke_model(frame, stream=True)
for r in fire_results:
for box in r.boxes:
confidence = float(box.conf[0])
cls = int(box.cls[0])
class_name = fire_smoke_class_names[cls]
x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
if class_name == "fire" and confidence > FIRE_CONFIDENCE_THRESHOLD:
results.append({
"class": class_name,
"confidence": confidence,
"box": [x1, y1, x2, y2],
"color": (0, 165, 255)
})
elif class_name == "smoke" and confidence > SMOKE_CONFIDENCE_THRESHOLD:
results.append({
"class": class_name,
"confidence": confidence,
"box": [x1, y1, x2, y2],
"color": (0, 0, 0)
})
return results
# WebSocket server function for video streaming
async def video_stream(websocket, path):
global threat_detected, threat_acknowledged
user_type = path.strip('/')
if user_type not in ["local_master", "regional_master"]:
await websocket.close()
return
clients[user_type] = websocket
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
try:
while True:
success, frame = cap.read()
if not success:
break
detections = detect_objects(frame)
threat_detected = is_threat(detections)
# Draw boxes and labels
for detection in detections:
x1, y1, x2, y2 = detection["box"]
color = detection["color"]
label = f"{detection['class']} {detection['confidence']:.2f}"
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
# Encode the frame
_, buffer = cv2.imencode('.jpg', frame)
frame_encoded = base64.b64encode(buffer).decode('utf-8')
# Prepare data to send
data = {
"detections": detections,
"frame": frame_encoded,
"threat_detected": threat_detected
}
# Send data to the local master
if clients["local_master"]:
await clients["local_master"].send(json.dumps(data))
if threat_detected and not threat_acknowledged:
play_alarm() # Play alarm when a threat is detected
# Start a timeout thread to notify regional master if not acknowledged
Thread(target=threat_timeout).start()
# Notify regional master
if clients["regional_master"]:
notification = json.dumps({
"message": "Threat detected",
"threat_detected": threat_detected
})
await clients["regional_master"].send(notification)
await asyncio.sleep(0.033)
finally:
cap.release()
clients[user_type] = None
async def threat_timeout():
global threat_detected, threat_acknowledged
# Wait for 10 seconds for acknowledgment
await asyncio.sleep(10)
if threat_detected and not threat_acknowledged:
# Notify regional master and play alarm
if clients["regional_master"]:
notification = json.dumps({"message": "Threat not acknowledged", "threat_detected": threat_detected})
await clients["regional_master"].send(notification)
play_alarm()
async def handle_acknowledgment(websocket, path):
global threat_acknowledged
async for message in websocket:
data = json.loads(message)
if data.get('action') == 'acknowledge_threat':
threat_acknowledged = True
# If threat is acknowledged, stop the alarm
stop_alarm()
print("Threat acknowledged")
# Notify regional master about threat acknowledgment
if clients["regional_master"]:
await clients["regional_master"].send(json.dumps({"message": "Threat acknowledged"}))
# Start the WebSocket servers
start_server = websockets.serve(video_stream, "localhost", 8765)
acknowledge_server = websockets.serve(handle_acknowledgment, "localhost", 8766)
# Run the servers
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_until_complete(acknowledge_server)
asyncio.get_event_loop().run_forever()