Skip to content

Commit

Permalink
Merge branch 'staging' into chore/backmerge-master-7.3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheem-opentensor committed Aug 20, 2024
2 parents 3632f69 + 6dbc1f1 commit a1be730
Show file tree
Hide file tree
Showing 78 changed files with 4,418 additions and 479 deletions.
7 changes: 7 additions & 0 deletions .github/auto_assign.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
addReviewers: true

# A list of team slugs to add as assignees
reviewers:
- opentensor/cortex

numberOfReviewers: 0
15 changes: 15 additions & 0 deletions .github/workflows/auto-assign.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name: Auto Assign Cortex to Pull Requests

on:
pull_request:
types: [opened, reopened]

jobs:
auto-assign:
runs-on: ubuntu-latest
steps:
- name: Auto-assign Cortex Team
uses: kentaro-m/auto-assign-action@v1.2.4
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
configuration-path: .github/auto_assign.yml
1 change: 1 addition & 0 deletions .github/workflows/e2e-subtensor-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ jobs:
run:
needs: find-tests
runs-on: SubtensorCI
timeout-minutes: 45
strategy:
fail-fast: false # Allow other matrix jobs to run even if this job fails
max-parallel: 8 # Set the maximum number of parallel jobs
Expand Down
21 changes: 6 additions & 15 deletions bittensor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,12 @@ def debug(on: bool = True):
# Needs to use wss://
__bellagene_entrypoint__ = "wss://parachain.opentensor.ai:443"

__local_entrypoint__ = "ws://127.0.0.1:9944"
if (
BT_SUBTENSOR_CHAIN_ENDPOINT := os.getenv("BT_SUBTENSOR_CHAIN_ENDPOINT")
) is not None:
__local_entrypoint__ = BT_SUBTENSOR_CHAIN_ENDPOINT
else:
__local_entrypoint__ = "ws://127.0.0.1:9944"

__tao_symbol__: str = chr(0x03C4)

Expand Down Expand Up @@ -200,19 +205,6 @@ def debug(on: bool = True):
},
},
},
"ValidatorIPRuntimeApi": {
"methods": {
"get_associated_validator_ip_info_for_subnet": {
"params": [
{
"name": "netuid",
"type": "u16",
},
],
"type": "Vec<u8>",
},
},
},
"SubnetInfoRuntimeApi": {
"methods": {
"get_subnet_hyperparams": {
Expand Down Expand Up @@ -318,7 +310,6 @@ def debug(on: bool = True):
strtobool,
strtobool_with_default,
get_explorer_root_url_by_network_from_map,
get_explorer_root_url_by_network_from_map,
get_explorer_url_for_network,
ss58_address_to_bytes,
U16_NORMALIZED_FLOAT,
Expand Down
127 changes: 92 additions & 35 deletions bittensor/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@
import traceback
import typing
import uuid
import warnings
from inspect import signature, Signature, Parameter
from typing import List, Optional, Tuple, Callable, Any, Dict, Awaitable

import uvicorn
from fastapi import FastAPI, APIRouter, Depends
from fastapi import APIRouter, Depends, FastAPI
from fastapi.responses import JSONResponse
from fastapi.routing import serialize_response
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
Expand All @@ -44,18 +45,19 @@
from substrateinterface import Keypair

import bittensor
from bittensor.utils.axon_utils import allowed_nonce_window_ns, calculate_diff_seconds
from bittensor.constants import V_7_2_0
from bittensor.errors import (
BlacklistedException,
InvalidRequestNameError,
SynapseDendriteNoneException,
SynapseParsingError,
UnknownSynapseError,
NotVerifiedException,
BlacklistedException,
PriorityException,
PostProcessException,
PriorityException,
SynapseDendriteNoneException,
SynapseException,
SynapseParsingError,
UnknownSynapseError,
)
from bittensor.constants import ALLOWED_DELTA, V_7_2_0
from bittensor.threadpool import PriorityThreadPoolExecutor
from bittensor.utils import networking

Expand Down Expand Up @@ -484,17 +486,50 @@ def verify_custom(synapse: MyCustomSynapse):

async def endpoint(*args, **kwargs):
start_time = time.time()
response_synapse = forward_fn(*args, **kwargs)
if isinstance(response_synapse, Awaitable):
response_synapse = await response_synapse
return await self.middleware_cls.synapse_to_response(
synapse=response_synapse, start_time=start_time
)
response = forward_fn(*args, **kwargs)
if isinstance(response, Awaitable):
response = await response
if isinstance(response, bittensor.Synapse):
return await self.middleware_cls.synapse_to_response(
synapse=response, start_time=start_time
)
else:
response_synapse = getattr(response, "synapse", None)
if response_synapse is None:
warnings.warn(
"The response synapse is None. The input synapse will be used as the response synapse. "
"Reliance on forward_fn modifying input synapse as a side-effects is deprecated. "
"Explicitly set `synapse` on response object instead.",
DeprecationWarning,
)
# Replace with `return response` in next major version
response_synapse = args[0]

return await self.middleware_cls.synapse_to_response(
synapse=response_synapse,
start_time=start_time,
response_override=response,
)

return_annotation = forward_sig.return_annotation

if isinstance(return_annotation, type) and issubclass(
return_annotation, bittensor.Synapse
):
if issubclass(
return_annotation,
bittensor.StreamingSynapse,
):
warnings.warn(
"The forward_fn return annotation is a subclass of bittensor.StreamingSynapse. "
"Most likely the correct return annotation would be BTStreamingResponse."
)
else:
return_annotation = JSONResponse

# replace the endpoint signature, but set return annotation to JSONResponse
endpoint.__signature__ = Signature( # type: ignore
parameters=list(forward_sig.parameters.values()),
return_annotation=JSONResponse,
return_annotation=return_annotation,
)

# Add the endpoint to the router, making it available on both GET and POST methods
Expand Down Expand Up @@ -847,6 +882,8 @@ async def default_verify(self, synapse: bittensor.Synapse):
The method checks for increasing nonce values, which is a vital
step in preventing replay attacks. A replay attack involves an adversary reusing or
delaying the transmission of a valid data transmission to deceive the receiver.
The first time a nonce is seen, it is checked for freshness by ensuring it is
within an acceptable delta time range.
Authenticity and Integrity Checks
By verifying that the message's digital signature matches
Expand Down Expand Up @@ -893,33 +930,43 @@ async def default_verify(self, synapse: bittensor.Synapse):
if synapse.dendrite.nonce is None:
raise Exception("Missing Nonce")

# If we don't have a nonce stored, ensure that the nonce falls within
# a reasonable delta.

# Newer nonce structure post v7.2
if (
synapse.dendrite.version is not None
and synapse.dendrite.version >= V_7_2_0
):
# If we don't have a nonce stored, ensure that the nonce falls within
# a reasonable delta.
current_time_ns = time.time_ns()
allowed_window_ns = allowed_nonce_window_ns(
current_time_ns, synapse.timeout
)

if (
self.nonces.get(endpoint_key) is None
and synapse.dendrite.nonce
<= time.time_ns() - ALLOWED_DELTA - (synapse.timeout or 0)
and synapse.dendrite.nonce <= allowed_window_ns
):
raise Exception("Nonce is too old")
diff_seconds, allowed_delta_seconds = calculate_diff_seconds(
current_time_ns, synapse.timeout, synapse.dendrite.nonce
)
raise Exception(
f"Nonce is too old: acceptable delta is {allowed_delta_seconds:.2f} seconds but request was {diff_seconds:.2f} seconds old"
)

# If a nonce is stored, ensure the new nonce
# is greater than the previous nonce
if (
self.nonces.get(endpoint_key) is not None
and synapse.dendrite.nonce <= self.nonces[endpoint_key]
):
raise Exception("Nonce is too old")
raise Exception("Nonce is too old, a newer one was last processed")
# Older nonce structure pre v7.2
else:
if (
endpoint_key in self.nonces.keys()
and self.nonces[endpoint_key] is not None
self.nonces.get(endpoint_key) is not None
and synapse.dendrite.nonce <= self.nonces[endpoint_key]
):
raise Exception("Nonce is too small")
raise Exception("Nonce is too old, a newer one was last processed")

if not keypair.verify(message, synapse.dendrite.signature):
raise Exception(
Expand Down Expand Up @@ -952,7 +999,7 @@ def log_and_handle_error(
exception: Exception,
status_code: typing.Optional[int] = None,
start_time: typing.Optional[float] = None,
):
) -> bittensor.Synapse:
if isinstance(exception, SynapseException):
synapse = exception.synapse or synapse

Expand Down Expand Up @@ -1420,14 +1467,21 @@ async def run(

@classmethod
async def synapse_to_response(
cls, synapse: bittensor.Synapse, start_time: float
) -> JSONResponse:
cls,
synapse: bittensor.Synapse,
start_time: float,
*,
response_override: Optional[Response] = None,
) -> Response:
"""
Converts the Synapse object into a JSON response with HTTP headers.
Args:
synapse (bittensor.Synapse): The Synapse object representing the request.
start_time (float): The timestamp when the request processing started.
synapse: The Synapse object representing the request.
start_time: The timestamp when the request processing started.
response_override:
Instead of serializing the synapse, mutate the provided response object.
This is only really useful for StreamingSynapse responses.
Returns:
Response: The final HTTP response, with updated headers, ready to be sent back to the client.
Expand All @@ -1446,11 +1500,14 @@ async def synapse_to_response(

synapse.axon.process_time = time.time() - start_time

serialized_synapse = await serialize_response(response_content=synapse)
response = JSONResponse(
status_code=synapse.axon.status_code,
content=serialized_synapse,
)
if response_override:
response = response_override
else:
serialized_synapse = await serialize_response(response_content=synapse)
response = JSONResponse(
status_code=synapse.axon.status_code,
content=serialized_synapse,
)

try:
updated_headers = synapse.to_headers()
Expand Down
Loading

0 comments on commit a1be730

Please sign in to comment.