From 89fbc83869a17d7b41cd7e3d7f0f73d98d16e1cb Mon Sep 17 00:00:00 2001 From: rishiad Date: Mon, 13 Jan 2025 22:35:33 +1030 Subject: [PATCH 1/7] docs: add challenge doc --- docs/challenge.md | 236 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 236 insertions(+) create mode 100644 docs/challenge.md diff --git a/docs/challenge.md b/docs/challenge.md new file mode 100644 index 0000000..b918f7a --- /dev/null +++ b/docs/challenge.md @@ -0,0 +1,236 @@ +# Challenge System Documentation + +## Overview + +The **Challenge System** in Storb is designed to implement a **Provable Data Possession (PDP)** protocol. This system enables a **validator** to verify that a **miner** is correctly storing specific data blocks without requiring access to the entire data. The system leverages cryptographic techniques to ensure data integrity and storage reliability in a secure and efficient manner. + +--- + +## Key Components + +### 1. **APDPKey** + +The `APDPKey` class manages the cryptographic parameters essential for the PDP protocol. + +- **Attributes:** + - `rsa`: An RSA private key (`RSAPrivateKey`) used for generating and verifying tags. + - `g`: An integer representing a generator in the multiplicative group $\mathbb{Z}_n^*$. + - `prf_key`: A byte string used as a key for the Pseudo-Random Function (PRF). + +- **Methods:** + - `generate(rsa_bits)`: Initializes the RSA key, selects a suitable generator `g`, and generates the `prf_key`. + - `clear()`: Clears all cryptographic material from the key instance. + +### 2. **APDPTag** + +The `APDPTag` class represents a cryptographic tag associated with a specific data block. + +- **Attributes:** + - `index`: An integer identifier for the data block (e.g., 0 for single-block scenarios). + - `tag_value`: An integer representing the cryptographic tag of the data block. + - `prf_value`: A byte string derived from the PRF, adding randomness to the tag. + +- **Serialization:** + - `prf_value` is serialized and deserialized using Base64 encoding to ensure safe transmission and storage. + +### 3. **Challenge** + +The `Challenge` class encapsulates the parameters sent by the validator to the miner to initiate a verification request. + +- **Attributes:** + - `tag`: An instance of `APDPTag` corresponding to the data being challenged. + - `prp_key`: A byte string serving as a key for the Pseudo-Random Permutation (PRP). + - `prf_key`: A byte string serving as a key for the PRF. + - `s`: An integer randomly selected to introduce variability in the challenge. + - `g_s`: An integer calculated as $ g^s \mod n $, where `g` is the generator and `n` is the RSA modulus. + +- **Serialization:** + - `prp_key` and `prf_key` are serialized and deserialized using Base64 encoding. + +### 4. **Proof** + +The `Proof` class represents the miner’s response to a challenge, demonstrating possession of the data. + +- **Attributes:** + - `tag_value`: An integer derived from the original tag value, modified based on the challenge. + - `block_value`: An integer representing the aggregated data block value influenced by the challenge. + - `hashed_result`: A Base64-encoded string containing the SHA-256 hash of a critical computation, ensuring proof integrity. + +### 5. **ChallengeSystem** + +The `ChallengeSystem` class orchestrates the entire PDP process, managing key operations such as key initialization, tag generation, challenge issuance, proof generation, and proof verification. 
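+
+Before listing its attributes, the three message models above can be pictured with a short sketch. This is a hypothetical, simplified rendering based on the attribute lists in this document -- the real definitions in `storb/challenge` may differ in names and validation details:
+
+```python
+import base64
+
+from pydantic import BaseModel, field_serializer, field_validator
+
+
+class APDPTag(BaseModel):
+    index: int
+    tag_value: int
+    prf_value: bytes
+
+    @field_serializer("prf_value")
+    def _serialize_prf(self, v: bytes) -> str:
+        # Base64-encode for safe transmission, as described above
+        return base64.b64encode(v).decode("utf-8")
+
+    @field_validator("prf_value", mode="before")
+    @classmethod
+    def _deserialize_prf(cls, v):
+        return base64.b64decode(v) if isinstance(v, str) else v
+
+
+class Challenge(BaseModel):
+    tag: APDPTag
+    prp_key: bytes  # Base64 in transit (serializers like APDPTag's omitted for brevity)
+    prf_key: bytes  # Base64 in transit
+    s: int
+    g_s: int        # g^s mod n
+
+
+class Proof(BaseModel):
+    tag_value: int
+    block_value: int
+    hashed_result: str  # Base64-encoded SHA-256 digest
+```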
+ +- **Attributes:** + - `key`: An instance of `APDPKey` containing all necessary cryptographic parameters. + - `challenges`: A dictionary storing issued challenges keyed by their unique identifiers. + - `challenge_queue`: An asynchronous queue managing incoming challenges for processing. + +- **Methods:** + - `initialize_keys(rsa_bits)`: Initializes the cryptographic keys using the specified RSA key size. + - `generate_tag(data)`: Creates an `APDPTag` for a given data block. + - `issue_challenge(tag)`: Generates a `Challenge` based on the provided tag. + - `generate_proof(data, tag, challenge, n)`: Produces a `Proof` in response to a challenge. + - `verify_proof(proof, challenge, tag, n, e)`: Validates the received proof against the original challenge and tag. + +--- + +## Workflow + +### Step 1: **Key Initialization** + +Initialize the cryptographic keys required for the PDP protocol. + +```python +self.key = APDPKey() +self.key.generate(rsa_bits=DEFAULT_RSA_KEY_SIZE) +``` + +- **Process:** + - Generates an RSA private key with the specified key size. + - Selects a suitable generator `g` within $\mathbb{Z}_n$. + - Generates a PRF key (`prf_key`) for random value generation. + +### Step 2: **Tag Generation** + +Generate a cryptographic tag for a specific data block. + +```python +tag = self.generate_tag(data) +``` + +- **Process:** + - Converts the data block into an integer modulo the RSA modulus `n`. + - Generates a PRF value (`prf_value`) using the `prf_key`. + - Computes the tag value (`tag_value`) by combining the hashed data and generator operations, followed by exponentiation with the RSA private exponent. + +### Step 3: **Issuing a Challenge** + +Create and issue a challenge to a miner to verify data possession. + +```python +challenge = self.issue_challenge(tag) +``` + +- **Process:** + - Selects a random integer `s` ensuring $\gcd(s, n) = 1$. + - Calculates $ g_s = g^s \mod n $. + - Generates ephemeral keys (`prp_key`, `prf_key`) for the challenge. + - Constructs a `Challenge` object encapsulating the tag and challenge parameters. + +### Step 4: **Generating a Proof** + +The miner generates a proof in response to the received challenge. + +```python +proof = self.generate_proof(data, tag, challenge, n) +``` + +- **Process:** + - Derives a coefficient using the PRF with `prf_key`. + - Adjusts the tag value by raising it to the derived coefficient modulo `n`. + - Aggregates the data block value by multiplying it with the coefficient. + - Computes $\rho = g_s^\text{aggregated\_blocks} \mod n $ and hashes the result to produce `hashed_result`. + - Constructs a `Proof` object containing the modified tag value, aggregated block value, and hashed result. + +### Step 5: **Verifying a Proof** + +The validator verifies the proof submitted by the miner. + +```python +is_valid = self.verify_proof(proof, challenge, tag, n, e) +``` + +- **Process:** + - Recomputes $\tau = (proof * tag\_value^e) \mod n$. + - Eliminates the effect of the full domain hash by applying the modular inverse of the FDH component. + - Calculates $\tau_s = \tau^s \mod n $ based on the challenge parameter `s`. + - Hashes $\tau_s $ and compares it with the `hashed_result` from the proof. + - Returns `True` if the hashes match, indicating valid proof; otherwise, returns `False`. + +--- + +## Asynchronous Challenge Workflow + +The Challenge System operates asynchronously to support distributed and scalable environments. Below are the detailed workflows for both the **validator** and the **miner**. 
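+
+As a concrete reference before the message flows, the arithmetic of Steps 2-5 can be checked end-to-end with a toy example. Everything below is illustrative only: the primes are tiny, `fdh` stands in for the full domain hash of `prf_value`, and the coefficient `c` is fixed rather than PRF-derived:
+
+```python
+import hashlib
+
+p, q = 61, 53
+n, e = p * q, 17                    # toy RSA modulus and public exponent
+d = pow(e, -1, (p - 1) * (q - 1))   # toy private exponent
+
+g = pow(7, 2, n)   # a quadratic residue serving as generator
+m = 42             # the data block as an integer mod n
+fdh = 99           # stand-in for the full domain hash
+c = 5              # stand-in for the PRF-derived coefficient
+
+# Step 2: tag_value = (fdh * g^m)^d mod n
+tag_value = pow(fdh * pow(g, m, n) % n, d, n)
+
+# Step 3: challenge with s coprime to n
+s = 19
+g_s = pow(g, s, n)
+
+# Step 4: the miner's proof
+aggregated_tag = pow(tag_value, c, n)
+aggregated_blocks = c * m
+rho = pow(g_s, aggregated_blocks, n)
+hashed_result = hashlib.sha256(rho.to_bytes(8, "big")).digest()
+
+# Step 5: the validator strips the FDH factor and re-derives rho
+tau = pow(aggregated_tag, e, n)             # = (fdh * g^m)^c mod n
+tau = tau * pow(pow(fdh, c, n), -1, n) % n  # = g^(m*c) mod n
+expected = hashlib.sha256(pow(tau, s, n).to_bytes(8, "big")).digest()
+assert expected == hashed_result            # hashes match: proof is valid
+```
+
+The same cancellation is what makes the real protocol work: the RSA private exponent applied at tagging time is undone by `e` at verification time, leaving only terms the validator can recompute from public challenge values.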
+ +### Validator Workflow + +1. **Initiate Challenge** + + The validator selects a miner and a specific data piece to challenge. + + ```python + await self.challenge_miner(miner_id, piece_id, tag) + ``` + + - **Process:** + - Constructs a `Challenge` using `issue_challenge(tag)`. + - Signs the challenge message to ensure authenticity. + - Sets a deadline for the miner to respond. + - Sends the signed challenge message to the selected miner. + +2. **Verify Response** + + Upon receiving a proof from the miner, the validator verifies its authenticity and correctness. + + ```python + self.verify_challenge(challenge_request) + ``` + + - **Process:** + - Retrieves the corresponding `Challenge` using the challenge ID. + - Validates the proof using `verify_proof(proof, challenge, tag, n, e)`. + - Logs the result and updates the challenge status accordingly. + +### Miner Workflow + +1. **Receive Challenge** + + The miner acknowledges and queues the received challenge for processing. + + ```python + await self.ack_challenge(request) + ``` + + - **Process:** + - Validates the authenticity of the challenge (e.g., verifies the signature). + - Parses and stores the challenge parameters. + - Enqueues the challenge with its deadline for proof generation. + +2. **Generate Proof** + + The miner processes the queued challenge by generating a corresponding proof. + + ```python + proof = self.generate_proof(data, tag, challenge, n) + ``` + + - **Process:** + - Retrieves the data block associated with the challenge. + - Uses the `generate_proof` method to create a `Proof` object based on the challenge parameters. + - Ensures the proof adheres to the protocol specifications. + +3. **Send Proof** + + The miner submits the generated proof back to the validator for verification. + + - **Process:** + - Constructs a `ProofResponse` containing the proof and related metadata. + - Sends the `ProofResponse` to the validator's designated endpoint. + - Waits for acknowledgment or further instructions. + +### Processing Challenges + +The system includes an asynchronous consumer that continuously processes incoming challenges. + +```python +await self.consume_challenges() +``` + +- **Process:** + - Continuously retrieves challenges from the `challenge_queue`. + - Validates the deadline and ensures the challenge is still active. + - Reads the required data block from storage. + - Generates a proof using the stored data and challenge parameters. + - Sends the proof back to the validator for verification. + - Marks the challenge as completed or handles errors accordingly. From 0af3295568083b495d79cf29a2849a8f2b111c48 Mon Sep 17 00:00:00 2001 From: rishiad Date: Mon, 13 Jan 2025 22:35:42 +1030 Subject: [PATCH 2/7] refactor: replace pow with gmpy2.powmod for improved performance and accuracy --- storb/challenge/__init__.py | 41 +++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/storb/challenge/__init__.py b/storb/challenge/__init__.py index 3c86a66..fac069f 100644 --- a/storb/challenge/__init__.py +++ b/storb/challenge/__init__.py @@ -97,16 +97,6 @@ def prf(key: bytes, input_int: int, out_len=16) -> bytes: block = int_to_bytes(input_int, out_len) return hmac.digest(key, block, hashlib.sha256) - @staticmethod - def mod_power(a, n, m): - r = 1 - while n > 0: - if n & 1 == 1: - r = (r * a) % m - a = (a * a) % m - n >>= 1 - return r - class APDPKey(BaseModel): """Holds the cryptographic parameters for single-block APDP. Using Pydantic to store/serialize them. 
@@ -154,7 +144,7 @@ def generate(self, rsa_bits=DEFAULT_RSA_KEY_SIZE): g_candidate = None for _ in range(G_CANDIDATE_RETRY): candidate = random.randint(2, n - 2) - temp_val = pow(candidate, 2, n) + temp_val = gmpy2.powmod(gmpy2.mpz(candidate), 2, n) if temp_val not in (0, 1): g_candidate = temp_val break @@ -245,7 +235,7 @@ def generate_tag(data: bytes, keys: bytes, prf_key: bytes, g: int) -> APDPTag: d = key.private_numbers().d try: - tag_value = pow(gmpy2.mpz(base), gmpy2.mpz(d), gmpy2.mpz(n)) + tag_value = gmpy2.powmod(gmpy2.mpz(base), gmpy2.mpz(d), gmpy2.mpz(n)) except ValueError: raise APDPError("Failed to compute tag value.") @@ -429,7 +419,7 @@ def issue_challenge(self, tag: APDPTag) -> Challenge: "Key values are not initialized. Call initialize_keys first." ) - n = self.key.rsa.public_key().public_numbers().n + n = gmpy2.mpz(self.key.rsa.public_key().public_numbers().n) s = random.randint(2, n - 1) attempt_count = 0 @@ -439,12 +429,13 @@ def issue_challenge(self, tag: APDPTag) -> Challenge: if attempt_count > S_CANDIDATE_RETRY: raise APDPError("Failed to find suitable s in Z*_n") - g_s = pow(self.key.g, s, n) + g_s = gmpy2.powmod(gmpy2.mpz(self.key.g), s, n) prp_key = Fernet.generate_key() prf_key = Fernet.generate_key() tag = APDPTag.model_validate_json(tag) + g_s = int(g_s) return Challenge(s=s, g_s=g_s, prf_key=prf_key, prp_key=prp_key, tag=tag) def generate_proof( @@ -484,14 +475,20 @@ def generate_proof( f"Block int: {block_int}, coefficient: {coefficient}, prf: {prf_result}" ) - aggregated_tag = pow(tag.tag_value, coefficient, n) + n = gmpy2.mpz(n) + coefficient = gmpy2.mpz(coefficient) + + aggregated_tag = gmpy2.powmod(gmpy2.mpz(tag.tag_value), coefficient, n) + aggregated_tag = int(aggregated_tag) aggregated_blocks = coefficient * block_int + aggregated_blocks = gmpy2.mpz(aggregated_blocks) + aggregated_blocks = int(aggregated_blocks) logger.debug( f"Aggregated tag: {aggregated_tag}, aggregated blocks: {aggregated_blocks}" ) - rho = pow(challenge.g_s, aggregated_blocks, n) + rho = gmpy2.powmod(gmpy2.mpz(challenge.g_s), aggregated_blocks, n) hashed_result = hashlib.sha256(int_to_bytes(rho)).digest() logger.debug(f"Hashed result: {hashed_result}") @@ -534,23 +531,27 @@ def verify_proof( rsa_key = self.key.rsa + e = gmpy2.mpz(e) + n = gmpy2.mpz(n) + try: - tau = pow(proof.tag_value, e, n) + tau = gmpy2.powmod(gmpy2.mpz(proof.tag_value), e, n) except ValueError: raise APDPError("Failed to compute tau in verification.") logger.debug(f"Computed tau: {tau}") prf_result = CryptoUtils.prf(challenge.prf_key, 0) coefficient = int.from_bytes(prf_result, "big") % n + coefficient = gmpy2.mpz(coefficient) fdh_hash = CryptoUtils.full_domain_hash(rsa_key, tag.prf_value) - denominator = pow(fdh_hash, coefficient, n) % n + denominator = gmpy2.powmod(gmpy2.mpz(fdh_hash), coefficient, n) % n try: - denominator_inv = pow(denominator, -1, n) + denominator_inv = gmpy2.powmod(gmpy2.mpz(denominator), -1, n) except ValueError: raise APDPError("Failed to invert denominator modulo n.") tau = (tau * denominator_inv) % n - tau_s = pow(tau, challenge.s, n) + tau_s = gmpy2.powmod(tau, challenge.s, n) expected_hash = hashlib.sha256(int_to_bytes(tau_s)).digest() expected_hash = base64.b64encode(expected_hash).decode("utf-8") From 77d8c6684141df0486dd3f55bd0569298e5ad1f4 Mon Sep 17 00:00:00 2001 From: rishiad Date: Mon, 13 Jan 2025 22:35:57 +1030 Subject: [PATCH 3/7] refactor: add y and n options --- storb/util/key_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git 
a/storb/util/key_manager.py b/storb/util/key_manager.py index 6f1d019..b17a77d 100644 --- a/storb/util/key_manager.py +++ b/storb/util/key_manager.py @@ -48,9 +48,9 @@ def prompt_confirmation(self, message: str) -> bool: """ while True: - response = input(f"{message} (yes/no): ").strip().lower() - if response in ["yes", "no"]: - return response == "yes" + response = input(f"{message} (y/n): ").strip().lower() + if response in ["yes", "no", "y", "n"]: + return response == "yes" or response == "y" logger.warning("Please respond with 'yes' or 'no'.") def initialize_keys(self, rsa_bits=DEFAULT_RSA_KEY_SIZE): From e5e0bf306c4db5f1808b976e4a490e7cc008fb1b Mon Sep 17 00:00:00 2001 From: Rishi <88962693+rishiad@users.noreply.github.com> Date: Mon, 13 Jan 2025 22:45:34 +1030 Subject: [PATCH 4/7] fix: challenge docs math formatting --- docs/challenge.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/challenge.md b/docs/challenge.md index b918f7a..97e0692 100644 --- a/docs/challenge.md +++ b/docs/challenge.md @@ -42,7 +42,7 @@ The `Challenge` class encapsulates the parameters sent by the validator to the m - `prp_key`: A byte string serving as a key for the Pseudo-Random Permutation (PRP). - `prf_key`: A byte string serving as a key for the PRF. - `s`: An integer randomly selected to introduce variability in the challenge. - - `g_s`: An integer calculated as $ g^s \mod n $, where `g` is the generator and `n` is the RSA modulus. + - `g_s`: An integer calculated as $g^s \mod n$, where `g` is the generator and `n` is the RSA modulus. - **Serialization:** - `prp_key` and `prf_key` are serialized and deserialized using Base64 encoding. @@ -113,7 +113,7 @@ challenge = self.issue_challenge(tag) - **Process:** - Selects a random integer `s` ensuring $\gcd(s, n) = 1$. - - Calculates $ g_s = g^s \mod n $. + - Calculates $g_s = g^s \mod n$. - Generates ephemeral keys (`prp_key`, `prf_key`) for the challenge. - Constructs a `Challenge` object encapsulating the tag and challenge parameters. @@ -129,7 +129,7 @@ proof = self.generate_proof(data, tag, challenge, n) - Derives a coefficient using the PRF with `prf_key`. - Adjusts the tag value by raising it to the derived coefficient modulo `n`. - Aggregates the data block value by multiplying it with the coefficient. - - Computes $\rho = g_s^\text{aggregated\_blocks} \mod n $ and hashes the result to produce `hashed_result`. + - Computes rho and hashes the result to produce `hashed_result`. - Constructs a `Proof` object containing the modified tag value, aggregated block value, and hashed result. ### Step 5: **Verifying a Proof** @@ -141,10 +141,10 @@ is_valid = self.verify_proof(proof, challenge, tag, n, e) ``` - **Process:** - - Recomputes $\tau = (proof * tag\_value^e) \mod n$. + - Recomputes $tau = (proof * tag\_value^e) \mod n$. - Eliminates the effect of the full domain hash by applying the modular inverse of the FDH component. - - Calculates $\tau_s = \tau^s \mod n $ based on the challenge parameter `s`. - - Hashes $\tau_s $ and compares it with the `hashed_result` from the proof. + - Calculates $tau_s = \tau^s \mod n$ based on the challenge parameter `s`. + - Hashes $tau_s$ and compares it with the `hashed_result` from the proof. - Returns `True` if the hashes match, indicating valid proof; otherwise, returns `False`. 
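+
+Written out in one place, the identity behind this check -- using $t$ for `tag_value`, $h$ for the full domain hash of `prf_value`, $c$ for the PRF-derived coefficient, and $m$ for the data block as an integer (notation introduced here, not in the code) -- is:
+
+$$
+\tau = \frac{(t^c)^e}{h^c} = \frac{(h \cdot g^m)^{dce}}{h^c} = g^{mc} \pmod{n},
+\qquad
+\tau^s = (g^s)^{mc} = g_s^{mc} = \rho \pmod{n}
+$$
+
+since $de \equiv 1 \pmod{\lambda(n)}$. Hashing both sides is what reduces the comparison to the `hashed_result` check above.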
--- From f4de4003d90d80054a0627f4f64d68ac51348fd5 Mon Sep 17 00:00:00 2001 From: rishiad Date: Mon, 13 Jan 2025 23:27:23 +1030 Subject: [PATCH 5/7] refactor: clean up imports and enhance docstrings for clarity --- storb/dht/__init__.py | 4 +- storb/dht/storage.py | 141 +++++++++++++++++++++++------------------- storb/miner/miner.py | 10 ++- storb/neuron.py | 1 - storb/util/query.py | 88 +++++++++++++++++++++++--- 5 files changed, 166 insertions(+), 78 deletions(-) diff --git a/storb/dht/__init__.py b/storb/dht/__init__.py index df15aea..e633e7b 100644 --- a/storb/dht/__init__.py +++ b/storb/dht/__init__.py @@ -4,7 +4,6 @@ import threading from pathlib import Path -from fiber.logging_utils import get_logger from kademlia.network import Server from storb.constants import ( @@ -341,6 +340,7 @@ async def get_chunk_entry(self, chunk_hash: str) -> ChunkDHTValue: RuntimeError If the chunk entry fails to retrieve. """ + logger.info(f"Retrieving chunk entry for chunk hash: {chunk_hash}") key = build_store_key("chunk", chunk_hash) try: @@ -378,6 +378,7 @@ def store_piece_entry(self, piece_hash: str, value: PieceDHTValue) -> None: ----- The key for the piece entry is "piece:piece_hash". """ + logger.info(f"Storing piece entry for piece_hash: {piece_hash}") value: bytes = value.model_dump_json().encode("utf-8") key: bytes = build_store_key("piece", piece_hash) @@ -414,6 +415,7 @@ async def get_piece_entry(self, piece_hash: str) -> PieceDHTValue: RuntimeError If the piece entry fails to retrieve. """ + logger.info(f"Retrieving piece entry for piece hash: {piece_hash}") key = build_store_key("piece", piece_hash) try: diff --git a/storb/dht/storage.py b/storb/dht/storage.py index 253649c..85ba93d 100644 --- a/storb/dht/storage.py +++ b/storb/dht/storage.py @@ -70,7 +70,6 @@ def __init__(self, db_dir: str): self.mem: dict[bytes, tuple[DHTValue, float]] = {} self.loop = asyncio.new_event_loop() - self.semaphore = asyncio.Semaphore(100) self.thread = threading.Thread(target=self._start_event_loop, daemon=True) self.thread.start() @@ -90,6 +89,7 @@ def set(self, key: bytes, value: bytes): value : DHTValue The value to store. """ + logger.debug(f"Number of pending tasks: {len(asyncio.all_tasks(self.loop))}") self.mem[key] = (value, time.time()) namespace, key = parse_store_key(key) @@ -97,7 +97,6 @@ def set(self, key: bytes, value: bytes): future = asyncio.run_coroutine_threadsafe( self._db_write_data(namespace, key, value), self.loop ) - # Optionally log or monitor the future without blocking def handle_future(fut): try: @@ -105,7 +104,6 @@ def handle_future(fut): except Exception as e: logger.error(f"Database write error for key {key}: {e}") - # Add a callback to the future future.add_done_callback(handle_future) def __getitem__(self, key: bytes) -> bytes: @@ -152,6 +150,21 @@ def __getitem__(self, key: bytes) -> bytes: raise def get(self, key: bytes, default=None): + """Retrieve a value from the DHT corresponding to the key. + + Args: + key (bytes) + The key in 'namespace:key' format encoded in UTF-8. + default (None, optional) + Defaults to None. + + Returns: + DHTValue: The retrieved value. + + Notes: + If the key does not exist, this method returns the default value. + """ + try: return self.__getitem__(key) except KeyError: @@ -209,68 +222,68 @@ async def _db_write_data(self, namespace: str, key: str, value: bytes): ValueError If the value format is invalid. 
""" - async with self.semaphore: - try: - value = value.decode("utf-8") - except ValueError: - raise ValueError( - f"DISK: Invalid value format: {value}, expected utf-8 encoded bytes" - ) - - logger.debug(f"DISK: Setting {namespace}:{key} => {value}") - async with db.get_db_connection(self.db_path) as conn: - await conn.execute("PRAGMA journal_mode=WAL;") - match namespace: - case "chunk": - val = ChunkDHTValue.model_validate_json(value) - entry = ChunkDHTValue( - chunk_hash=key, - validator_id=val.validator_id, - piece_hashes=val.piece_hashes, - chunk_idx=val.chunk_idx, - k=val.k, - m=val.m, - chunk_size=val.chunk_size, - padlen=val.padlen, - original_chunk_size=val.original_chunk_size, - signature=val.signature, - ) - logger.debug(f"flushing chunk entry {entry} to disk") - await db.set_chunk_entry(conn, entry) - - case "piece": - val = PieceDHTValue.model_validate_json(value) - entry = PieceDHTValue( - piece_hash=key, - validator_id=val.validator_id, - miner_id=val.miner_id, - chunk_idx=val.chunk_idx, - piece_idx=val.piece_idx, - piece_type=val.piece_type, - tag=val.tag, - signature=val.signature, - ) - logger.debug(f"flushing piece entry {entry} to disk") - await db.set_piece_entry(conn, entry) - - case "tracker": - val = TrackerDHTValue.model_validate_json(value) - entry = TrackerDHTValue( - infohash=key, - validator_id=val.validator_id, - filename=val.filename, - length=val.length, - chunk_size=val.chunk_size, - chunk_count=val.chunk_count, - chunk_hashes=val.chunk_hashes, - creation_timestamp=val.creation_timestamp, - signature=val.signature, - ) - logger.debug(f"flushing tracker entry {entry} to disk") - await db.set_tracker_entry(conn, entry) - case _: - raise ValueError(f"Invalid key namespace: {namespace}") + try: + value = value.decode("utf-8") + except ValueError: + raise ValueError( + f"DISK: Invalid value format: {value}, expected utf-8 encoded bytes" + ) + + logger.debug(f"DISK: Setting {namespace}:{key} => {value}") + async with db.get_db_connection(self.db_path) as conn: + await conn.execute("PRAGMA journal_mode=WAL;") + match namespace: + case "chunk": + val = ChunkDHTValue.model_validate_json(value) + entry = ChunkDHTValue( + chunk_hash=key, + validator_id=val.validator_id, + piece_hashes=val.piece_hashes, + chunk_idx=val.chunk_idx, + k=val.k, + m=val.m, + chunk_size=val.chunk_size, + padlen=val.padlen, + original_chunk_size=val.original_chunk_size, + signature=val.signature, + ) + logger.debug(f"flushing chunk entry {entry} to disk") + await db.set_chunk_entry(conn, entry) + + case "piece": + val = PieceDHTValue.model_validate_json(value) + entry = PieceDHTValue( + piece_hash=key, + validator_id=val.validator_id, + miner_id=val.miner_id, + chunk_idx=val.chunk_idx, + piece_idx=val.piece_idx, + piece_type=val.piece_type, + tag=val.tag, + signature=val.signature, + ) + logger.debug(f"flushing piece entry {entry} to disk") + await db.set_piece_entry(conn, entry) + + case "tracker": + val = TrackerDHTValue.model_validate_json(value) + entry = TrackerDHTValue( + infohash=key, + validator_id=val.validator_id, + filename=val.filename, + length=val.length, + chunk_size=val.chunk_size, + chunk_count=val.chunk_count, + chunk_hashes=val.chunk_hashes, + creation_timestamp=val.creation_timestamp, + signature=val.signature, + ) + logger.debug(f"flushing tracker entry {entry} to disk") + await db.set_tracker_entry(conn, entry) + + case _: + raise ValueError(f"Invalid key namespace: {namespace}") async def _db_read_data(self, key: bytes) -> DHTValue: """Read from the DB in the event 
loop thread, based on the namespace. diff --git a/storb/miner/miner.py b/storb/miner/miner.py index 2fbee8f..e0b479d 100644 --- a/storb/miner/miner.py +++ b/storb/miner/miner.py @@ -1,6 +1,5 @@ import asyncio import datetime -import sys import threading import uuid @@ -35,6 +34,7 @@ def __init__(self): ] = asyncio.PriorityQueue() async def start(self): + """Starts the miner's operations.""" self.app_init() await self.start_dht() @@ -152,7 +152,13 @@ async def store_piece( return response async def get_piece(self, request: protocol.Retrieve): - """Returns a piece from storage as JSON metadata and a file.""" + """Returns a piece from storage as JSON metadata and a file. + + Parameters + ---------- + request : protocol.Retrieve + The request object containing the piece_id to retrieve. + """ logger.info("Retrieving piece...") logger.debug(f"piece_id to retrieve: {request.piece_id}") diff --git a/storb/neuron.py b/storb/neuron.py index 3533ec6..7e7b885 100644 --- a/storb/neuron.py +++ b/storb/neuron.py @@ -4,7 +4,6 @@ from abc import ABC, abstractmethod import httpx -import uvicorn from fastapi import FastAPI from fiber.chain import chain_utils, interface, post_ip_to_chain from fiber.chain.metagraph import Metagraph diff --git a/storb/util/query.py b/storb/util/query.py index c8826e9..8c151b5 100644 --- a/storb/util/query.py +++ b/storb/util/query.py @@ -63,12 +63,6 @@ def factory_app(conf: StorbConfig, debug: bool = False) -> FastAPI: async def lifespan(app: FastAPI): config = custom_config(conf) metagraph = config.metagraph - # sync_thread = None - # if metagraph.substrate is not None: - # sync_thread = threading.Thread( - # target=metagraph.periodically_sync_nodes, daemon=True - # ) - # sync_thread.start() yield @@ -76,8 +70,6 @@ async def lifespan(app: FastAPI): # TODO: should this be moved elsewhere? metagraph.shutdown() - # if metagraph.substrate is not None and sync_thread is not None: - # sync_thread.join() app = FastAPI(lifespan=lifespan, debug=debug) @@ -178,6 +170,33 @@ async def make_non_streamed_post( payload: Payload, timeout: float = QUERY_TIMEOUT, ) -> httpx.Response: + """Make a non-streamed POST request. Adapted from Fiber's implementation + + Parameters + ---------- + httpx_client : httpx.AsyncClient + The HTTPX client to use for the request + server_address : str + The address of the server to send the request to + validator_ss58_address : str + The SS58 address of the validator + miner_ss58_address : str + The SS58 address of the miner + keypair : Keypair + The keypair to use for signing the request + endpoint : str + The endpoint to send the request to + payload : Payload + The payload to send with the request + timeout : float, optional + The timeout for the request, by default QUERY_TIMEOUT + + Returns + ------- + httpx.Response + The response from the server + """ + # Prepare the headers with a nonce content = bytes(str(payload.data or "").encode("utf-8")) + ( bytes(str(payload.file or "").encode("utf-8")) if payload.file else b"" @@ -227,11 +246,35 @@ async def make_streamed_post( miner_ss58_address: str, keypair: Keypair, endpoint: str, - # payload: dict[str, Any], payload: Payload, timeout: float = QUERY_TIMEOUT, ) -> AsyncGenerator[bytes, None]: - """Make a streamed POST request. Adapted from Fiber's implementation""" + """Make a streamed POST request. 
Adapted from Fiber's implementation + + Parameters + ---------- + httpx_client : httpx.AsyncClient + The HTTPX client to use for the request + server_address : str + The address of the server to send the request to + validator_ss58_address : str + The SS58 address of the validator + miner_ss58_address : str + The SS58 address of the miner + keypair : Keypair + The keypair to use for signing the request + endpoint : str + The endpoint to send the request to + payload : Payload + The payload to send with the request + timeout : float, optional + The timeout for the request, by default QUERY_TIMEOUT + + Yields + ------- + AsyncGenerator[bytes, None] + An async generator that yields the response content line by line + """ content = ( bytes(str(payload.data or "").encode("utf-8")) @@ -288,6 +331,31 @@ async def make_non_streamed_get( payload: Payload, timeout: float = QUERY_TIMEOUT, ) -> httpx.Response: + """Make a non-streamed GET request. + + Parameters + ---------- + httpx_client (httpx.AsyncClient) + httpx.AsyncClient instance + server_address (str) + Server address to send the request to + validator_ss58_address (str) + SS58 address of the validator making the request + symmetric_key_uuid (str) + UUID of the symmetric key + endpoint (str) + Endpoint to send the request to + payload (Payload) + Payload to send with the request + timeout (float, optional) + Timeout for the request, by default QUERY_TIMEOUT + + Returns + ------- + httpx.Response + Response from the server + """ + headers = _get_headers(symmetric_key_uuid, validator_ss58_address) response = await httpx_client.get( json=payload.data.model_dump(), From 799df156ef6466cead755474d809971e0e7148ef Mon Sep 17 00:00:00 2001 From: Ray Okamoto Date: Mon, 13 Jan 2025 23:55:46 +1030 Subject: [PATCH 6/7] docs: update challenge docs --- docs/challenge.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/challenge.md b/docs/challenge.md index 97e0692..cba63a8 100644 --- a/docs/challenge.md +++ b/docs/challenge.md @@ -2,7 +2,7 @@ ## Overview -The **Challenge System** in Storb is designed to implement a **Provable Data Possession (PDP)** protocol. This system enables a **validator** to verify that a **miner** is correctly storing specific data blocks without requiring access to the entire data. The system leverages cryptographic techniques to ensure data integrity and storage reliability in a secure and efficient manner. +The **Challenge System** in Storb is designed to implement a [**Provable Data Possession (PDP)** protocol](https://dl.acm.org/doi/10.1145/1315245.1315318). This system enables a **validator** to verify that a **miner** is correctly storing specific data blocks without requiring access to the entire data. The system leverages cryptographic techniques to ensure data integrity and storage reliability in a secure and efficient manner. --- @@ -87,8 +87,8 @@ self.key.generate(rsa_bits=DEFAULT_RSA_KEY_SIZE) - **Process:** - Generates an RSA private key with the specified key size. - - Selects a suitable generator `g` within $\mathbb{Z}_n$. - - Generates a PRF key (`prf_key`) for random value generation. + - Selects a suitable generator `g` in $\mathbb{Z}_n$. + - Generates a PRF key (`prf_key`) for pseudorandom number generation. ### Step 2: **Tag Generation** @@ -141,8 +141,8 @@ is_valid = self.verify_proof(proof, challenge, tag, n, e) ``` - **Process:** - - Recomputes $tau = (proof * tag\_value^e) \mod n$. 
- - Eliminates the effect of the full domain hash by applying the modular inverse of the FDH component. + - Recomputes `tau = (proof * tag_value^e) mod n`. + - Eliminates the effect of the full domain hash (FDH) by applying the modular inverse of the FDH component. - Calculates $tau_s = \tau^s \mod n$ based on the challenge parameter `s`. - Hashes $tau_s$ and compares it with the `hashed_result` from the proof. - Returns `True` if the hashes match, indicating valid proof; otherwise, returns `False`. From 495d16355b416602a5c970ba86e1e2918e32f34c Mon Sep 17 00:00:00 2001 From: Ray Okamoto Date: Tue, 14 Jan 2025 00:01:30 +1030 Subject: [PATCH 7/7] chore: bump version to 0.2.0 --- min_compute.yml | 2 +- pyproject.toml | 2 +- settings.toml.example | 2 +- storb/__init__.py | 2 +- uv.lock | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/min_compute.yml b/min_compute.yml index 1d14821..0896897 100644 --- a/min_compute.yml +++ b/min_compute.yml @@ -7,7 +7,7 @@ # NOTE: Specification for miners may be different from validators -version: "0.1.0" # update this version key as needed, ideally should match your release version +version: "0.2.0" # update this version key as needed, ideally should match your release version compute_spec: diff --git a/pyproject.toml b/pyproject.toml index 14c5d67..40a31ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "storb" -version = "0.1.0" +version = "0.2.0" description = "An object storage subnet on the Bittensor network" readme = "README.md" license = { file = "LICENSE" } diff --git a/settings.toml.example b/settings.toml.example index d4004e6..60fdc27 100644 --- a/settings.toml.example +++ b/settings.toml.example @@ -1,4 +1,4 @@ -version = "0.1.0" +version = "0.2.0" netuid = 0 external_ip = "0.0.0.0" diff --git a/storb/__init__.py b/storb/__init__.py index 363c330..2a9cfab 100644 --- a/storb/__init__.py +++ b/storb/__init__.py @@ -1,6 +1,6 @@ """Storb: An object storage subnet on the Bittensor network""" -__version__ = "0.1.0" +__version__ = "0.2.0" def get_spec_version(version: str) -> int: diff --git a/uv.lock b/uv.lock index ea44a5d..6a6e7c1 100644 --- a/uv.lock +++ b/uv.lock @@ -884,7 +884,7 @@ wheels = [ [[package]] name = "storb" -version = "0.1.0" +version = "0.2.0" source = { virtual = "." } dependencies = [ { name = "aiofiles" },