Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev merge #2772

Merged
merged 15 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions octobot/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ async def _get_authenticated_community_if_possible(config, logger):
# switch environments if necessary
octobot_community.IdentifiersProvider.use_environment_from_config(config)
community_auth = octobot_community.CommunityAuthentication.create(config)
community_auth.clear_local_data_if_necessary()
try:
if not community_auth.is_initialized():
if constants.IS_CLOUD_ENV:
Expand Down
152 changes: 108 additions & 44 deletions octobot/community/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import json
import time
import typing
import hashlib
import os

import octobot.constants as constants
import octobot.enums as enums
import octobot.community.errors as errors
import octobot.community.identifiers_provider as identifiers_provider
import octobot.community.models.community_supports as community_supports
Expand Down Expand Up @@ -180,6 +183,11 @@ def get_user_id(self):
raise authentication.AuthenticationRequired()
return self.user_account.get_user_id()

def get_last_email_address_confirm_code_email_content(self) -> typing.Optional[str]:
if not self.user_account.has_user_data():
raise authentication.AuthenticationRequired()
return self.user_account.last_email_address_confirm_code_email_content

async def get_deployment_url(self):
deployment_url_data = await self.supabase_client.fetch_deployment_url(
self.user_account.get_selected_bot_deployment_id()
Expand Down Expand Up @@ -291,12 +299,15 @@ async def _create_community_feed_if_necessary(self) -> bool:
return True
return False

async def _ensure_init_community_feed(self):
async def _ensure_init_community_feed(
self,
stop_on_cfg_action: typing.Optional[enums.CommunityConfigurationActions]=None
):
await self._create_community_feed_if_necessary()
if not self._community_feed.is_connected() and self._community_feed.can_connect():
if self.initialized_event is not None and not self.initialized_event.is_set():
await asyncio.wait_for(self.initialized_event.wait(), self.LOGIN_TIMEOUT)
await self._community_feed.start()
await self._community_feed.start(stop_on_cfg_action)

async def register_feed_callback(self, channel_type: commons_enums.CommunityChannelTypes, callback, identifier=None):
try:
Expand All @@ -305,6 +316,14 @@ async def register_feed_callback(self, channel_type: commons_enums.CommunityChan
except errors.BotError as e:
self.logger.error(f"Impossible to connect to community signals: {e}")

async def trigger_wait_for_email_address_confirm_code_email(self):
if not self.get_owned_packages():
raise errors.ExtensionRequiredError(
f"The {constants.OCTOBOT_EXTENSION_PACKAGE_1_NAME} is required to use TradingView email alerts"
)
await self._ensure_init_community_feed(enums.CommunityConfigurationActions.EMAIL_CONFIRM_CODE)


async def send(self, message, channel_type, identifier=None):
"""
Sends a message
Expand Down Expand Up @@ -505,9 +524,48 @@ async def has_login_info(self):
def remove_login_detail(self):
self.user_account.flush()
self._reset_login_token()
# force user to (re)select a bot
self._save_bot_id("")
# mqtt feed can't connect as long as the user is not authenticated: don't display unusable email address
self.save_tradingview_email("")
self.logger.debug("Removed community login data")

def _clear_bot_scoped_config(self):
"""
Clears all bot local data including mqtt id, which will trigger a new mqtt device creation.
Warning: should only be called in rare cases, mostly to avoid multi connection on the same mqtt
device
"""
self.logger.info(
"Clearing bot local scoped config data. Your TradingView alert email address "
"and webhook url will be different on this bot."
)
self._save_bot_id("")
self.save_tradingview_email("")
# also reset mqtt id to force a new mqtt id creation
self._save_mqtt_device_uuid("")
# will force reconfiguring the next email
self.save_tradingview_email_confirmed(False)

def clear_local_data_if_necessary(self):
if constants.IS_CLOUD_ENV:
# disabled on cloud environments
return
previous_local_identifier = self._get_saved_bot_scoped_data_identifier()
current_local_identifier = self._get_bot_scoped_data_identifier()
if not previous_local_identifier:
self._save_bot_scoped_data_identifier(current_local_identifier)
# nothing to clear
return
if current_local_identifier != previous_local_identifier:
self._clear_bot_scoped_config()
self._save_bot_scoped_data_identifier(current_local_identifier)

def _get_bot_scoped_data_identifier(self) -> str:
# identifier is based on the path to the local bot to ensure the same data are not re-used
# when copy/pasting a bot config to another bot
return hashlib.sha256(os.getcwd().encode()).hexdigest()

async def stop(self):
self.logger.debug("Stopping ...")
if self._fetch_account_task is not None and not self._fetch_account_task.done():
Expand Down Expand Up @@ -594,6 +652,9 @@ async def fetch_bot_tentacles_data_based_config(

async def fetch_private_data(self, reset=False):
try:
if not self.is_logged_in():
self.logger.info(f"Can't fetch private data: no authenticated user")
return
mqtt_uuid = None
try:
mqtt_uuid = self.get_saved_mqtt_device_uuid()
Expand All @@ -603,7 +664,9 @@ async def fetch_private_data(self, reset=False):
self.logger.info("Community extension check is disabled")
elif reset or (not self.user_account.community_package_urls or not mqtt_uuid):
self.successfully_fetched_tentacles_package_urls = False
packages, package_urls, fetched_mqtt_uuid = await self._fetch_package_urls(mqtt_uuid)
packages, package_urls, fetched_mqtt_uuid, tradingview_email = (
await self._fetch_extensions_details(mqtt_uuid)
)
self.successfully_fetched_tentacles_package_urls = True
self.user_account.owned_packages = packages
self.save_installed_package_urls(package_urls)
Expand All @@ -616,7 +679,9 @@ async def fetch_private_data(self, reset=False):
self.logger.info(f"New tentacles are available for installation")
self.user_account.has_pending_packages_to_install = True
if fetched_mqtt_uuid and fetched_mqtt_uuid != mqtt_uuid:
self.save_mqtt_device_uuid(fetched_mqtt_uuid)
self._save_mqtt_device_uuid(fetched_mqtt_uuid)
if tradingview_email and tradingview_email != self.get_saved_tradingview_email():
self.save_tradingview_email(tradingview_email)
except Exception as err:
self.logger.exception(err, True, f"Unexpected error when fetching package urls: {err}")
finally:
Expand All @@ -627,56 +692,37 @@ async def fetch_private_data(self, reset=False):
# fetch indexes as well
await self._refresh_products()

async def _fetch_package_urls(self, mqtt_uuid: typing.Optional[str]) -> (list[str], str):
self.logger.debug(f"Fetching package")
resp = await self.supabase_client.http_get(
constants.COMMUNITY_EXTENSIONS_CHECK_ENDPOINT,
headers={
"Content-Type": "application/json",
"X-Auth-Token": constants.COMMUNITY_EXTENSIONS_CHECK_ENDPOINT_KEY
},
params={"mqtt_id": mqtt_uuid} if mqtt_uuid else {},
timeout=constants.COMMUNITY_FETCH_TIMEOUT
)
self.logger.debug("Fetched package")
resp.raise_for_status()
json_resp = json.loads(resp.json().get("message", {}))
if not json_resp:
return None, None, None
async def _fetch_extensions_details(self, mqtt_uuid: typing.Optional[str]) -> (list[str], list[str], str, str):
self.logger.debug(f"Fetching extension package details")
extensions_details = await self.supabase_client.fetch_extensions(mqtt_uuid)
self.logger.debug("Fetched extension package details")
if not extensions_details:
return None, None, None, None
packages = [
package
for package in json_resp["paid_package_slugs"]
for package in extensions_details["paid_package_slugs"]
if package
]
urls = [
url
for url in json_resp["package_urls"]
for url in extensions_details["package_urls"]
if url
]
mqtt_id = json_resp["mqtt_id"]
return packages, urls, mqtt_id
mqtt_id = extensions_details["mqtt_id"]
tradingview_email = extensions_details["tradingview_email"]
return packages, urls, mqtt_id, tradingview_email

async def fetch_checkout_url(self, payment_method, redirect_url):
async def fetch_checkout_url(self, payment_method: str, redirect_url: str):
try:
if not self.is_logged_in():
self.logger.info(f"Can't fetch checkout url: no authenticated user")
return None
self.logger.debug(f"Fetching {payment_method} checkout url")
resp = await self.supabase_client.http_post(
constants.COMMUNITY_EXTENSIONS_CHECK_ENDPOINT,
json={
"payment_method": payment_method,
"success_url": redirect_url,
},
headers={
"Content-Type": "application/json",
"X-Auth-Token": constants.COMMUNITY_EXTENSIONS_CHECK_ENDPOINT_KEY
},
timeout=constants.COMMUNITY_FETCH_TIMEOUT
)
resp.raise_for_status()
json_resp = json.loads(resp.json().get("message", {}))
if not json_resp:
url_details = await self.supabase_client.fetch_checkout_url(payment_method, redirect_url)
if not url_details:
# valid error code but no content: user already has this product
return None
url = json_resp["checkout_url"]
url = url_details["checkout_url"]
self.logger.info(
f"Here is your {constants.OCTOBOT_EXTENSION_PACKAGE_1_NAME} checkout url {url} "
f"paste it into a web browser to proceed to payment if your browser did to automatically "
Expand All @@ -697,21 +743,39 @@ def _reset_login_token(self):
def save_installed_package_urls(self, package_urls: list[str]):
self._save_value_in_config(constants.CONFIG_COMMUNITY_PACKAGE_URLS, package_urls)

def save_mqtt_device_uuid(self, mqtt_uuid):
def save_tradingview_email(self, tradingview_email: str):
self._save_value_in_config(constants.CONFIG_COMMUNITY_TRADINGVIEW_EMAIL, tradingview_email)

def save_tradingview_email_confirmed(self, confirmed: bool):
self._save_value_in_config(constants.CONFIG_COMMUNITY_TRADINGVIEW_EMAIL_CONFIRMED, confirmed)

def _save_mqtt_device_uuid(self, mqtt_uuid: str):
self._save_value_in_config(constants.CONFIG_COMMUNITY_MQTT_UUID, mqtt_uuid)

def _save_bot_scoped_data_identifier(self, identifier: str):
self._save_value_in_config(constants.CONFIG_COMMUNITY_LOCAL_DATA_IDENTIFIER, identifier)

def get_saved_package_urls(self) -> list[str]:
return self._get_value_in_config(constants.CONFIG_COMMUNITY_PACKAGE_URLS) or []

def get_saved_mqtt_device_uuid(self):
def get_saved_mqtt_device_uuid(self) -> str:
if mqtt_uuid := self._get_value_in_config(constants.CONFIG_COMMUNITY_MQTT_UUID):
return mqtt_uuid
raise errors.NoBotDeviceError("No MQTT device ID has been set")

def _get_saved_bot_scoped_data_identifier(self) -> str:
return self._get_value_in_config(constants.CONFIG_COMMUNITY_LOCAL_DATA_IDENTIFIER)

def get_saved_tradingview_email(self) -> str:
return self._get_value_in_config(constants.CONFIG_COMMUNITY_TRADINGVIEW_EMAIL)

def is_tradingview_email_confirmed(self) -> bool:
return self._get_value_in_config(constants.CONFIG_COMMUNITY_TRADINGVIEW_EMAIL_CONFIRMED) is True

def _save_bot_id(self, bot_id):
self._save_value_in_config(constants.CONFIG_COMMUNITY_BOT_ID, bot_id)

def _get_saved_bot_id(self):
def _get_saved_bot_id(self) -> str:
return constants.COMMUNITY_BOT_ID or self._get_value_in_config(constants.CONFIG_COMMUNITY_BOT_ID)

def _save_value_in_config(self, key, value):
Expand Down
4 changes: 3 additions & 1 deletion octobot/community/feeds/abstract_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
# You should have received a copy of the GNU General Public
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
import time
import typing

import octobot.enums as enums
import octobot_commons.logging as bot_logging


Expand All @@ -35,7 +37,7 @@ def __init__(self, feed_url, authenticator):
def has_registered_feed(self) -> bool:
return bool(self.feed_callbacks)

async def start(self):
async def start(self, stop_on_cfg_action: typing.Optional[enums.CommunityConfigurationActions]):
raise NotImplementedError("start is not implemented")

async def stop(self):
Expand Down
Loading
Loading