-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlumiax_bt.py
109 lines (86 loc) · 3.25 KB
/
lumiax_bt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import pygatt
import sys
import time
VALID_HEADERS = (b"\x01\x03", b"\x01\x04")
GET_STATUS = "fe043030002bab15"
message_length = 0
need_to_concat = False
concat_timeout = 1
start_time = 0
data = b""
def data_handler_cb(handle, value):
global data
global message_length
global need_to_concat
global start_time
# print(f"Handle: {handle}")
# value type is bytearray
value_string = value.hex()
print(f"Data: {value_string}")
# PV multi-line data handling
if value.startswith(VALID_HEADERS) and len(value) > 2:
data = b""
message_length = value[2]
print(f"message_length={message_length}")
if (len(value)-5) < message_length:
start_time = time.time()
need_to_concat = True
print("Data detected. Next data will be concated")
# # print(f"start_time={start_time}")
# # print(f"stop_time={start_time + concat_timeout}")
# data = ""
if need_to_concat:
if time.time() <= (start_time + concat_timeout):
data = data + value
else:
need_to_concat = False
message_length = 0
print("Concat timeout occured. Concat aborted.")
# analyzing data
if (len(data) -5) == message_length:
need_to_concat = False
message_length = 0
print("Full data fetched. Concat finished.")
print(f"batt SoC ={int.from_bytes(data[46:47], 'big')}")
print(f"PV V={int.from_bytes(data[63:65], 'big') / 100}")
print(f"PV A={int.from_bytes(data[65:67], 'big', signed=True) / 100}")
pv_w = int.from_bytes(data[67:69], 'big') / 100
print(f"PV W=\t{pv_w}")
pv_total = int.from_bytes(data[73:75], 'big') / 100
print(f"PV TOTAL=\t{pv_total}\tkWh")
print(f"BATT V={int.from_bytes(data[47:49], 'big') / 100}")
print(f"BATT A={int.from_bytes(data[49:51], 'big', signed=True) / 100}")
print(f"BATT temp={int.from_bytes(data[17:19], 'big') / 100}")
print(f"LOAD V={int.from_bytes(data[55:57], 'big') / 100}")
print(f"LOAD A={int.from_bytes(data[57:59], 'big', signed=True) / 100}")
load_w = int.from_bytes(data[59:61], 'big') / 100
print(f"LOAD W=\t{load_w}")
load_total = int.from_bytes(data[79:81], 'big') / 100
print(f"LOAD TOTAL=\t{load_total}\tkWh")
# print(f"time.time()={time.time()}")
# print(f"need_to_concat={need_to_concat}")
# print(f"concat_timeout={concat_timeout}")
# print(f"data={data}")
def main():
adapter = pygatt.GATTToolBackend(hci_device=sys.argv[1])
try:
adapter.start()
print("adapter started")
device = adapter.connect(sys.argv[2])
print("connected")
device.subscribe('0000ff01-0000-1000-8000-00805f9b34fb', callback=data_handler_cb, indication=True)
print("subscribed")
# enable notifications
adapter.sendline('char-write-req 0x0012 0100')
# get data
device.char_write_handle(0x0014, bytes.fromhex(sys.argv[3]))
time.sleep(1)
print("Script finished")
# print(f"data={data}")
# print(f"len(data)={len(data)}")
finally:
adapter.stop()
print("adapter stopped")
return 0
if __name__ == '__main__':
exit(main())