# controllersim.py
from enum import IntEnum
import asyncio
from socket import SocketIO
from sre_parse import State
from typing import Dict
import zmq
import zmq.asyncio
import pickle
from cp_handler import CPStates
# Process-wide asyncio-aware ZeroMQ context.
context = zmq.asyncio.Context()
# REP (reply) socket shared by zmq_send() and main().
socket = context.socket(zmq.REP)
# Accept TCP connections on port 5555 on every local interface.
socket.bind("tcp://*:5555")
async def zmq_send(state: str, message, protocol: str = "controller"):
    """Pickle a reply envelope and send it on the module-level socket.

    Args:
        state: Stored under the "state" key of the envelope.
        message: Stored under the "message" key; must be picklable.
        protocol: Stored under the "protocol" key.
    """
    envelope = dict(state=state, message=message, protocol=protocol)
    payload = pickle.dumps(envelope)
    await socket.send(payload)
async def main():
    """Answer requests on the module-level REP socket forever.

    Each request is expected to be a pickled dict whose "message" entry
    selects the reply sent through zmq_send():

    * "init"     -> state "init", message "YES"
    * "cp_write" -> state "init", message is pickled CPStates.A
    * otherwise  -> state "error", message None

    A REP socket has to send a reply before it may receive again, so every
    path through the loop sends exactly one reply; otherwise the next
    recv() would fail and the server would stop.
    """
    while True:
        raw = await socket.recv()
        try:
            # NOTE(security): pickle.loads can execute arbitrary code while
            # decoding; only expose this port to trusted peers.
            request = pickle.loads(raw)
        except (pickle.UnpicklingError, EOFError, AttributeError,
                ImportError, IndexError) as exc:
            # Malformed payload: report it and still answer the request.
            print(f"undecodable request: {exc!r}")
            await zmq_send("error", message=None)
            continue

        # A non-dict payload has no "message" entry; treat it as unrecognised.
        value = request.get("message") if isinstance(request, dict) else None
        print(value)
        if value == "init":
            await zmq_send("init", message="YES")
        elif value == "cp_write":
            print("SENDINGGG")
            await zmq_send("init", message=pickle.dumps(CPStates.A))
        else:
            # Unrecognised command: reply anyway to keep the REP socket usable.
            await zmq_send("error", message=None)
# if message == b"slac" :
# socket.send(b"1")
# print("YES")
# if dump.get("state") == "slac" :
# print("YESSSSSS")
# Run the server loop only when executed as a script, so that importing this
# module does not block inside main().
if __name__ == "__main__":
    asyncio.run(main())