-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
135 lines (108 loc) · 3.57 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
###########################################################################
#
# MQTT client firmware for Itead S20 Smart Socket
#
# Apache License Version 2.0
#
# Copyright (c) 2017, Tero Saarni
#
# Description
#
# Unofficial alternative firmware for Itead S20 Wifi Smart Socket based
# on MicroPython for ESP8266 http://micropython.org/ to control the socket
# remotely by MQTT protocol.
#
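# The program registers with the MQTT broker, subscribes to a topic and
# waits for a message with an 'on', 'off' or 'offon' payload. When a message
# is received, the relay switch is set to on or off accordingly ('offon'
# switches the relay off, waits 5 seconds and switches it back on).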
#
# If the button on the S20 Wifi Smart Socket is pressed, the program toggles
# the relay and publishes the new state ('on' or 'off') to the broker.
#
from umqtt.simple import MQTTClient
import time
import json
import micropython
import os
# try to recognize micropython on unix and load stub version of machine
if hasattr(os, 'uname'):
from machine import Pin
else:
from machine_stub import Pin
def get_value(pin, stable_ms=20):
    """Return the debounced value of ``pin``.

    The pin is sampled roughly once per millisecond until the same value
    has been read on ``stable_ms`` consecutive samples; that value is
    returned.

    pin       -- object with a ``value()`` method returning the current level
    stable_ms -- number of consecutive identical samples required before
                 the value is considered stable (default 20, i.e. about
                 20 msec)
    """
    cur_value = pin.value()
    stable = 0
    while stable < stable_ms:
        # Sample exactly once per iteration so that the value compared
        # against the candidate is the same one that replaces it on change.
        sample = pin.value()
        if sample == cur_value:
            stable += 1
        else:
            stable = 0
            cur_value = sample
        time.sleep_ms(1)
    return cur_value
class SmartSocket(object):
    """MQTT-controlled relay with a local push button.

    The relay is driven through pin 12 and the button is read from pin 0.
    Messages received on the configured feed switch the relay; a button
    press toggles the relay and publishes the new state to the same feed.
    """

    def __init__(self, config):
        """Set up the relay, the MQTT client and the button interrupt.

        config -- dict with an 'mqtt' section holding the 'client_id',
                  'server', 'user', 'password' and 'feed' entries
        """
        self.config = config
        self.relay = Pin(12, Pin.OUT)
        self.effective_button_value = 1     # the button is active-low

        self.mqtt = MQTTClient(client_id=self.config['mqtt']['client_id'],
                               server=self.config['mqtt']['server'],
                               user=self.config['mqtt']['user'],
                               password=self.config['mqtt']['password'])
        self.mqtt.set_callback(self.mqtt_action)

        self.button = Pin(0, Pin.IN)
        # Register the interrupt handler last: the handler reads
        # effective_button_value and uses self.mqtt, so both must exist
        # before the first interrupt can be delivered.
        self.button.irq(self.button_pressed)

    def main_loop(self, retry_delay=5):
        """Connect to the broker and process incoming messages forever.

        Any exception raised while connecting or while waiting for a
        message is printed, and the connection is re-established after
        ``retry_delay`` seconds. A failed (re)connect is retried the same
        way instead of terminating the loop.

        retry_delay -- seconds to wait before reconnecting (default 5)
        """
        connected = False
        while True:
            try:
                if not connected:
                    self.mqtt_connect()
                    connected = True
                self.mqtt.wait_msg()
            except Exception as ex:
                # Force a fresh connect + subscribe on the next iteration.
                connected = False
                print('Got exception {} retrying in {} seconds'.format(
                    ex, retry_delay))
                time.sleep(retry_delay)

    def button_pressed(self, pin):
        """Interrupt handler for the button pin.

        Reads the debounced pin value and toggles the relay when the value
        has gone from 0 to 1 since the previous call.
        """
        cur_button_value = get_value(pin)
        if cur_button_value == 1 and self.effective_button_value == 0:
            self.toggle_relay()
        self.effective_button_value = cur_button_value

    def toggle_relay(self):
        """Invert the relay state and publish the new state to the feed.

        Publishing is best effort: if it fails the relay keeps its new
        state and the error is printed, so the button keeps working while
        the broker is unreachable.
        """
        if self.relay.value() == 1:
            self.relay.off()
            payload = b'off'
        else:
            self.relay.on()
            payload = b'on'
        try:
            self.mqtt.publish(self.config['mqtt']['feed'], payload)
        except Exception as ex:
            print('Failed to publish {}: {}'.format(payload, ex))

    def mqtt_connect(self):
        """Connect to the broker and subscribe to the configured feed."""
        self.mqtt.connect()
        self.mqtt.subscribe(self.config['mqtt']['feed'])

    def mqtt_action(self, topic, msg):
        """Callback for received MQTT messages.

        b'on' and b'off' switch the relay on or off; b'offon' switches it
        off, waits 5 seconds and switches it on again. Any other payload
        is only printed.
        """
        print('Received {}: {}'.format(topic, msg))
        if msg == b'on':
            self.relay.on()
        elif msg == b'off':
            self.relay.off()
        elif msg == b'offon':
            self.relay.off()
            time.sleep(5)
            self.relay.on()
def main():
    """Read ``config.json``, build a SmartSocket from it and run its main loop."""
    #micropython.alloc_emergency_exception_buf(100)
    with open('config.json') as config_file:
        settings = json.load(config_file)
    SmartSocket(settings).main_loop()
# Start the program when this module is executed as the main script
# (nothing is started when it is merely imported).
if __name__ == '__main__':
    main()
###########################################################################
#
# References
#
# * MicroPython libraries
# http://docs.micropython.org/en/latest/pyboard/library/index.html
#
# * umqtt
# https://github.com/micropython/micropython-lib/tree/master/umqtt.simple
#
#