Replies: 1 comment 4 replies
-
@vanker-x Here is how I would personally implement the above code (JSON added just for the sake of routing messages):
import asyncio
import uuid
from emmett import App, Pipe, websocket
from emmett.tools import service
app = App(__name__)
class Broadcast:
    """Registry of consumers that fans each message out to all of them."""

    def __init__(self):
        # Maps each consumer's ``uuid`` attribute to the consumer object.
        self.consumers = {}

    def new_consumer(self):
        """Create a ``Consumer``, register it under its uuid and return it."""
        created = Consumer()
        self.consumers.update({created.uuid: created})
        return created

    def del_consumer(self, consumer):
        """Unregister ``consumer``; a consumer that is not registered is ignored."""
        try:
            del self.consumers[consumer.uuid]
        except KeyError:
            # Same outcome as ``pop(key, None)``: removal is best-effort.
            pass

    def send(self, message):
        """Push ``message`` to every registered consumer."""
        for _, target in self.consumers.items():
            target.push(message)
class Consumer:
    """A single subscriber with its own message queue.

    Messages are enqueued with :meth:`push` and read back, in order, by
    iterating :meth:`consume`. Every pushed value is delivered, including
    falsy ones such as ``""``, ``0``, ``False`` or ``None``; the stream only
    ends after :meth:`close` has been called.
    """

    # Private end-of-stream marker. It is compared by identity, so no
    # user-supplied message can ever be mistaken for it.
    _STOP = object()

    def __init__(self):
        # Random identifier used by the broadcaster as the registry key.
        self.uuid = uuid.uuid4()
        # Created without a maxsize, so ``put_nowait`` never raises QueueFull.
        self.queue = asyncio.Queue()

    def push(self, message):
        """Enqueue ``message`` for delivery without blocking."""
        self.queue.put_nowait(message)

    def close(self):
        """Ask :meth:`consume` to finish once already-queued messages are read."""
        self.queue.put_nowait(self._STOP)

    async def consume(self):
        """Asynchronously yield queued messages until :meth:`close` is called.

        The previous ``while message := ...`` form stopped on the first falsy
        message, so a single empty payload silently ended delivery. Only the
        private stop marker terminates the stream now.
        """
        while (message := await self.queue.get()) is not self._STOP:
            yield message
class ConsumerPipe(Pipe):
    """Pipe that attaches a broadcast hub and a per-connection consumer.

    One ``Broadcast`` is created per pipe instance, so every connection that
    goes through the same instance shares it.
    """

    def __init__(self) -> None:
        self.broadcast = Broadcast()

    async def open(self):
        """Expose the hub and a freshly registered consumer on ``websocket``.

        NOTE(review): presumably called by Emmett when a connection is
        opened - confirm against the Emmett pipeline documentation.
        """
        hub = self.broadcast
        websocket.broadcast = hub
        websocket.consumer = hub.new_consumer()

    async def close(self):
        """Unregister the consumer that ``open`` stored on ``websocket``."""
        leaving = websocket.consumer
        self.broadcast.del_consumer(leaving)
@app.websocket("/", pipeline=[ConsumerPipe()])
@service.json
async def echo():
    """Echo incoming messages and relay broadcasts between connections.

    Two loops run concurrently for each connection:

    * ``channel`` forwards every message queued on this connection's
      consumer to the client, wrapped as ``{"type": "broadcast", ...}``.
    * ``echo_loop`` reads client messages; those whose ``"type"`` is
      ``"broadcast"`` have their ``"data"`` sent to all consumers, anything
      else is sent straight back to the sender.

    NOTE(review): ``@service.json`` presumably decodes received payloads and
    encodes sent ones as JSON - confirm against the Emmett documentation.
    """
    await websocket.accept()

    async def channel():
        async for message in websocket.consumer.consume():
            await websocket.send({"type": "broadcast", "data": message})

    async def echo_loop():
        while True:
            message = await websocket.receive()
            # Payloads that are not JSON objects (strings, lists, numbers)
            # cannot carry a "type"; echo them instead of crashing on lookup.
            if isinstance(message, dict) and message.get("type") == "broadcast":
                websocket.broadcast.send(message["data"])
                continue
            await websocket.send(message)

    tasks = [
        asyncio.ensure_future(echo_loop()),
        asyncio.ensure_future(channel()),
    ]
    try:
        await asyncio.gather(*tasks)
    finally:
        # gather() does not cancel the sibling when one awaitable raises
        # (e.g. on client disconnect), which would leave the other loop
        # pending forever. Cancel whatever is still running and reap it.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


# and this is a quick test code for it:
import asyncio
import json
import websockets
async def loop(cli_no, url='ws://localhost:8000/', interval=0.2):
    """Run one test client that sends messages and prints what it receives.

    Args:
        cli_no: Label for this client, used in the printed output and in the
            broadcast payload.
        url: WebSocket endpoint to connect to. Defaults to the address that
            was previously hard-coded.
        interval: Seconds to sleep between send rounds. Defaults to the
            previously hard-coded ``0.2``.
    """
    async with websockets.connect(url) as websocket:

        async def recv():
            # Print every frame the server sends to this client.
            async for message in websocket:
                print(f"recv @ cli {cli_no}: {message}")

        async def send():
            # Each round sends one plain message and one broadcast request.
            while True:
                await websocket.send(json.dumps({"type": "message", "data": "foo"}))
                await websocket.send(json.dumps({"type": "broadcast", "data": f"foo from {cli_no}"}))
                await asyncio.sleep(interval)

        await asyncio.gather(send(), recv())
if __name__ == "__main__":

    async def main():
        """Run test clients 1 and 2 concurrently."""
        clients = (loop(1), loop(2))
        await asyncio.gather(*clients)

    asyncio.run(main())

# Please mind that this implementation only works with a single Emmett worker; for production usage you should add some external service like redis to broadcast messages between clients connected to different workers.
Beta Was this translation helpful? Give feedback.
4 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Hello,
I have a problem when using Emmett: when I tried to add the websocket object to a list,
the newly connected websocket object would overwrite the original websocket object.
Have you considered passing the websocket object explicitly to avoid this problem?
Beta Was this translation helpful? Give feedback.
All reactions