From 414f118de2a0c0b31a74ee1a5295d55f17a25602 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Thu, 21 Jan 2021 14:13:39 +0300 Subject: [PATCH] Add keep connection for local stations --- .../yandex_station/core/yandex_glagol.py | 44 +++++++++++++++---- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/custom_components/yandex_station/core/yandex_glagol.py b/custom_components/yandex_station/core/yandex_glagol.py index 62b21d9..c45b730 100644 --- a/custom_components/yandex_station/core/yandex_glagol.py +++ b/custom_components/yandex_station/core/yandex_glagol.py @@ -4,7 +4,7 @@ import logging import time import uuid -from asyncio import Future +from asyncio import Future, Task from typing import Callable, Optional, Dict from aiohttp import ClientWebSocketResponse, WSMsgType, ClientConnectorError @@ -21,6 +21,8 @@ class YandexGlagol: url: Optional[str] = None ws: Optional[ClientWebSocketResponse] = None + next_ping_ts = 0 + keep_task: Task = None update_handler: Callable = None waiters: Dict[str, Future] = {} @@ -28,6 +30,7 @@ class YandexGlagol: def __init__(self, session: YandexSession, device: dict): self.session = session self.device = device + self.loop = asyncio.get_event_loop() def debug(self, text: str): _LOGGER.debug(f"{self.device['name']} | {text}") @@ -81,18 +84,22 @@ async def _connect(self, fails: int): try: self.ws = await self.session.ws_connect(self.url, heartbeat=55, ssl=False) - await self.ws.send_json({ - 'conversationToken': self.device_token, - 'id': str(uuid.uuid4()), - 'payload': {'command': 'ping'}, - 'sentTime': int(round(time.time() * 1000)), - }) + await self.ping() if not self.ws.closed: fails = 0 + if not self.keep_task or self.keep_task.done(): + self.keep_task = self.loop.create_task(self._keep_connection()) + async for msg in self.ws: if msg.type == WSMsgType.TEXT: + # _LOGGER.debug("update") + + # Большая станция в режиме idle шлёт статус раз в 5 секунд, + # в режиме playing шлёт чаще раза в 1 секунду + self.next_ping_ts = time.time() + 6 + data = json.loads(msg.data) response = None @@ -153,6 +160,25 @@ async def _connect(self, fails: int): asyncio.create_task(self._connect(fails)) + async def _keep_connection(self): + _LOGGER.debug("Start keep connection task") + while not self.ws.closed: + await asyncio.sleep(1) + if time.time() > self.next_ping_ts: + await self.ping() + + async def ping(self): + # _LOGGER.debug("ping") + try: + await self.ws.send_json({ + 'conversationToken': self.device_token, + 'id': str(uuid.uuid4()), + 'payload': {'command': 'ping'}, + 'sentTime': int(round(time.time() * 1000)), + }) + except: + pass + async def send(self, payload: dict) -> Optional[dict]: _LOGGER.debug(f"{self.name} => local | {payload}") @@ -166,11 +192,13 @@ async def send(self, payload: dict) -> Optional[dict]: 'sentTime': int(round(time.time() * 1000)), }) - self.waiters[request_id] = asyncio.get_event_loop().create_future() + self.waiters[request_id] = self.loop.create_future() # limit future wait time await asyncio.wait_for(self.waiters[request_id], 5) + self.next_ping_ts = time.time() + 0.5 + return self.waiters.pop(request_id).result() except asyncio.TimeoutError: