Skip to content

Commit

Permalink
Merge pull request #2022 from Drakkar-Software/dev
Browse files Browse the repository at this point in the history
Update master
  • Loading branch information
Herklos authored Sep 4, 2022
2 parents dcbd3a1 + 49997c6 commit 4783b4c
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 99 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ __pycache__/

# Distribution / packaging
.Python
.idea
.pytest_cache/
env/
build/
Expand Down Expand Up @@ -102,6 +101,8 @@ ENV/

# IDE
.vscode/
.idea
.gitpod.yml

# Tentacles manager temporary files
octobot/creator_temp/
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

*It is strongly advised to perform an update of your tentacles after updating OctoBot. (start.py tentacles --install --all)*

## [0.4.8] - 2022-09-04
### Fixed
- Device creation

## [0.4.7] - 2022-09-03
### Updated
- [Astrolab] Improvements and fixes

## [0.4.6] - 2022-08-23
### Added
- [Trading] Futures trading
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# OctoBot [0.4.6](https://octobot.click/gh-changelog)
# OctoBot [0.4.8](https://octobot.click/gh-changelog)
[![PyPI](https://img.shields.io/pypi/v/OctoBot.svg)](https://octobot.click/gh-pypi)
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/e07fb190156d4efb8e7d07aaa5eff2e1)](https://app.codacy.com/gh/Drakkar-Software/OctoBot?utm_source=github.com&utm_medium=referral&utm_content=Drakkar-Software/OctoBot&utm_campaign=Badge_Grade_Dashboard)[![Downloads](https://pepy.tech/badge/octobot/month)](https://pepy.tech/project/octobot)
[![Dockerhub](https://img.shields.io/docker/pulls/drakkarsoftware/octobot.svg)](https://octobot.click/gh-dockerhub)
Expand Down
36 changes: 20 additions & 16 deletions octobot/community/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import base64
import contextlib
import json
import time

import requests
import aiohttp

Expand Down Expand Up @@ -55,7 +57,7 @@ def __init__(self, authentication_url, feed_url, config=None):
self._aiohttp_session = None
self._cache = {}
self._fetch_account_task = None
self._fetch_device_uuid_task = None
self._restart_task = None
self._community_feed = None
self._login_completed = None

Expand Down Expand Up @@ -100,7 +102,7 @@ def _create_community_feed_if_necessary(self) -> bool:

async def _ensure_init_community_feed(self):
self._create_community_feed_if_necessary()
if not self._community_feed.is_connected():
if not self._community_feed.is_connected() and self._community_feed.can_connect():
if self.initialized_event is not None and not self.initialized_event.is_set():
await asyncio.wait_for(self.initialized_event.wait(), self.LOGIN_TIMEOUT)
if not self.is_logged_in():
Expand Down Expand Up @@ -134,10 +136,13 @@ async def async_graphql_query(self, query, query_name, variables=None, operation
expected_code=None, allow_retry_on_expired_token=True):
try:
async with self._authenticated_qgl_session() as session:
t0 = time.time()
self.logger.debug(f"starting {query_name} graphql query")
resp = await session.post(
f"{identifiers_provider.IdentifiersProvider.GQL_BACKEND_API_URL}",
json=self._build_gql_request_body(query, variables, operation_name)
)
self.logger.debug(f"graphql query {query_name} done in {time.time() - t0} seconds")
if resp.status == 401:
# access token expired
raise authentication.AuthenticationRequired
Expand Down Expand Up @@ -254,7 +259,7 @@ async def on_new_device_select(self):
await self._update_feed_device_uuid()

async def _fetch_devices(self):
query, variables = graphql_requests.select_devices(self.user_account.gql_user_id)
query, variables = graphql_requests.select_devices()
return await self.async_graphql_query(query, "devices", variables=variables, expected_code=200)

async def fetch_device(self, device_id):
Expand All @@ -263,21 +268,20 @@ async def fetch_device(self, device_id):

async def create_new_device(self):
await self.gql_login_if_required()
query, variables = graphql_requests.create_new_device_query(self.user_account.gql_user_id)
return await self.async_graphql_query(query, "insertOneDevice", variables=variables, expected_code=200)
query, variables = graphql_requests.create_new_device_query()
return await self.async_graphql_query(query, "createDevice", variables=variables, expected_code=200)

async def _update_feed_device_uuid(self):
self._create_community_feed_if_necessary()
if self._community_feed.associated_gql_device_id != self.user_account.gql_device_id:
# only device id changed, need to refresh uuid. Otherwise it means that no feed was started with a
# only device id changed, need to refresh uuid. Otherwise, it means that no feed was started with a
# different uuid, no need to update
# reset _fetch_device_uuid_task if running
if self._fetch_device_uuid_task is not None and not self._fetch_device_uuid_task.done():
self._fetch_device_uuid_task.cancel()
# reset restart task if running
if self._restart_task is not None and not self._restart_task.done():
self._restart_task.cancel()
self._community_feed.remove_device_details()
task = self._community_feed.restart if self._community_feed.is_connected() \
else self._community_feed.fetch_mqtt_device_uuid
self._fetch_device_uuid_task = asyncio.create_task(task())
if self._community_feed.is_connected() or not self._community_feed.can_connect():
self._restart_task = asyncio.create_task(self._community_feed.restart())

def logout(self):
"""
Expand All @@ -287,10 +291,10 @@ def logout(self):
self._reset_tokens()
self.clear_cache()
self.remove_login_detail()
for task in (self._fetch_device_uuid_task, self._fetch_account_task):
for task in (self._restart_task, self._fetch_account_task):
if task is not None and not task.done():
task.cancel()
self._fetch_device_uuid_task = self._fetch_account_task = None
self._restart_task = self._fetch_account_task = None
self._create_community_feed_if_necessary()
self._community_feed.remove_device_details()

Expand Down Expand Up @@ -349,8 +353,8 @@ def get_aiohttp_session(self):
async def stop(self):
if self.is_initialized():
self._fetch_account_task.cancel()
if self._fetch_device_uuid_task is not None and not self._fetch_device_uuid_task.done():
self._fetch_device_uuid_task.cancel()
if self._restart_task is not None and not self._restart_task.done():
self._restart_task.cancel()
if self._aiohttp_session is not None:
await self._aiohttp_session.close()

Expand Down
5 changes: 5 additions & 0 deletions octobot/community/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#
# You should have received a copy of the GNU General Public
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
import octobot_commons.authentication as commons_authentication


class RequestError(Exception):
Expand All @@ -21,3 +22,7 @@ class RequestError(Exception):

class StatusCodeRequestError(RequestError):
pass


class DeviceError(commons_authentication.UnavailableError):
pass
3 changes: 3 additions & 0 deletions octobot/community/feeds/abstract_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ async def register_feed_callback(self, channel_type, callback, identifier=None):

async def send(self, message, channel_type, identifier, **kwargs):
raise NotImplementedError("send is not implemented")

def can_connect(self):
return True
99 changes: 44 additions & 55 deletions octobot/community/feeds/community_mqtt_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,33 +55,42 @@ def __init__(self, feed_url, authenticator):
self.associated_gql_device_id = None

self._mqtt_client: gmqtt.Client = None
self._valid_auth = True
self._device_uuid: str = None
self._fetching_uuid = False
self._subscription_attempts = 0
self._fetched_uuid = asyncio.Event()
self._subscription_topics = set()
self._reconnect_task = None
self._connect_task = None
self._connected_at_least_once = False
self._processed_messages = set()

async def start(self):
self.should_stop = False
await self.fetch_mqtt_device_uuid()
self._device_uuid = self.authenticator.user_account.get_selected_device_uuid()
await self._connect()

async def stop(self):
self.should_stop = True
await self._stop_mqtt_client()
if self._reconnect_task is not None and not self._reconnect_task.done():
self._reconnect_task.cancel()
if self._connect_task is not None and not self._connect_task.done():
self._connect_task.cancel()
self._reset()

async def restart(self):
await self.stop()
await self.start()
try:
await self.stop()
await self.start()
except Exception as e:
self.logger.exception(e, True, f"Error when restarting mqtt feed: {e}")

def _reset(self):
self._connected_at_least_once = False
self._subscription_attempts = 0
self._subscription_topics = set()
self._connect_task = None
self._valid_auth = True

async def _stop_mqtt_client(self):
if self.is_connected():
Expand All @@ -90,6 +99,9 @@ async def _stop_mqtt_client(self):
def is_connected(self):
return self._mqtt_client is not None and self._mqtt_client.is_connected

def can_connect(self):
return self._valid_auth

async def register_feed_callback(self, channel_type, callback, identifier=None):
topic = self._build_topic(channel_type, identifier)
try:
Expand All @@ -98,57 +110,14 @@ async def register_feed_callback(self, channel_type, callback, identifier=None):
self.feed_callbacks[topic] = [callback]
if topic not in self._subscription_topics:
self._subscription_topics.add(topic)
self._subscribe((topic, ))

async def fetch_mqtt_device_uuid(self):
if self._fetching_uuid:
self.logger.info(f"Waiting for feed UUID fetching")
await asyncio.wait_for(self._fetched_uuid.wait(), self.DEVICE_CREATE_TIMEOUT + 2)
else:
await self._fetch_mqtt_device_uuid()
if self._valid_auth:
self._subscribe((topic, ))
else:
self.logger.error(f"Can't subscribe to {channel_type.name} feed, invalid authentication")

def remove_device_details(self):
self._fetched_uuid.clear()
self._device_uuid = None

async def _fetch_mqtt_device_uuid(self):
try:
self._fetching_uuid = True
if device_uuid := self.authenticator.user_account.get_selected_device_uuid():
self._device_uuid = device_uuid
self.logger.debug("Using fetched mqtt device id")
else:
await self._poll_mqtt_device_uuid()
self.logger.debug("Successfully waited for mqtt device id")
except Exception as e:
self.logger.exception(e, True, f"Error when fetching device id: {e}")
raise
finally:
self._fetching_uuid = False
self._fetched_uuid.set()

async def _poll_mqtt_device_uuid(self):
t0 = time.time()
while time.time() - t0 < self.DEVICE_CREATE_TIMEOUT:
# loop until the device uuid is available
# used when a new gql device is created its uuid is not instantly filled
try:
device_data = await self.authenticator.fetch_device(self.authenticator.user_account.gql_device_id)
if device_data is None:
raise errors.RequestError(f"Error when fetching mqtt device uuid: can't find content with id: "
f"{self.authenticator.user_account.gql_device_id}")
elif device_uuid := device_data["uuid"]:
self._device_uuid = device_uuid
return
# retry soon
await asyncio.sleep(self.DEVICE_CREATION_REFRESH_DELAY)
# should never happen unless there is a real issue
except errors.RequestError:
raise
raise errors.RequestError(
f"Timeout when fetching mqtt device uuid: no uuid to be found after {self.DEVICE_CREATE_TIMEOUT} seconds"
)

@staticmethod
def _build_topic(channel_type, identifier):
return f"{channel_type.value}/{identifier}"
Expand Down Expand Up @@ -180,6 +149,9 @@ def _should_process(self, parsed_message):
return True

async def send(self, message, channel_type, identifier, **kwargs):
if not self._valid_auth:
self.logger.warning(f"Can't send {channel_type.name}, invalid feed authentication.")
return
topic = self._build_topic(channel_type, identifier)
self.logger.debug(f"Sending message on topic: {topic}, message: {message}")
self._mqtt_client.publish(
Expand Down Expand Up @@ -241,8 +213,12 @@ async def _reconnect(self, client):
await asyncio.sleep(delay)

def _on_disconnect(self, client, packet, exc=None):
self.logger.info(f"Disconnected, client_id: {client._client_id}")
self._try_reconnect_if_necessary(client)
if self._connected_at_least_once:
self.logger.info(f"Disconnected, client_id: {client._client_id}")
self._try_reconnect_if_necessary(client)
else:
if self._connect_task is not None and not self._connect_task.done():
self._connect_task.cancel()

def _on_subscribe(self, client, mid, qos, properties):
# from https://github.com/wialon/gmqtt/blob/master/examples/resubscription.py#L28
Expand Down Expand Up @@ -282,12 +258,25 @@ def _update_client_config(self, client):
client.set_config(default_config)

async def _connect(self):
if self._device_uuid is None:
self._valid_auth = False
raise errors.DeviceError("mqtt device uuid is None, impossible to connect client")
self._mqtt_client = gmqtt.Client(self.__class__.__name__)
self._update_client_config(self._mqtt_client)
self._register_callbacks(self._mqtt_client)
self._mqtt_client.set_auth_credentials(self._device_uuid, None)
self.logger.debug(f"Connecting client")
await self._mqtt_client.connect(self.feed_url, self.mqtt_broker_port, version=self.MQTT_VERSION)
self._connect_task = asyncio.create_task(
self._mqtt_client.connect(self.feed_url, self.mqtt_broker_port, version=self.MQTT_VERSION)
)
try:
await self._connect_task
self._connected_at_least_once = True
except asyncio.CancelledError:
# got cancelled by on_disconnect, can't connect
self.logger.error(f"Can't connect to server, please check your device uuid. "
f"Current mqtt uuid is: {self._device_uuid}")
self._valid_auth = False

def _subscribe(self, topics):
if not topics:
Expand Down
19 changes: 11 additions & 8 deletions octobot/community/graphql_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.


def create_new_device_query(user_id) -> (str, dict):
def create_new_device_query() -> (str, dict):
return """
mutation CreateDevice($user_id: ObjectId) {
insertOneDevice(data: {user_id: $user_id}) {
mutation CreateDevice {
createDevice {
_id
name
user_id
uuid
}
}
""", {"user_id": user_id}
""", {}


def select_device(device_id) -> (str, dict):
Expand All @@ -37,13 +40,13 @@ def select_device(device_id) -> (str, dict):
""", {"_id": device_id}


def select_devices(user_id) -> (str, dict):
def select_devices() -> (str, dict):
return """
query SelectDevices($user_id: ObjectId) {
devices(query: {user_id: $user_id}) {
query SelectDevices {
devices {
_id
uuid
name
}
}
""", {"user_id": user_id}
""", {}
Loading

0 comments on commit 4783b4c

Please sign in to comment.