-
Notifications
You must be signed in to change notification settings - Fork 3
/
time_controller.py
110 lines (90 loc) · 4.48 KB
/
time_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import httplib
import json
import logging
import time
from datetime import datetime, timedelta
class TimeController:
def __init__(self, config):
self._config = config
self._current_date = None
self._sunrise = None
self._sunset = None
fetched_location = False
while not fetched_location:
try:
self._fetch_location()
fetched_location = True
except Exception as e:
logging.exception("Can't get location")
logging.info("Sleeping 60 seconds...")
time.sleep(60)
def _fetch_location(self):
conn = httplib.HTTPSConnection("freegeoip.io", timeout=60)
conn.request("GET", "/json/")
r1 = conn.getresponse()
if r1.status != 200:
raise Exception("Not 200 status in freegeoip API: " + str(r1.status))
data1 = r1.read()
decoded_json = json.loads(data1)
if decoded_json is None:
raise Exception("Can't decode freegeoip response as JSON: " + str(data1))
self._latitude = decoded_json['latitude']
self._longitude = decoded_json['longitude']
logging.info("Fetched device position: (" + str(self._latitude) + ", " + str(self._longitude) + ")")
def decode_utc_time(self, date_str, time_str):
t = datetime.strptime(date_str + ' ' + time_str, "%Y-%m-%d %I:%M:%S %p")
tz_offset = -time.timezone
local_time = time.localtime()
if local_time.tm_isdst == 1:
tz_offset += 3600
res = t + timedelta(seconds=tz_offset)
return res
# I know that it could be calculated without using any external APIs
# but those existing python packages that are already implemented
# could not be easily installed into OpenELEC because of broken dependency install
# system. And I'm too lazy to implement this maths on my own :)
def get_sunrise_sunset(self, date_str):
conn = httplib.HTTPConnection("api.sunrise-sunset.org", timeout=60)
conn.request("GET", "/json?lat=" + str(self._latitude) + "&lng=" + str(self._longitude) + "&date=" + date_str)
r1 = conn.getresponse()
if r1.status != 200:
raise Exception("Not 200 status in sunrise API: " + str(r1.status))
data1 = r1.read()
decoded_json = json.loads(data1)
if decoded_json is None or decoded_json['status'] != "OK":
raise Exception("Not OK status in decoded JSON of sunrise API: " + str(data1))
self._sunrise = self.decode_utc_time(date_str, decoded_json['results']['sunrise'])
self._sunset = self.decode_utc_time(date_str, decoded_json['results']['sunset'])
def dispatch(self):
current_time = datetime.now()
current_date = current_time.strftime("%Y-%m-%d")
if self._current_date is None or self._current_date != current_date:
logging.info("New date: " + current_date)
self._current_date = current_date
successful_response = False
while not successful_response:
try:
self.get_sunrise_sunset(current_date)
successful_response = True
except Exception as e:
logging.exception("Can't get sunrise/sunset")
logging.info("Sleeping 60 seconds...")
time.sleep(60)
logging.info("Today's sunrise: " + str(self._sunrise))
logging.info("Today's sunset: " + str(self._sunset))
current_date = datetime.now()
lower_bound = current_date.replace(hour=self._config.EARLIEST_ENABLE_HOUR,
minute=self._config.EARLIEST_ENABLE_MINUTE,
second=0)
upper_bound = current_date.replace(hour=self._config.LATEST_DISABLE_HOUR,
minute=self._config.LATEST_DISABLE_MINUTE,
second=0)
logging.info("Lower bound: " + str(lower_bound))
logging.info("Upper bound: " + str(upper_bound))
if self._sunrise < lower_bound:
self._sunrise = lower_bound
logging.info("Sunrise is before lower bound, using lower bound")
if self._sunset > upper_bound:
self._sunset = upper_bound
logging.info("Sunset is after upper bound, using upper bound")
return self._sunrise < current_time < self._sunset