-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcamscan.py
122 lines (94 loc) · 3.33 KB
/
camscan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import os
import socket
import shodan
import cv2
# Disable SSL warnings (for testing purposes only)
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def get_environment_variable(var_name, prompt):
value = os.getenv(var_name)
if not value:
value = input(prompt)
os.environ[var_name] = value
return value
def scan_for_cameras(api_key):
"""Scans the specified IP range for open cameras.
Args:
api_key: The Shodan API key.
Returns:
A list of camera IP addresses and their corresponding titles.
"""
try:
# Initialize Shodan API
api = shodan.Shodan(api_key)
# Search Shodan for cameras
results = api.search(query='http.title:"Axis Camera" OR http.title:"Foscam Camera" OR http.title:"D-Link Camera" OR http.title:"Hikvision Camera" OR http.title:"Netgear Camera"')
# Extract camera IP addresses and titles
cameras = [(result['ip_str'], result['http']['title']) for result in results['matches']]
return cameras
except Exception as e:
print(f"Error scanning for cameras: {e}")
return []
def get_stream_url(ip_address):
"""Gets the stream URL from the camera.
Args:
ip_address: The IP address of the camera.
Returns:
The stream URL if successful, None otherwise.
"""
try:
# Connect to the camera
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip_address, 80))
# Send a HTTP request to retrieve the camera's stream
request = f"GET / HTTP/1.1\r\nHost: {ip_address}\r\n\r\n"
sock.sendall(request.encode())
# Receive the response
response = sock.recv(4096).decode()
# Check if the response contains a stream URL
if "rtsp://" in response:
# Extract the stream URL
stream_url = response.split("rtsp://")[1].split('"')[0]
return f"rtsp://{stream_url}"
else:
return None
except Exception as e:
print(f"Error getting stream URL: {e}")
return None
finally:
sock.close()
def watch_stream(stream_url):
"""Watches the camera stream.
Args:
stream_url: The URL of the camera stream.
"""
try:
# Use OpenCV to watch the stream
cap = cv2.VideoCapture(stream_url)
while True:
ret, frame = cap.read()
if not ret:
break
cv2.imshow('Camera Stream', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
except Exception as e:
print(f"Error watching stream: {e}")
def main():
# Define the Shodan API key
api_key = get_environment_variable('SHODAN_API_KEY', 'Enter your Shodan API key: ')
# Scan for cameras
cameras = scan_for_cameras(api_key)
# Get stream URLs
for ip_address, title in cameras:
print(f"Getting stream URL for {title} at {ip_address}...")
stream_url = get_stream_url(ip_address)
if stream_url:
print(f"Stream URL: {stream_url}")
watch_stream(stream_url)
else:
print(f"Failed to get stream URL for {title} at {ip_address}")
if __name__ == "__main__":
main()