diff --git a/README.md b/README.md index e69de29..96aeca5 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,78 @@ +# BT DDoS Shield + +## Overview + +`bt-ddos-shield` is a Python package designed to address the critical issue of Distributed Denial-of-Service (DDoS) attacks in bittensor ecosystem. The project leverages encryption to protect communication between miners and validators, ensuring the IPs and ports of these nodes remain secure and hidden from malicious actors. This decentralized solution aims to eliminate the financial burden caused by traditional DDoS protection methods like WAF and Cloudflare, which are often costly and impractical for subnets handling large volumes of data. + +## Project Goals + +The goal of this project is to implement a distributed and decentralized system that: +- Protects miner and validator IP addresses from exposure, preventing potential DDoS attacks. +- Removes the need for on-chain storage of unencrypted IP addresses and ports, eliminating an obvious attack surface. +- Uses encrypted messages between miners and validators to securely exchange connection information (IP, IP version, and port). +- Provides a scalable, decentralized alternative to traditional DDoS protection methods while maintaining performance and minimizing attack vectors. + +## Features + +1. **Encryption-Based Communication**: + - Uses ECIES (Elliptic Curve Integrated Encryption Scheme) to encrypt communication between miners and validators. + - The encrypted data includes the miner's hotkey, subnet ID, and validator connection details (IP, IP version, and port). + +2. **Decentralized DDoS Mitigation**: + - Removes the need for centralized DDoS protection services by distributing connection information securely across nodes. + - Prevents IP address exposure by sharing encrypted connection data through a decentralized network of subtensors. + +3. **Secure Message Exchange**: + - Validators can request the connection information of miners from the subtensor network. This information is validated and decrypted locally using the validator's private key. + + +## Installation +``` +pip install bt-ddos-shield +``` + +## Contribution Guidelines + +To contribute to the `bt-ddos-shield` package, the steps below: + +### 1. Clone the Repository: + +```bash +git clone https://github.com/bactensor/bt-ddos-shield.git +cd bt-ddos-shield +``` + +### 2. Install Dependencies: + +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install poetry +poetry install +``` + +### 3. Build the Pacakge: +```bash +poetry build +``` + +### 4. Run Tests: +```bash +poetry run pytest +``` + +### 5. Local Pacakge Usage + +To install the package locally for development purposes: +```bash +pip insatll -e +``` + +### 6. Publish the Package +```bash +poetry publish +``` + +## License + +See the [LICENSE](./LICENSE) file for more details. diff --git a/bt_ddos_shield/encryption.py b/bt_ddos_shield/encryption.py index 331d105..a0a1d98 100644 --- a/bt_ddos_shield/encryption.py +++ b/bt_ddos_shield/encryption.py @@ -1,8 +1,70 @@ +import json +from ecies import encrypt, decrypt + + +class EncryptionError(Exception): + pass + + +class DecryptionError(Exception): + pass + + class EncryptionManager: - @staticmethod - def encrypt_message(): + def __init__(self): pass - @staticmethod - def decrypt_message(): - pass + def encrypt_data( + self, public_key: str, data: dict, encoding: str = "utf-8" + ) -> bytes: + """ + Encrypts the given data as a dictionary using the provided public key. + + Args: + public_key (str): The public key in string format that will be used to encrypt the data. + data (dict): The data to be encrypted, represented as a dictionary. + encoding (str): Encoding format for the string representation of the data (default: 'utf-8). + + Returns: + bytes: The encrypted data in bytes format. + """ + if not isinstance(public_key, str): + raise TypeError(f"Public key must be of type str: {public_key}") + + if not isinstance(data, dict): + raise TypeError(f"Data must be of type dict: {data}") + + try: + data_str = json.dumps(data).encode(encoding=encoding) + encrypted_data = encrypt(public_key, data_str) + return encrypted_data + except Exception as e: + raise EncryptionError(f"Encryption failed: {e}") + + def decrypt_data( + self, private_key: str, encrypted_data: bytes, encoding: str = "utf-8" + ) -> dict: + """ + Decrypts the given encrypted data using the provided private key. + + Args: + private_key (str): The private key in string format used for decription. + encrypted_data (bytes): The encrypted data to be decrypted. + encoding (str): Encoding format for the string representation of the data (default: 'utf-8). + + Returns: + dict: The decrypted data, converted back to a dictionary. + """ + if not isinstance(private_key, str): + raise TypeError(f"Private key must be of type str: {private_key}") + + if not isinstance(encrypted_data, bytes): + raise TypeError(f"Encrypted data must be of type bytes: {encrypted_data}") + + try: + decrypted_data = decrypt(private_key, encrypted_data) + decrypted_str = decrypted_data.decode(encoding=encoding) + decrypted_dict = json.loads(decrypted_str) + return decrypted_dict + except Exception as e: + raise DecryptionError(f"Decrption failed: {e}") diff --git a/tests/test_encryption.py b/tests/test_encryption.py new file mode 100644 index 0000000..508e306 --- /dev/null +++ b/tests/test_encryption.py @@ -0,0 +1,169 @@ +import pytest + +from bt_ddos_shield.encryption import ( + EncryptionManager, + EncryptionError, + DecryptionError, +) +from ecies.utils import generate_eth_key + +# Generate a valid pair of public and private keys for testing +eth_k = generate_eth_key() +private_key = eth_k.to_hex() +public_key = eth_k.public_key.to_hex() + +# Sample test data +valid_test_data = { + "name": "John", + "age": 32, + "github": {"login": "John", "events": [1, 2, 3]}, +} +empty_data_dict = {} +non_encrypted_bytes = b"This is not encrypted" +empty_byte = b"" +invalid_key_type = 123 +invalid_string_data = "invalid_data" + +# Pre-encrypt valid data for decryption tests. +pre_encrypted_data = EncryptionManager().encrypt_data( + public_key=public_key, data=valid_test_data +) + + +class TestEncryptionManager: + """ + Test suite for the EncryptionManager class. + """ + + def test_encrypt_data_valid(self): + """ + Test encryption with valid public key and data. + Ensures the returned encrypted data is of bytes type. + """ + encrypted_data = EncryptionManager().encrypt_data( + public_key=public_key, data=valid_test_data + ) + assert isinstance( + encrypted_data, bytes + ), "Encrypted dta should be of type bytes" + + def test_encrypt_data_invalid_public_key(self): + """ + Test encryption with an invalid public key (string that doens't represent a valid key). + Expects EncryptionError to be raised. + """ + with pytest.raises(EncryptionError): + EncryptionManager().encrypt_data( + public_key=invalid_string_data, data=valid_test_data + ) + + def test_encrypt_data_invalid_data_type(self): + """ + Test encryption with an invalid data type (non-dict data). + Expects TypeError to be raised. + """ + with pytest.raises(TypeError): + EncryptionManager().encrypt_data( + public_key=public_key, data=invalid_string_data + ) + + def test_encrypt_data_invalid_public_key_type(self): + """ + Test encryption with a public key of invalid type (e.g., integer). + Exepcts TypeError to be raised. + """ + with pytest.raises(TypeError): + EncryptionManager().encrypt_data( + public_key=invalid_key_type, data=valid_test_data + ) + + def test_encrypt_data_empty_data(self): + """ + Test encryption with an empty dictionary. + Expects encryption to succeed and return data of type bytes. + """ + encrypted_empty_data = EncryptionManager().encrypt_data( + public_key=public_key, data=empty_data_dict + ) + assert isinstance( + encrypted_empty_data, bytes + ), "Encrypted data should be of type bytes for an empty dict" + + def test_decrypt_data_valid(self): + """ + Test decryption with valid private key and encrypted data. + Ensures that the decrypted data matches the original dictionary. + """ + decrypted_data = EncryptionManager().decrypt_data( + private_key=private_key, encrypted_data=pre_encrypted_data + ) + assert isinstance(decrypted_data, dict), "Decrypted data should be of type dict" + assert ( + decrypted_data == valid_test_data + ), "Decrypted data should match the original data" + + def test_decrypt_data_empty_data(self): + """ + Test decryption with an empty dictionary. + Expects that the decrypted data matches the original dictionary. + """ + encrypted_empty_data = EncryptionManager().encrypt_data( + public_key=public_key, data=empty_data_dict + ) + decrypted_data = EncryptionManager().decrypt_data( + private_key=private_key, encrypted_data=encrypted_empty_data + ) + assert isinstance(decrypted_data, dict), "Decrypted data should be of type dict" + assert ( + decrypted_data == empty_data_dict + ), "Decrypted data should match the original data" + + def test_decrypt_data_invalid_private_key(self): + """ + Test decryption with an invalid private key (string that doesn't represnet a valid key). + Expects DecryptionError to be raised. + """ + with pytest.raises(DecryptionError): + EncryptionManager().decrypt_data( + private_key=invalid_string_data, encrypted_data=pre_encrypted_data + ) + + def test_decrypt_data_invalid_encrypted_data(self): + """ + Test decryption with invalid encrypted data (non-encrypted bytes). + Expects DecryptionError to be raised. + """ + with pytest.raises(DecryptionError): + EncryptionManager().decrypt_data( + private_key=private_key, encrypted_data=non_encrypted_bytes + ) + + def test_decrypt_data_invalid_private_key_type(self): + """ + Test decryption with a private key of invalid type (e.g., integer). + Expects TypeError to be raised. + """ + with pytest.raises(TypeError): + EncryptionManager().decrypt_data( + private_key=invalid_key_type, encrypted_data=pre_encrypted_data + ) + + def test_decrypt_data_invalid_encrypted_data_type(self): + """ + Test decryption with encrypted data of invalid type (e.g., integer). + Expects TypeError to be raised. + """ + with pytest.raises(TypeError): + EncryptionManager().decrypt_data( + private_key=private_key, encrypted_data=invalid_key_type + ) + + def test_decrypt_data_empty_encrypted_data(self): + """ + Test decryption with empty encrypted data (empty byte string). + Expects DecryptionError to be raised. + """ + with pytest.raises(DecryptionError): + EncryptionManager().decrypt_data( + private_key=private_key, encrypted_data=empty_byte + )