-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelper_functions.py
307 lines (271 loc) · 9.11 KB
/
helper_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
"""
A collection of helper functions used in Kraken.
"""
from __future__ import annotations

import asyncio
import inspect
from functools import partial
from typing import (
    Any,
    AsyncContextManager,
    AsyncGenerator,
    AsyncIterable,
    Awaitable,
    Callable,
    Iterable,
    Type,
    TypedDict,
    cast,
)

from aiostream import operator, pipable_operator, stream, streamcontext
from aiostream.core import Stream
from async_event_bus import TopicNotRegisteredError, event_bus
from errors import ConfigurationError
class FunctionCallConfig(TypedDict):
    """
    This is the datatype for a function call on a device as it is stored in the database.

    `create_device_function` in this module consumes these keys: it looks up ``function`` on a device object and
    binds ``args`` and ``kwargs`` to it.
    """

    # Name of the attribute that is looked up on the device via getattr()
    function: str
    # Positional arguments bound to the function
    args: list | tuple
    # Keyword arguments bound to the function
    kwargs: dict
    # Returned unchanged next to the bound function; presumably a duration in seconds - TODO confirm with the caller
    timeout: float
async def cancel_all_tasks(tasks: Iterable[asyncio.Task]) -> None:
    """
    Cancel all tasks and wait for them to finish. The results are then inspected for exceptions
    (except asyncio.CancelledError) and the first one found is raised.

    Parameters
    ----------
    tasks: Iterable
        The tasks to be cancelled and awaited. Any iterable is accepted, including one-shot iterators.

    Raises
    ------
    Exception
        The first exception (in iteration order of `tasks`) that one of the tasks finished with.
    """
    # The input is walked twice (cancel, then gather). Take a snapshot, so that a one-shot iterator is not exhausted
    # by the cancel loop, which would leave nothing to await and silently drop all task exceptions.
    pending = tuple(tasks)
    for task in pending:
        if not task.done():
            task.cancel()

    # return_exceptions=True makes sure that every task is awaited, even if one of them fails
    results = await asyncio.gather(*pending, return_exceptions=True)
    for result in results:
        # Check for exceptions, but ignore asyncio.CancelledError, which inherits from BaseException not Exception
        if isinstance(result, Exception):
            raise result
async def iterate_safely(topic: str, status_topic: str, *args: Any, **kwargs: Any) -> AsyncGenerator[Any, None]:
    """
    Iterate over a topic on the eventbus. If the topic is not yet registered, subscribe to the status topic and wait
    for the data source to become available, then try again.

    Parameters
    ----------
    topic: str
        The data source topic
    status_topic: str
        The data source status update topic, that will be listened to, if the source is not available
    *args: Any
        The arguments passed to the eventbus topic
    **kwargs: Any
        The keyword arguments passed to the eventbus topic

    Yields
    -------
    Any
        The data returned from the subscription
    """
    while "database not ready":
        try:
            gen = await event_bus.call(topic, *args, **kwargs)
        except TopicNotRegisteredError:
            # Same exception as in call_safely(). Catching NameError here would also hide genuine NameErrors raised
            # while calling the topic.
            # The database is not yet ready, wait for a truthy status update and then retry the call.
            status: bool  # TODO: Replace with proper event
            async for status in event_bus.subscribe(status_topic):
                if status:
                    break
            continue

        # The topic is registered. Forward everything it produces and stop once it is exhausted.
        async for item in gen:
            yield item
        return
async def call_safely(topic: str, status_topic: str, *args: Any, **kwargs: Any) -> Any:
    """
    Call a topic on the eventbus and hand back its result. While the topic is not registered, listen on the status
    topic until a truthy status update arrives and then repeat the call.

    Parameters
    ----------
    topic: str
        The data source topic
    status_topic: str
        The data source status update topic, that will be listened to, if the source is not available
    *args: Any
        The arguments passed to the eventbus topic
    **kwargs: Any
        The keyword arguments passed to the eventbus topic

    Returns
    -------
    Any
        The result of the event_bus function call.
    """
    while True:
        try:
            return await event_bus.call(topic, *args, **kwargs)
        except TopicNotRegisteredError:
            # The data source is not there yet. Block until it reports itself as available, then loop around and
            # repeat the call.
            async for is_available in event_bus.subscribe(status_topic):  # TODO: Replace with proper event
                if is_available:
                    break
@pipable_operator
async def retry(
    source: AsyncIterable[Any], exc_class: Type[BaseException], interval: float = 0
) -> AsyncGenerator[Any, None]:
    """
    Retry a datastream if the exception `exc_class` is thrown. The source is re-entered from scratch on every retry.

    Parameters
    ----------
    source: AsyncIterable
        The stream to iterate and to restart on failure
    exc_class: BaseException type
        The exception class to catch
    interval: float
        The time in seconds to wait between retries

    Yields
    -------
    Any
        The results from the stream
    """
    # Earliest loop time at which the next retry may start. Starting at 0 means that the first failure is retried
    # without waiting, as long as the loop clock is positive.
    next_attempt: float = 0
    # This body only ever runs inside a running loop, so ask for that loop explicitly
    loop = asyncio.get_running_loop()
    while True:
        try:
            async with streamcontext(source) as streamer:
                async for item in streamer:
                    yield item
        except exc_class:
            # The delay is non-positive, if the previous retry started more than `interval` seconds ago.
            # asyncio.sleep() then only yields to the loop once.
            await asyncio.sleep(next_attempt - loop.time())
            next_attempt = loop.time() + interval
        else:
            # The source finished without raising `exc_class`, so we are done
            return
@pipable_operator
async def context(
    source: AsyncIterable[Any],
    context_manager: AsyncContextManager,
    on_enter: Callable[[], Any] | None = None,
    on_exit: Callable[[Exception | None], Any] | None = None,
) -> AsyncGenerator[Any, None]:
    """
    Iterate a stream within a context. The on_enter and on_exit callbacks can be used to log the status of the stream.

    Parameters
    ----------
    source: AsyncIterable
        The stream to iterate while the context is held
    context_manager: AsyncContextManager
        The asynchronous context manager that needs to be entered
    on_enter: Callable
        A function to be called once the context and the stream have been entered
    on_exit: Callable
        A function to be called once the stream is left. It receives the exception that ended the stream or `None`,
        if no `Exception` was raised. Exceptions, that do not inherit from `Exception`, are not captured, so the
        callback receives `None` for them as well.

    Yields
    -------
    Any
        The results from the data stream
    """
    async with context_manager, streamcontext(source) as streamer:
        failure: Exception | None = None
        try:
            if on_enter is not None:
                on_enter()
            async for item in streamer:
                yield item
        except Exception as exc:
            # Remember the error for the on_exit callback, then let it propagate
            failure = exc
            raise
        finally:
            if on_exit is not None:
                on_exit(failure)
@operator
async def with_context(
    context_manager: AsyncContextManager, on_exit: Callable[[], Any] | None = None
) -> AsyncGenerator[Any, None]:
    """
    Enter the context, yield the value it provides exactly once and then keep the context open.

    After the single item has been produced, the generator waits on a future, that is never resolved in here.
    It therefore only leaves the context, when it is cancelled or closed from the outside.

    Parameters
    ----------
    context_manager: AsyncContextManager
        The context manager to enter
    on_exit: Callable
        A callback function to call, when exiting the context

    Yields
    -------
    Any
        The context
    """
    try:
        async with context_manager as ctx:
            yield ctx
            # Park here, so the context stays entered for as long as the stream is alive
            never_done: asyncio.Future = asyncio.Future()
            try:
                await never_done
            finally:
                never_done.cancel()
    finally:
        # Runs after the context manager has been exited
        if on_exit is not None:
            on_exit()
@pipable_operator
async def finally_action(
    source: AsyncIterable[Any], func: Awaitable[Any] | Callable[[], Any]
) -> AsyncGenerator[Any, None]:
    """
    Pass the source stream through unchanged and run `func` once the stream is left, no matter how it ended.

    Parameters
    ----------
    source: AsyncIterable
        The stream to forward
    func: Awaitable or Callable
        The action to run in the finally-block. An awaitable is awaited, anything else is called without arguments
        and its return value is discarded.

    Yields
    -------
    Any
        The results from the source stream
    """
    try:
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield item
    finally:
        if not inspect.isawaitable(func):
            cast(Callable, func)()
        else:
            await func
@pipable_operator
async def catch(
    source: AsyncIterable[Any], exc_class: Type[BaseException], on_exc: Callable[[BaseException], Stream] | None = None
) -> AsyncGenerator[Any, None]:
    """
    Catch an exception and then switch to the next stream `on_exc` or gracefully terminate, when no stream is given.

    Parameters
    ----------
    source: AsyncIterable
        The stream to forward until it raises `exc_class`
    exc_class: BaseException type
        The exception to catch
    on_exc: Callable
        A function, that takes the caught exception and returns a Stream to continue with.

    Yields
    -------
    Any
        The results from the source stream or the `on_exc` stream.
    """
    try:
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield item
    except exc_class as exc:
        if on_exc is None:
            # No fallback given: end the stream here. Yielding an empty stream object instead would hand that
            # object to the consumer as if it were a data item.
            return
        async with on_exc(exc).stream() as streamer:
            async for item in streamer:
                yield item
def create_device_function(device: Any, func_call: FunctionCallConfig) -> tuple[partial, float]:
    """
    Creates a partial function from the function call with the parameters given and returns it together with the
    configured timeout. The function is only bound, not called.

    Parameters
    ----------
    device: Any
        The object on which the function is looked up
    func_call: dict
        A dictionary containing the function name as a string, the timeout and the optional args and kwargs
        parameters

    Returns
    -------
    tuple of partial and float
        The function call and the timeout

    Raises
    ------
    ConfigurationError
        If the device has no attribute with the name given in `func_call["function"]`
    KeyError
        If `func_call` has no "function" or "timeout" entry
    """
    # Only the attribute lookup is guarded, so that an AttributeError from anywhere else is not misreported as a
    # missing device function.
    try:
        function = getattr(device, func_call["function"])
    except AttributeError:
        raise ConfigurationError(f"Function '{func_call['function']}' not found") from None

    # Create a partial function, that freezes the parameters and can be called later
    func = partial(function, *func_call.get("args", []), **func_call.get("kwargs", {}))
    return func, func_call["timeout"]