-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathexample.py
73 lines (63 loc) · 2.29 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import asyncio
from time import time
from websocket_rooms import Room
from fastapi import Depends, FastAPI, WebSocket
from typing import Any, NoReturn
import logging
from fastapi.responses import HTMLResponse
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>Time websocket</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/current_time");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
app = FastAPI()
@app.get("/")
async def get():
return HTMLResponse(html)
time_room = Room()
@time_room.on_receive("text")
async def on_receive(room: Room, websocket: WebSocket, message: Any) -> None:
print("{}:{} just sent '{}'".format(websocket.client.host, websocket.client.port, message))
@time_room.on_connect("after")
async def on_chatroom_connection(room: Room, websocket: WebSocket) -> None:
print("{}:{} joined the channel".format(websocket.client.host, websocket.client.port))
@time_room.on_disconnect("after")
async def on_chatroom_disconnect(room: Room, websocket: WebSocket) -> None:
print("{}:{} left the channel".format(websocket.client.host, websocket.client.port))
async def updater_function(room: Room) -> NoReturn:
while True:
t = time()
await room.push_json({"current_time": t})
await asyncio.sleep(1)
updater_task = asyncio.create_task(updater_function(time_room))
@app.websocket("/current_time")
async def connect_websocket(websocket: WebSocket, room: Room = Depends(time_room)):
await room.connect(websocket)