From 475c33de1278bd452f507e359d94082badd6bd42 Mon Sep 17 00:00:00 2001 From: qstokkink Date: Tue, 26 Nov 2024 12:09:05 +0100 Subject: [PATCH] Added documentation for the DHT(Discovery)Community --- doc/further-reading/dht.rst | 44 +++++++++++++++++++++++ doc/further-reading/dht_1.py | 67 ++++++++++++++++++++++++++++++++++++ doc/index.rst | 8 +---- ipv8/dht/community.py | 59 ++++++++++++++++++++++--------- 4 files changed, 155 insertions(+), 23 deletions(-) create mode 100644 doc/further-reading/dht.rst create mode 100644 doc/further-reading/dht_1.py diff --git a/doc/further-reading/dht.rst b/doc/further-reading/dht.rst new file mode 100644 index 000000000..eba1e2bfb --- /dev/null +++ b/doc/further-reading/dht.rst @@ -0,0 +1,44 @@ +DHT(Discovery)Community +======================= + +This document contains a description of how to use the ``DHTCommunity`` class for distributed hash table (DHT) data storage and the ``DHTDiscoveryCommunity`` extension of this functionality, which provides functionality to connect given public keys. + +In particular this document will **not** discuss how distributed hash table work, for this we refer the reader to other resources on the Internet. + + +Storing values and finding keys +------------------------------- + +The ``DHTCommunity`` is the main overlay that allows for decentralized key-value storage. +There are two main functions in this overlay: the ``store_value()`` function and the ``find_values()`` function. + +When you call ``store_value()``, you choose the globally unique ``key`` that your given value ``data`` is stored under. +You can, but are not required to, sign this new stored value with your public key to provide it with authenticity. +Note that this function may lead to a ``ipv8.dht.DHTError``: in this case, you will have to try again later. +An example of a call that stores the signed value ``b"my value"`` under the key ``b"my key"``, is the following. + +.. literalinclude:: dht_1.py + :lines: 43-47 + +The value can later be retrieved from the network by calling ``find_values()`` with the key that the information was stored under. +The following snippet retrieves the value that was stored in the previous snippet, under the ``b"my key"`` key. + +.. literalinclude:: dht_1.py + :lines: 49-55 + +Note that multiple peers may respond with answers and if (a) the orginal value is not signed or (b) multiple values are published under the same key, the reported values may be different. +In this example, only one value is published and it is signed so only a single value is ever returned. + +Finding peers +------------- + +The ``DHTDiscoveryCommunity`` allows for peers to be found by their public key. +You can search for public keys by their SHA-1 hash (conveniently available as ``Peer.mid``). +To do so, you can call ``connect_peer()`` with the hash/mid as shown in the following example. + + +.. literalinclude:: dht_1.py + :lines: 58-65 + +Note that you may need a few attempts to find the peer you are looking for. +Of course, if the peer you are looking for is not online, you may be waiting forever. diff --git a/doc/further-reading/dht_1.py b/doc/further-reading/dht_1.py new file mode 100644 index 000000000..78b6ff547 --- /dev/null +++ b/doc/further-reading/dht_1.py @@ -0,0 +1,67 @@ +from asyncio import run, sleep +from itertools import combinations +from typing import cast + +from ipv8.configuration import ConfigBuilder +from ipv8.dht import DHTError +from ipv8.dht.community import DHTCommunity +from ipv8.dht.discovery import DHTDiscoveryCommunity +from ipv8.peer import Peer +from ipv8_service import IPv8 + + +async def main() -> None: + instances = [] + + # Put some peers in the network + for _ in range(10): + config = ConfigBuilder().clear_keys() + config.config["overlays"] = [o for o in config.config["overlays"] if o["class"] == "DHTDiscoveryCommunity"] + config.add_ephemeral_key("anonymous id") + config.set_address("127.0.0.1") # We don't want this test to connect to the actual network! + ipv8 = IPv8(config.finalize()) + instances.append(ipv8) + await ipv8.start() + + # Supercharge introductions, normally this takes longer + for id1, id2 in combinations(range(10), 2): + overlay1 = instances[id1].get_overlay(DHTCommunity) + overlay2 = instances[id2].get_overlay(DHTCommunity) + peer1 = Peer(overlay2.my_peer.public_key.key_to_bin(), ("127.0.0.1", overlay2.my_estimated_lan[1])) + peer1.address_frozen = True + peer2 = Peer(overlay1.my_peer.public_key.key_to_bin(), ("127.0.0.1", overlay1.my_estimated_lan[1])) + peer2.address_frozen = True + overlay1.network.add_verified_peer(peer2) + overlay1.get_requesting_node(peer2) + overlay2.network.add_verified_peer(peer1) + overlay2.get_requesting_node(peer1) + for i in range(10): + await instances[i].get_overlay(DHTDiscoveryCommunity).store_peer() + instances[i].get_overlay(DHTDiscoveryCommunity).ping_all() + + dht_community = cast(DHTCommunity, instances[0].get_overlay(DHTCommunity)) + try: + await dht_community.store_value(b"my key", b"my value", True) + print(dht_community.my_peer.public_key.key_to_bin(), "published b'my value' under b'my key'!") + except DHTError as e: + print("Failed to store my value under my key!", e) + + try: + results = await dht_community.find_values(b"my key") + print(f"We got results from {len(results)} peers!") + for value, signer_key in results: + print(f"The value {value} was found, signed by {signer_key}") + except DHTError as e: + print("Failed to find key!", e) + + dht_discovery_community = cast(DHTDiscoveryCommunity, instances[7].get_overlay(DHTDiscoveryCommunity)) + some_peer_mid = instances[2].keys["anonymous id"].mid + while True: + try: + await sleep(0.5) + await dht_discovery_community.connect_peer(some_peer_mid) + break + except DHTError as e: + print("Failed to connect to peer!", e) + +run(main()) diff --git a/doc/index.rst b/doc/index.rst index db94fefee..945b11687 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -82,13 +82,7 @@ Table of contents further-reading/advanced_identity.rst further-reading/advanced_peer_discovery.rst further-reading/anonymization.rst - -.. toctree:: - :maxdepth: 2 - :caption: Deprecated/Archive: - - deprecated/attestation_prototype.rst - deprecated/attestation_tutorial.rst + further-reading/dht.rst Search diff --git a/ipv8/dht/community.py b/ipv8/dht/community.py index 2f4ab9731..ced9f55a4 100644 --- a/ipv8/dht/community.py +++ b/ipv8/dht/community.py @@ -9,8 +9,9 @@ from binascii import hexlify, unhexlify from collections import defaultdict, deque from collections.abc import Coroutine, Iterator, Sequence +from functools import reduce from itertools import zip_longest -from typing import TYPE_CHECKING, Any, Optional, cast +from typing import TYPE_CHECKING, Any, Optional, cast, Literal, overload, Union, TypeVar from ..community import Community, CommunitySettings from ..lazy_community import lazy_wrapper, lazy_wrapper_wd @@ -83,6 +84,19 @@ async def gather_without_errors(*futures: Future) -> list: return [r for r in results if not isinstance(r, Exception)] +FindResultType = TypeVar("FindResultType", Node, DHTValue) + + +def merge_results(results: tuple[list[FindResultType], ...]) -> tuple[FindResultType, ...]: + """ + Merge the results from a tuple of lists into a flat tuple. + """ + out: list[FindResultType] = [] + for result in results: + out.extend(result) + return tuple(out) + + class Request(RandomNumberCache): """ This request cache keeps track of all outstanding requests within the DHTCommunity. @@ -624,36 +638,49 @@ def post_process_values(self, values: list[bytes]) -> list[DHTValue]: # Unsigned data return [*results, *((data[1], None) for data in unpacked[None])] + @overload + async def find(self, target: bytes, force_nodes: bool, offset: int, + debug: Literal[False]) -> tuple[DHTValue, ...] | tuple[Node, ...]: + ... + + @overload + async def find(self, target: bytes, force_nodes: bool, offset: int, + debug: Literal[True]) -> tuple[tuple[DHTValue, ...], list[Crawl]]: + ... + + @overload + async def find(self, target: bytes, force_nodes: bool, offset: int, + debug: bool) -> tuple[DHTValue, ...] | tuple[Node, ...] | tuple[tuple[DHTValue, ...], list[Crawl]]: + ... + async def find(self, target: bytes, force_nodes: bool, offset: int, - debug: bool) -> Sequence[DHTValue] | \ - tuple[Sequence[DHTValue], list[Crawl]] | \ - Sequence[Node]: + debug: bool) -> tuple[DHTValue, ...] | tuple[Node, ...] | tuple[tuple[DHTValue, ...], list[Crawl]]: """ Get the values belonging to the given target key. """ - futures: list[Coroutine[Any, Any, list[Node] | \ - list[DHTValue] | \ + futures: list[Coroutine[Any, Any, list[Node] | + list[DHTValue] | tuple[list[DHTValue], Crawl]]] = [] for routing_table in self.routing_tables.values(): crawl = Crawl(target, routing_table, force_nodes=force_nodes, offset=offset) futures.append(self._find(crawl, debug=debug)) - results: list[list[Any] | - list[DHTValue] | - tuple[list[DHTValue], Crawl]] = await gather(*futures) + results = await gather(*futures) if debug: - results_debug = cast(list[tuple[list[DHTValue], Crawl]], results) - return tuple(*[r[0] for r in results]), [r[1] for r in results_debug] - return tuple(*results) + results_debug = cast(tuple[tuple[list[DHTValue], Crawl], ...], results) + return tuple(*[r[0] for r in results_debug]), cast(list[Crawl], [r[1] for r in results_debug]) + # ``results`` is of type ``tuple[list[Node] | list[DHTValue], ...]`` + # However, mypy 1.13.0 is not yet powerful enough to infer the argument type from this. + return merge_results(results) # type: ignore[arg-type] async def find_values(self, target: bytes, offset: int = 0, - debug: bool = False) -> Sequence[DHTValue] | tuple[Sequence[DHTValue], list[Crawl]]: + debug: bool = False) -> (tuple[DHTValue, ...] + | tuple[Node, ...] + | tuple[tuple[DHTValue, ...], list[Crawl]]): """ Find the values belonging to the target key. """ - values = await self.find(target, False, offset, debug) - return (cast(tuple[Sequence[tuple[bytes, Optional[bytes]]], list[Crawl]], values) if debug - else cast(Sequence[tuple[bytes, Optional[bytes]]], values)) + return await self.find(target, False, offset, debug) async def find_nodes(self, target: bytes, debug: bool = False) -> Sequence[Node]: """