forked from niccokunzmann/open-web-calendar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conversion_base.py
67 lines (53 loc) · 2.31 KB
/
conversion_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from concurrent.futures import ThreadPoolExecutor
from threading import RLock
import requests
import sys
import traceback
import io
from icalendar import Calendar
def get_text_from_url(url, timeout=30):
    """Return the text from a url.

    Arguments:
    - url: the address to download with an HTTP GET request.
    - timeout: seconds handed to ``requests.get`` as its ``timeout``.
      Without a timeout, requests waits forever on a server that stops
      answering, which would block the calling thread indefinitely.
      Pass ``None`` to wait without a limit.

    Returns the body of the response as text (``Response.text``).

    Exceptions raised by ``requests.get`` (including a timeout) are not
    handled here and propagate to the caller.
    """
    return requests.get(url, timeout=timeout).text
class ConversionStrategy:
    """Base class for conversions.

    A strategy reads the calendar urls from ``specification["url"]``,
    downloads and parses them in a thread pool and hands the parsed
    calendars to ``collect_components_from``.

    Subclasses are expected to provide:

    - ``collect_components_from(calendars)``
    - ``merge()``
    - ``convert_error(err, url, traceback_text)`` -- called by ``error()``
      but not defined in this base class.
    """

    # TODO: add as parameters
    # Used both as the maximum number of urls that may be merged and as the
    # number of worker threads of the pool.
    MAXIMUM_THREADS = 100

    def __init__(self, specification, get_text_from_url=get_text_from_url):
        """Create a new strategy.

        Arguments:
        - specification: a mapping; ``specification["url"]`` is a single url
          string or a collection of url strings.
        - get_text_from_url: a callable that takes a url and returns the
          calendar text; it can be replaced, e.g. to avoid network access.
        """
        self.specification = specification
        # Guards self.components, which is appended to from worker threads.
        self.lock = RLock()
        self.components = []
        self.get_text_from_url = get_text_from_url
        self.created()

    def created(self):
        """Template method for subclasses.

        Called at the end of ``__init__``; does nothing by default.
        """

    def error(self, ty, err, tb, url):
        """Convert an exception into a component via ``convert_error``.

        Arguments:
        - ty, err, tb: exception type, value and traceback as returned by
          ``sys.exc_info()``.
        - url: the url that was being retrieved when the error occurred.

        Returns whatever ``self.convert_error(err, url, traceback_text)``
        returns; ``convert_error`` has to be provided by the subclass.
        """
        # format_exception yields exactly the text print_exception would write.
        traceback_text = "".join(traceback.format_exception(ty, err, tb))
        return self.convert_error(err, url, traceback_text)

    def retrieve_calendars(self):
        """Retrieve the calendars from different sources.

        Every url of the specification is passed to ``retrieve_calendar``
        in a worker thread. This method returns once all urls are handled.

        Raises AssertionError if more than ``MAXIMUM_THREADS`` urls are
        given.
        """
        urls = self.specification["url"]
        if isinstance(urls, str):
            urls = [urls]
        # Raised explicitly instead of using ``assert`` so that the limit is
        # still enforced when Python runs with -O.
        if len(urls) > self.MAXIMUM_THREADS:
            raise AssertionError(
                "You can only merge {} urls. If you like more, open an issue.".format(
                    self.MAXIMUM_THREADS
                )
            )
        with ThreadPoolExecutor(max_workers=self.MAXIMUM_THREADS) as executor:
            # Iterating over the results re-raises an exception that escaped
            # a worker, so that no error passes silently; import this
            for _ in executor.map(self.retrieve_calendar, urls):
                pass

    def retrieve_calendar(self, url):
        """Retrieve a calendar from a url.

        The text is downloaded with ``self.get_text_from_url``, parsed with
        ``Calendar.from_ical(..., multiple=True)`` and passed to
        ``collect_components_from``. If any of these steps raises an
        exception, the result of ``self.error(...)`` is appended to
        ``self.components`` instead.
        """
        try:
            calendar_text = self.get_text_from_url(url)
            calendars = Calendar.from_ical(calendar_text, multiple=True)
            self.collect_components_from(calendars)
        except Exception:
            # Exception rather than a bare except: KeyboardInterrupt and
            # SystemExit must not be turned into calendar components.
            ty, err, tb = sys.exc_info()
            with self.lock:
                self.components.append(self.error(ty, err, tb, url))

    def collect_components_from(self, calendars):
        """Collect all the components from the calendars.

        ``calendars`` is the result of
        ``Calendar.from_ical(text, multiple=True)`` for one url.
        This may be called from several threads at once.
        """
        raise NotImplementedError("to be implemented in subclasses")

    def collect_calendar_information(self, calendars):
        """Collect additional information from the calendars.

        Does nothing by default; subclasses may override it.
        """

    def merge(self):
        """Return the flask Response for the merged calendars."""
        raise NotImplementedError("to be implemented in subclasses")