6
6
The following is an example of how to use the asynchronous modbus
7
7
client implementation from pymodbus.
8
8
"""
9
- # --------------------------------------------------------------------------- #
9
+ # --------------------------------------------------------------------------- #
10
10
# import needed libraries
11
- # --------------------------------------------------------------------------- #
11
+ # --------------------------------------------------------------------------- #
12
12
from twisted .internet import reactor , protocol
13
13
from pymodbus .constants import Defaults
14
14
15
- # --------------------------------------------------------------------------- #
15
+ # --------------------------------------------------------------------------- #
16
16
# choose the requested modbus protocol
17
- # --------------------------------------------------------------------------- #
17
+ # --------------------------------------------------------------------------- #
18
18
from pymodbus .client .async import ModbusClientProtocol
19
19
from pymodbus .client .async import ModbusUdpClientProtocol
20
20
from pymodbus .framer .rtu_framer import ModbusRtuFramer
21
21
22
- # --------------------------------------------------------------------------- #
22
+ # --------------------------------------------------------------------------- #
23
23
# configure the client logging
24
- # --------------------------------------------------------------------------- #
24
+ # --------------------------------------------------------------------------- #
25
25
import logging
26
26
FORMAT = ('%(asctime)-15s %(threadName)-15s'
27
27
' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s' )
28
28
logging .basicConfig (format = FORMAT )
29
29
log = logging .getLogger ()
30
30
log .setLevel (logging .DEBUG )
31
31
32
- # --------------------------------------------------------------------------- #
32
+ # --------------------------------------------------------------------------- #
33
33
# helper method to test deferred callbacks
34
34
# --------------------------------------------------------------------------- #
35
35
@@ -40,9 +40,9 @@ def _assertor(value):
40
40
deferred .addCallback (lambda r : _assertor (callback (r )))
41
41
deferred .addErrback (lambda _ : _assertor (False ))
42
42
43
- # --------------------------------------------------------------------------- #
43
+ # --------------------------------------------------------------------------- #
44
44
# specify slave to query
45
- # --------------------------------------------------------------------------- #
45
+ # --------------------------------------------------------------------------- #
46
46
# The slave to query is specified in an optional parameter for each
47
47
# individual request. This can be done by specifying the `unit` parameter
48
48
# which defaults to `0x00`
@@ -64,9 +64,9 @@ def exampleRequests(client):
64
64
rr .addCallback (processResponse )
65
65
stopAsynchronousTest (client )
66
66
67
- # --------------------------------------------------------------------------- #
67
+ # --------------------------------------------------------------------------- #
68
68
# example requests
69
- # --------------------------------------------------------------------------- #
69
+ # --------------------------------------------------------------------------- #
70
70
# simply call the methods that you would like to use. An example session
71
71
# is displayed below along with some assert checks. Note that unlike the
72
72
# synchronous version of the client, the asynchronous version returns
@@ -89,45 +89,45 @@ def stopAsynchronousTest(client):
89
89
def beginAsynchronousTest (client ):
90
90
rq = client .write_coil (1 , True , unit = UNIT )
91
91
rr = client .read_coils (1 , 1 , unit = UNIT )
92
- dassert (rq , lambda r : r . function_code < 0x80 ) # test for no error
92
+ dassert (rq , lambda r : not r . isError () ) # test for no error
93
93
dassert (rr , lambda r : r .bits [0 ] == True ) # test the expected value
94
-
94
+
95
95
rq = client .write_coils (1 , [True ]* 8 , unit = UNIT )
96
96
rr = client .read_coils (1 , 8 , unit = UNIT )
97
- dassert (rq , lambda r : r . function_code < 0x80 ) # test for no error
97
+ dassert (rq , lambda r : not r . isError () ) # test for no error
98
98
dassert (rr , lambda r : r .bits == [True ]* 8 ) # test the expected value
99
-
99
+
100
100
rq = client .write_coils (1 , [False ]* 8 , unit = UNIT )
101
101
rr = client .read_discrete_inputs (1 , 8 , unit = UNIT )
102
- dassert (rq , lambda r : r . function_code < 0x80 ) # test for no error
102
+ dassert (rq , lambda r : not r . isError () ) # test for no error
103
103
dassert (rr , lambda r : r .bits == [True ]* 8 ) # test the expected value
104
-
104
+
105
105
rq = client .write_register (1 , 10 , unit = UNIT )
106
106
rr = client .read_holding_registers (1 , 1 , unit = UNIT )
107
- dassert (rq , lambda r : r . function_code < 0x80 ) # test for no error
107
+ dassert (rq , lambda r : not r . isError () ) # test for no error
108
108
dassert (rr , lambda r : r .registers [0 ] == 10 ) # test the expected value
109
-
109
+
110
110
rq = client .write_registers (1 , [10 ]* 8 , unit = UNIT )
111
111
rr = client .read_input_registers (1 , 8 , unit = UNIT )
112
- dassert (rq , lambda r : r . function_code < 0x80 ) # test for no error
112
+ dassert (rq , lambda r : not r . isError () ) # test for no error
113
113
dassert (rr , lambda r : r .registers == [17 ]* 8 ) # test the expected value
114
-
114
+
115
115
arguments = {
116
116
'read_address' : 1 ,
117
117
'read_count' : 8 ,
118
118
'write_address' : 1 ,
119
119
'write_registers' : [20 ]* 8 ,
120
120
}
121
- rq = client .readwrite_registers (** arguments , unit = UNIT )
121
+ rq = client .readwrite_registers (arguments , unit = UNIT )
122
122
rr = client .read_input_registers (1 , 8 , unit = UNIT )
123
123
dassert (rq , lambda r : r .registers == [20 ]* 8 ) # test the expected value
124
124
dassert (rr , lambda r : r .registers == [17 ]* 8 ) # test the expected value
125
125
stopAsynchronousTest (client )
126
126
127
127
128
- # --------------------------------------------------------------------------- #
128
+ # --------------------------------------------------------------------------- #
129
129
# extra requests
130
- # --------------------------------------------------------------------------- #
130
+ # --------------------------------------------------------------------------- #
131
131
# If you are performing a request that is not available in the client
132
132
# mixin, you have to perform the request like this instead::
133
133
#
@@ -139,11 +139,11 @@ def beginAsynchronousTest(client):
139
139
# if isinstance(response, ClearCountersResponse):
140
140
# ... do something with the response
141
141
#
142
- # --------------------------------------------------------------------------- #
142
+ # --------------------------------------------------------------------------- #
143
143
144
- # --------------------------------------------------------------------------- #
144
+ # --------------------------------------------------------------------------- #
145
145
# choose the client you want
146
- # --------------------------------------------------------------------------- #
146
+ # --------------------------------------------------------------------------- #
147
147
# make sure to start an implementation to hit against. For this
148
148
# you can use an existing device, the reference implementation in the tools
149
149
# directory, or start a pymodbus server.
0 commit comments