Skip to content

Commit

Permalink
Add keep connection for local stations
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Jan 21, 2021
1 parent 3d01241 commit 414f118
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions custom_components/yandex_station/core/yandex_glagol.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import time
import uuid
from asyncio import Future
from asyncio import Future, Task
from typing import Callable, Optional, Dict

from aiohttp import ClientWebSocketResponse, WSMsgType, ClientConnectorError
Expand All @@ -21,13 +21,16 @@ class YandexGlagol:
url: Optional[str] = None
ws: Optional[ClientWebSocketResponse] = None

next_ping_ts = 0
keep_task: Task = None
update_handler: Callable = None

waiters: Dict[str, Future] = {}

def __init__(self, session: YandexSession, device: dict):
self.session = session
self.device = device
self.loop = asyncio.get_event_loop()

def debug(self, text: str):
_LOGGER.debug(f"{self.device['name']} | {text}")
Expand Down Expand Up @@ -81,18 +84,22 @@ async def _connect(self, fails: int):
try:
self.ws = await self.session.ws_connect(self.url, heartbeat=55,
ssl=False)
await self.ws.send_json({
'conversationToken': self.device_token,
'id': str(uuid.uuid4()),
'payload': {'command': 'ping'},
'sentTime': int(round(time.time() * 1000)),
})
await self.ping()

if not self.ws.closed:
fails = 0

if not self.keep_task or self.keep_task.done():
self.keep_task = self.loop.create_task(self._keep_connection())

async for msg in self.ws:
if msg.type == WSMsgType.TEXT:
# _LOGGER.debug("update")

# Большая станция в режиме idle шлёт статус раз в 5 секунд,
# в режиме playing шлёт чаще раза в 1 секунду
self.next_ping_ts = time.time() + 6

data = json.loads(msg.data)

response = None
Expand Down Expand Up @@ -153,6 +160,25 @@ async def _connect(self, fails: int):

asyncio.create_task(self._connect(fails))

async def _keep_connection(self):
_LOGGER.debug("Start keep connection task")
while not self.ws.closed:
await asyncio.sleep(1)
if time.time() > self.next_ping_ts:
await self.ping()

async def ping(self):
# _LOGGER.debug("ping")
try:
await self.ws.send_json({
'conversationToken': self.device_token,
'id': str(uuid.uuid4()),
'payload': {'command': 'ping'},
'sentTime': int(round(time.time() * 1000)),
})
except:
pass

async def send(self, payload: dict) -> Optional[dict]:
_LOGGER.debug(f"{self.name} => local | {payload}")

Expand All @@ -166,11 +192,13 @@ async def send(self, payload: dict) -> Optional[dict]:
'sentTime': int(round(time.time() * 1000)),
})

self.waiters[request_id] = asyncio.get_event_loop().create_future()
self.waiters[request_id] = self.loop.create_future()

# limit future wait time
await asyncio.wait_for(self.waiters[request_id], 5)

self.next_ping_ts = time.time() + 0.5

return self.waiters.pop(request_id).result()

except asyncio.TimeoutError:
Expand Down

0 comments on commit 414f118

Please sign in to comment.