-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxapp_control.py
49 lines (32 loc) · 1.06 KB
/
xapp_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import socket
# open control socket
def open_control_socket(port: int):
print('Waiting for xApp connection on port ' + str(port))
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# host = socket.gethostname()
# bind to INADDR_ANY
server.bind(('', port))
server.listen(5)
control_sck, client_addr = server.accept()
print('xApp connected: ' + client_addr[0] + ':' + str(client_addr[1]))
return control_sck
# send through socket
def send_socket(socket, msg: str):
bytes_num = socket.send(msg.encode('utf-8'))
print('Socket sent ' + str(bytes_num) + ' bytes')
# receive data from socker
def receive_from_socket(socket) -> str:
ack = 'Indication ACK\n'
data = socket.recv(4096)
try:
data = data.decode('utf-8')
except UnicodeDecodeError:
return ''
if ack in data:
data = data[len(ack):]
if len(data) > 0:
# print("Received: ", str(data))
return data.strip()
else:
return ''