-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadsafe.py
66 lines (52 loc) · 2.06 KB
/
threadsafe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# threadsafe_queue.py Provides ThreadSafeQueue class
# Copyright (c) 2022 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# Uses pre-allocated ring buffer: can use list or array
# Asynchronous iterator allowing consumer to use async for
import asyncio
class ThreadSafeQueue:  # MicroPython optimised
    """Fixed-size ring-buffer queue with synchronous and asynchronous access.

    Items are stored in a pre-allocated buffer indexed by a write index and a
    read index. One slot is always left unused so that "full" and "empty" can
    be told apart, hence the usable capacity is ``len(buffer) - 1`` and the
    buffer needs at least two slots to hold any item.

    ``put_sync``/``get_sync`` are plain methods; ``put``/``get`` are
    coroutines. The instance is also an asynchronous iterator, so a consumer
    may use ``async for item in queue``.
    """

    def __init__(self, buf):
        """Set up the ring buffer.

        Args:
            buf: Either an int, in which case a list of that many slots is
                allocated, or an existing buffer (e.g. list or array) that
                supports ``len()`` and indexed read/write.
        """
        self._q = [0 for _ in range(buf)] if isinstance(buf, int) else buf
        self._size = len(self._q)
        self._wi = 0  # Index of the next slot to write
        self._ri = 0  # Index of the next slot to read
        self._evput = asyncio.ThreadSafeFlag()  # Triggered by put, tested by get
        self._evget = asyncio.ThreadSafeFlag()  # Triggered by get, tested by put

    def full(self):
        """Return True if no further item can be added."""
        return ((self._wi + 1) % self._size) == self._ri

    def empty(self):
        """Return True if the queue holds no items."""
        return self._ri == self._wi

    def qsize(self):
        """Return the number of items currently queued."""
        return (self._wi - self._ri) % self._size

    def get_sync(self, block=False):
        """Remove and return the oldest item without awaiting.

        Args:
            block: If true, busy-wait until an item is available.

        Raises:
            IndexError: If the queue is empty and ``block`` is false.
        """
        if not block and self.empty():
            raise IndexError  # Not allowed to block
        while self.empty():  # Block until an item appears
            pass
        r = self._q[self._ri]
        self._ri = (self._ri + 1) % self._size
        self._evget.set()  # Schedule task waiting on ._evget
        return r

    def put_sync(self, v, block=False):
        """Append ``v`` to the queue without awaiting.

        Args:
            v: Item to store.
            block: If true, busy-wait until a slot is free.

        Raises:
            IndexError: If the queue is full and ``block`` is false. In that
                case neither the buffer nor the put flag is touched.
        """
        if not block and self.full():
            raise IndexError  # Not allowed to block
        while self.full():
            pass  # Can't store until an item is removed
        # Store the item and advance the write index BEFORE signalling, so a
        # consumer woken by the flag always finds the queue non-empty.
        self._q[self._wi] = v
        self._wi = (self._wi + 1) % self._size
        self._evput.set()  # Schedule task waiting on get

    async def put(self, val):  # Usage: await queue.put(item)
        """Wait until there is room, then add ``val`` to the queue."""
        while self.full():  # Queue full
            await self._evget.wait()
        self.put_sync(val)

    def __aiter__(self):
        """Return self: the queue is its own asynchronous iterator."""
        return self

    async def __anext__(self):
        """Return the next item, waiting for one if the queue is empty."""
        return await self.get()

    async def get(self):
        """Wait until an item is available, then remove and return it."""
        while self.empty():
            await self._evput.wait()
        r = self._q[self._ri]
        self._ri = (self._ri + 1) % self._size
        self._evget.set()  # Schedule task waiting on ._evget
        return r