Skip to content

Commit

Permalink
[Community] add get_production_anon_client
Browse files Browse the repository at this point in the history
  • Loading branch information
GuillaumeDSM committed Oct 27, 2023
1 parent 788b32d commit 8c0490c
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 36 deletions.
11 changes: 7 additions & 4 deletions octobot/community/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ class CommunityAuthentication(authentication.Authenticator):
SESSION_HEADER = "X-Session"
GQL_AUTHORIZATION_HEADER = "Authorization"

def __init__(self, feed_url, config=None):
def __init__(self, feed_url, config=None, backend_url=None, backend_key=None):
super().__init__()
self.feed_url = feed_url
self.backend_url = backend_url or identifiers_provider.IdentifiersProvider.BACKEND_URL
self.backend_key = backend_key or identifiers_provider.IdentifiersProvider.BACKEND_KEY
self.configuration_storage = supabase_backend.SyncConfigurationStorage(config)
self.supabase_client = self._create_client()
self.user_account = community_user_account.CommunityUserAccount()
Expand All @@ -80,10 +82,11 @@ def __init__(self, feed_url, config=None):
self._fetch_account_task = None

@staticmethod
def create(configuration: commons_configuration.Configuration):
def create(configuration: commons_configuration.Configuration, **kwargs):
return CommunityAuthentication.instance(
None,
config=configuration,
**kwargs,
)

def update(self, configuration: commons_configuration.Configuration):
Expand Down Expand Up @@ -178,8 +181,8 @@ def _supports_mock():

def _create_client(self):
return supabase_backend.CommunitySupabaseClient(
identifiers_provider.IdentifiersProvider.BACKEND_URL,
identifiers_provider.IdentifiersProvider.BACKEND_KEY,
self.backend_url,
self.backend_key,
self.configuration_storage
)

Expand Down
55 changes: 37 additions & 18 deletions octobot/community/supabase_backend/community_supabase_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
self.event_loop = None
super().__init__(supabase_url, supabase_key, options=options)
self.is_admin = False
self.production_anon_client = None

async def sign_in(self, email: str, password: str) -> None:
try:
Expand Down Expand Up @@ -346,7 +347,8 @@ async def fetch_candles_history(
self, exchange: str, symbol: str, time_frame: commons_enums.TimeFrames,
first_open_time: float, last_open_time: float
) -> list:
historical_candles = await self._fetch_paginated_signals_history(
historical_candles = await self._fetch_paginated_history(
await self.get_production_anon_client(),
"temp_ohlcv_history",
"timestamp, open, high, low, close, volume",
{
Expand All @@ -363,26 +365,25 @@ async def fetch_candles_history(
async def fetch_gpt_signal(
self, exchange: str, symbol: str, time_frame: commons_enums.TimeFrames, timestamp: float, version: str
) -> str:
signals = (
await self.table("temp_chatgpt_signals").select("signal").match(
{
"timestamp": timestamp,
"exchange_internal_name": exchange,
"symbol": symbol,
"time_frame": time_frame.value,
"metadata->>version": version,
},
).execute()
).data
signals = (await (await self.get_production_anon_client()).table("temp_chatgpt_signals").select("signal").match(
{
"timestamp": self.get_formatted_time(timestamp),
"exchange_internal_name": exchange,
"symbol": symbol,
"time_frame": time_frame.value,
"metadata->>version": version,
},
).execute()).data
if signals:
return signals[0]["content"]
return signals[0]["signal"]["content"]
return ""

async def fetch_gpt_signals_history(
self, exchange: str, symbol: str, time_frame: commons_enums.TimeFrames,
first_open_time: float, last_open_time: float, version: str
) -> dict:
historical_signals = await self._fetch_paginated_signals_history(
historical_signals = await self._fetch_paginated_history(
await self.get_production_anon_client(),
"temp_chatgpt_signals",
"timestamp, signal",
{
Expand All @@ -397,8 +398,16 @@ async def fetch_gpt_signals_history(
)
return self._format_gpt_signals(historical_signals)

async def _fetch_paginated_signals_history(
self, table_name: str, select: str, matcher: dict,
async def get_production_anon_client(self):
if self.production_anon_client is None:
self.production_anon_client = await self.init_other_postgrest_client(
supabase_url=constants.COMMUNITY_PRODUCTION_BACKEND_URL,
supabase_key=constants.COMMUNITY_PRODUCTION_BACKEND_KEY,
)
return self.production_anon_client

async def _fetch_paginated_history(
self, client, table_name: str, select: str, matcher: dict,
time_interval: float, first_open_time: float, last_open_time: float
) -> list:
total_elements_count = (last_open_time - first_open_time) // time_interval
Expand All @@ -409,7 +418,7 @@ async def _fetch_paginated_signals_history(
request_count = 0
while request_count < max_requests_count:
request = (
self.table(table_name).select(select)
client.table(table_name).select(select)
.match(matcher).gte(
"timestamp", self.get_formatted_time(first_open_time)
).lte(
Expand Down Expand Up @@ -439,7 +448,7 @@ async def _fetch_paginated_signals_history(

def _format_gpt_signals(self, signals: list):
return {
signal["timestamp"]: signal["signal"]["content"]
self.get_parsed_time(signal["timestamp"]).timestamp(): signal["signal"]["content"]
for signal in signals
}

Expand Down Expand Up @@ -525,3 +534,13 @@ def get_parsed_time(str_time: str) -> datetime.datetime:

async def _get_user(self) -> gotrue.User:
return self.auth.get_user().user

async def close(self):
await super().close()
if self.production_anon_client is not None:
try:
await self.production_anon_client.aclose()
except RuntimeError:
# happens when the event loop is closed already
pass
self.production_anon_client = None
38 changes: 27 additions & 11 deletions octobot/community/supabase_backend/supabase_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
import contextlib
import copy
import typing
import storage3
import storage3.constants
Expand Down Expand Up @@ -61,8 +62,8 @@ def _init_postgrest_client(
rest_url: str,
supabase_key: str,
headers: typing.Dict[str, str],
schema: str,
timeout, # skip typing to avoid httpx import
schema: str = "public",
) -> postgrest.AsyncPostgrestClient:
"""Private helper for creating an instance of the Postgrest client."""
# Override to use postgrest.AsyncPostgrestClient and allow async requests
Expand Down Expand Up @@ -96,21 +97,33 @@ def table(self, table_name: str) -> postgrest.AsyncRequestBuilder: # typing ove
return self.from_(table_name)

@contextlib.asynccontextmanager
async def other_postgres_client(self, schema):
other_postgres: postgrest.AsyncPostgrestClient = None
async def other_postgres_client(self, supabase_url: str = None, supabase_key: str = None, schema: str = "public"):
other_postgres = None
try:
other_postgres = AuthenticatedAsyncSupabaseClient._init_postgrest_client(
rest_url=self.rest_url,
supabase_key=self.supabase_key,
headers=self.options.headers,
schema=schema,
timeout=self.options.postgrest_client_timeout,
other_postgres = await self.init_other_postgrest_client(
supabase_url=supabase_url, supabase_key=supabase_key, schema=schema
)
yield other_postgres
finally:
if other_postgres is not None:
await other_postgres.aclose()

async def init_other_postgrest_client(
self, supabase_url: str = None, supabase_key: str = None, schema: str = "public"
) -> postgrest.AsyncPostgrestClient:
supabase_key = supabase_key or self.supabase_key
headers = self.options.headers
if supabase_key != self.supabase_key:
headers = copy.deepcopy(postgrest.constants.DEFAULT_POSTGREST_CLIENT_HEADERS)
headers.update(self._format_auth_headers(supabase_key, supabase_key))
return AuthenticatedAsyncSupabaseClient._init_postgrest_client(
rest_url=f"{supabase_url}/rest/v1" if supabase_url else self.rest_url,
supabase_key=supabase_key,
headers=headers,
timeout=self.options.postgrest_client_timeout,
schema=schema,
)

async def close(self):
# timer has to be stopped, there is no public stop api
if self.auth._refresh_token_timer:
Expand Down Expand Up @@ -148,9 +161,12 @@ def _use_auth_session(self, event: gotrue.AuthChangeEvent, _):
def _get_auth_headers(self):
"""Helper method to get auth headers."""
# What's the corresponding method to get the token
return self._format_auth_headers(self.supabase_key, self._get_auth_key())

def _format_auth_headers(self, supabase_key, auth_token):
return {
"apiKey": self.supabase_key,
"Authorization": f"Bearer {self._get_auth_key()}",
"apiKey": supabase_key,
"Authorization": f"Bearer {auth_token}",
}

def _get_auth_key(self):
Expand Down
7 changes: 4 additions & 3 deletions octobot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,20 @@
OCTOBOT_COMMUNITY_LANDING_URL = os.getenv("COMMUNITY_SERVER_URL", "https://octobot.cloud")
OCTOBOT_COMMUNITY_URL = os.getenv("COMMUNITY_SERVER_URL", "https://app.octobot.cloud")
OCTOBOT_COMMUNITY_RECOVER_PASSWORD_URL = OCTOBOT_COMMUNITY_URL
# todo use real production db
# default env
COMMUNITY_BACKEND_URL = os.getenv("COMMUNITY_BACKEND_URL", "https://nwhpvrguwcihhizrnyoe.supabase.co")
COMMUNITY_BACKEND_KEY = os.getenv("COMMUNITY_BACKEND_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6Im53aHB2cmd1d2NpaGhpenJueW9lIiwicm9sZSI6ImFub24iLCJpYXQiOjE2OTU2NDQxMDcsImV4cCI6MjAxMTIyMDEwN30.AILcgv0l6hl_0IUEPlWh1wiu9RIpgrkGZGERM5uXftE")


# staging env SHOULD ONLY BE USED THROUGH CommunityIdentifiersProvider
STAGING_OCTOBOT_COMMUNITY_LANDING_URL = os.getenv("COMMUNITY_SERVER_URL", "https://beta.octobot.cloud")
STAGING_OCTOBOT_COMMUNITY_URL = os.getenv("COMMUNITY_SERVER_URL", "https://app-beta.octobot.cloud/")
STAGING_COMMUNITY_RECOVER_PASSWORD_URL = STAGING_OCTOBOT_COMMUNITY_URL
STAGING_COMMUNITY_BACKEND_URL = os.getenv("COMMUNITY_BACKEND_URL", "https://wmfkgvgzokyzhvxowbyg.supabase.co")
STAGING_COMMUNITY_BACKEND_KEY = os.getenv("COMMUNITY_BACKEND_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6IndtZmtndmd6b2t5emh2eG93YnlnIiwicm9sZSI6ImFub24iLCJpYXQiOjE2OTE0NDA1MTEsImV4cCI6MjAwNzAxNjUxMX0.YZQl7LYgvnzO_Jizs0UKfPEaqPoV2EwhjunH8gime8o")


# production env, ignored by CommunityIdentifiersProvider
COMMUNITY_PRODUCTION_BACKEND_URL = os.getenv("COMMUNITY_PRODUCTION_BACKEND_URL", COMMUNITY_BACKEND_URL)
COMMUNITY_PRODUCTION_BACKEND_KEY = os.getenv("COMMUNITY_PRODUCTION_BACKEND_KEY", COMMUNITY_BACKEND_KEY)

CONFIG_COMMUNITY = "community"
CONFIG_COMMUNITY_BOT_ID = "bot_id"
Expand Down

0 comments on commit 8c0490c

Please sign in to comment.