Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

Commit

Permalink
Improved concurrency (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
spietras authored May 20, 2022
1 parent a037d5f commit 254e477
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 13 deletions.
20 changes: 12 additions & 8 deletions theatre/src/theatre/server/connection.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio
from asyncio import AbstractEventLoop
from typing import List, Optional, Union

from aiortc import (
InvalidStateError,
RTCDataChannel,
RTCPeerConnection,
RTCSessionDescription,
Expand Down Expand Up @@ -61,13 +61,17 @@ def session(self) -> Session:
return Session(description=self._peer_connection.localDescription.sdp)

async def send(self, data: Union[str, bytes]) -> None:
for channel in self._channels:
try:
channel.send(data)
await channel.transport._data_channel_flush()
await channel.transport._transmit()
except (InvalidStateError, ConnectionError):
pass
async def send_to_channel(
data: Union[str, bytes], channel: RTCDataChannel
) -> None:
channel.send(data)
await channel.transport._data_channel_flush()
await channel.transport._transmit()

tasks = [send_to_channel(data, channel) for channel in self._channels]

if len(tasks) > 0:
await asyncio.gather(*tasks, return_exceptions=True)

async def close(self) -> None:
await self._peer_connection.close()
24 changes: 19 additions & 5 deletions theatre/src/theatre/server/room.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
import random
import uuid
Expand Down Expand Up @@ -90,11 +91,24 @@ def create_user(self) -> User:
return User(id=user_id, avatar=avatar)

async def broadcast(self, user_id: str, data: Union[str, bytes]) -> None:
for connected_user_id in self._connected_users:
if connected_user_id != user_id:
user_info = self._users.get(connected_user_id)
if user_info is not None:
await user_info.connection.send(data)
async def send_to_user(
data: Union[str, bytes], user_info: UserInfo
) -> None:
await user_info.connection.send(data)

user_infos = [
self._users.get(connected_user_id)
for connected_user_id in self._connected_users
if connected_user_id != user_id
]
tasks = [
send_to_user(data, user_info)
for user_info in user_infos
if user_info is not None
]

if len(tasks) > 0:
await asyncio.gather(*tasks, return_exceptions=True)

async def handle_data(self, user_id: str, data: Union[str, bytes]) -> None:
try:
Expand Down

0 comments on commit 254e477

Please sign in to comment.