-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
140 lines (107 loc) · 5.14 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from pickle import FALSE
from opcua import Server, ua, uamethod
from opcua.common.manage_nodes import create_method
from simulation import MController
# Create the OPC UA server and build the address space of the simulated plant.
# NOTE: DataObserver.update resolves nodes through globals()[key], so the
# module-level variable names below are presumably part of the contract with
# the simulation -- confirm against the simulation module before renaming any.
server = Server()
# Endpoint, accepted security policy ids and remote-admin flag.
url = "opc.tcp://localhost:4841/"
server.set_endpoint(url)
server.set_security_IDs(["Anonymous", "Basic256Sha256", "Admin"])
server.allow_remote_admin("allow")
uri = "localhost/ptp_programmierprojekt"
# NOTE(review): the index returned here is stored in `namespace`, but every
# node id below hard-codes "ns=1" -- confirm that ns=1 is the intended namespace.
namespace = server.register_namespace(uri)
objects = server.get_objects_node()
# The server is started before the nodes are added to the Objects folder.
server.start()
print("Server started at {}".format(url))

# "Machine" object (German "Anlage": plant) with its overall status flag.
anlage = objects.add_object("ns=1;i=1", "Machine")
anlage_status = anlage.add_variable("ns=1;i=12", "machine_status", False)

# Component 1: status flag, two light barriers ("Lichtschranke") and an initiator.
komponente_1 = objects.add_object("ns=1;i=2", "komponente_1")
komponente_1_status = komponente_1.add_variable("ns=1;i=21", "komponente_1_status", False)
lichtschranke_l1 = komponente_1.add_variable("ns=1;i=23", "lichtschranke_l1", False)
initiator_1 = komponente_1.add_variable("ns=1;i=24", "initiator_1", False)
lichtschranke_l2 = komponente_1.add_variable("ns=1;i=25", "lichtschranke_l2", False)

# Conveyor belts ("Fliessband"); belt 2 additionally exposes a buffer variable.
fließband_1 = objects.add_object("ns=1;i=3", "fließband_1")
fließband_1_status = fließband_1.add_variable("ns=1;i=34", "fließband_1_status", False)
fließband_2 = objects.add_object("ns=1;i=31", "fließband_2")
fließband_2_puffer = fließband_2.add_variable("ns=1;i=32", "fließband_2_puffer", False)
fließband_2_status = fließband_2.add_variable("ns=1;i=33", "fließband_2_status", False)

# Elevator docking station ("Fahrstuhldockingstation") with a third light barrier.
fahrstuhldockingstation = objects.add_object("ns=1;i=4", "fahrstuhldockingstation")
fahrstuhldockingstation_status = fahrstuhldockingstation.add_variable("ns=1;i=42", "fahrstuhldockingstation_status", False)
lichtschranke_l3 = fahrstuhldockingstation.add_variable("ns=1;i=41", "lichtschranke_l3", False)

# High-bay warehouse ("Hochregallager"): robot flag plus two counters that
# start at 6 free robots and 20000 free storage slots.
hochregallager = objects.add_object("ns=1;i=5", "hochregallager")
roboter = hochregallager.add_variable("ns=1;i=51", "roboter", False)
freie_roboter = hochregallager.add_variable("ns=1;i=52", "freie_roboter", 6)
freie_lagerplaetze = hochregallager.add_variable("ns=1;i=53", "freie_lagerplaetze", 20000)

# Workpiece ("Werkstueck"): status, type and position, all initialised to False.
werkstueck = objects.add_object("ns=1;i=6", "werkstueck")
werkstueck_status = werkstueck.add_variable("ns=1;i=61", "werkstueck_status", False)
werkstueck_typ = werkstueck.add_variable("ns=1;i=62", "werkstueck_typ", False)
werkstueck_position = werkstueck.add_variable("ns=1;i=65", "werkstueck_position", False)

# Task bookkeeping: status, start/stop time and the current task, all
# initialised to False.
task = objects.add_object("ns=1;i=8", "task")
task_status = task.add_variable("ns=1;i=81", "task_status", False)
task_starttime = task.add_variable("ns=1;i=82", "task_starttime", False)
task_stoptime = task.add_variable("ns=1;i=83", "task_stoptime", False)
current_task = task.add_variable("ns=1;i=84", "current_task", False)

# Program object; its single variable is browse-named "Programm" and starts as True.
programm = objects.add_object("ns=1;i=7", "Programm")
program_name = programm.add_variable("ns=1;i=71", "Programm", True)
class DataObserver:
    """Observer that forwards reported values to same-named module-level nodes."""

    def update(self, args):
        """Print every reported entry and write it to the matching global.

        Args:
            args: Mapping whose keys are names of module-level objects that
                expose ``set_value`` and whose values are the new values.

        Raises:
            KeyError: If a key does not name a module-level object.
        """
        module_scope = globals()
        for name, new_value in args.items():
            print(name, new_value)
            module_scope[name].set_value(new_value)
# Wire the simulation to this module: setup() receives the observer and this
# module's globals() dict, and its return value is the `controller` that the
# UA method handlers below call. What setup() does with them is defined in the
# simulation module, which is not visible here.
machine_controller = MController()
obs = DataObserver()
controller = machine_controller.setup(obs, globals())
# Argument descriptors for the callable UA methods
# (status change, task execution, program start).
def _make_argument(name, data_type_id, description):
    """Return a ``ua.Argument`` with the given name, data type id and description.

    ``ArrayDimensions`` is set to an empty list; every other field keeps the
    default assigned by ``ua.Argument()``.
    """
    argument = ua.Argument()
    argument.Name = name
    argument.DataType = ua.NodeId(data_type_id)
    argument.ArrayDimensions = []
    argument.Description = ua.LocalizedText(description)
    return argument


# Input/output pair for "change machine status".
statusUA = _make_argument("StatusUA", ua.ObjectIds.Boolean, "Status change")
statusoutput = _make_argument("StatusOutputUA", ua.ObjectIds.Boolean, "Status change output")

# Task-related descriptors.
taskUA = _make_argument("TaskUA", ua.ObjectIds.String, "task execution")
taskUAinstruction = _make_argument("TaskUAInstruction", ua.ObjectIds.Integer, "task execution")
taskoutput = _make_argument("TaskOutputUA", ua.ObjectIds.String, "task execution result")

# Input/output pair for "run program".
# NOTE(review): both descriptors are named "ProgramUA" -- confirm this is intended.
programUA = _make_argument("ProgramUA", ua.ObjectIds.String, "Program")
programoutputUA = _make_argument("ProgramUA", ua.ObjectIds.String, "Program")
#Callable ua methods
@uamethod
def change_machine_status(parent, status):
    """Start or stop the machine in response to an OPC UA method call.

    The ``uamethod`` wrapper always passes the node id of the object the
    method was invoked on as the first positional argument, followed by the
    unwrapped input values. The handler must therefore accept ``parent`` even
    though it does not use it; without that parameter every client call fails
    with a ``TypeError``.

    Args:
        parent: Node id of the object the method was called on (unused).
        status: Requested machine state; ``True`` starts the machine and
            ``False`` stops it.

    Returns:
        The requested ``status`` echoed back, or ``None`` when ``status`` is
        neither true-equal nor false-equal (the machine is then left as is).
    """
    # Explicit comparisons are kept on purpose: a non-boolean payload such as
    # a string or ``None`` must neither start nor stop the machine.
    if status == True:  # noqa: E712
        controller.startmachine()
        return status
    if status == False:  # noqa: E712
        controller.stopmachine()
        return status
    return None
@uamethod
def startprogram(parent, name):
    """Run the named program on the controller and return its result.

    Args:
        parent: Node id supplied by the ``uamethod`` wrapper (unused).
        name: Program name forwarded to ``controller.runprogram``.

    Returns:
        Whatever ``controller.runprogram(name)`` returns.
    """
    return controller.runprogram(name)
# Register the callable methods on the "Machine" object under namespace
# index 1, each with one input and one output argument descriptor.
anlage.add_method(1, "change machine status", change_machine_status, [statusUA], [statusoutput])
anlage.add_method(1, "run program", startprogram, [programUA], [programoutputUA])