latest.ipv8.dht.community

Attributes

DHTValue

PING_INTERVAL

TOKEN_EXPIRATION_TIME

DHT_ENTRY_STR

DHT_ENTRY_STR_SIGNED

MAX_ENTRY_SIZE

MAX_ENTRY_AGE

MAX_CRAWL_NODES

MAX_CRAWL_REQUESTS

MAX_CRAWL_TASKS

MAX_VALUES_IN_STORE

MAX_VALUES_IN_FIND

MAX_NODES_IN_FIND

TARGET_NODES

FindResultType

Classes

Request

This request cache keeps track of all outstanding requests within the DHTCommunity.

Crawl

Class to manage crawls in the Community, in search of a specific target key.

DHTCommunity

Community for storing/finding key-value pairs.

Functions

gather_without_errors(→ list)

Gather only the successful results from the given futures.

merge_results(→ tuple[FindResultType, Ellipsis])

Merge the results from a tuple of lists into a flat tuple.

Module Contents

latest.ipv8.dht.community.DHTValue
latest.ipv8.dht.community.PING_INTERVAL = 25
latest.ipv8.dht.community.TOKEN_EXPIRATION_TIME = 600
latest.ipv8.dht.community.DHT_ENTRY_STR = 0
latest.ipv8.dht.community.DHT_ENTRY_STR_SIGNED = 1
latest.ipv8.dht.community.MAX_ENTRY_SIZE = 170
latest.ipv8.dht.community.MAX_ENTRY_AGE = 3600
latest.ipv8.dht.community.MAX_CRAWL_NODES = 8
latest.ipv8.dht.community.MAX_CRAWL_REQUESTS = 24
latest.ipv8.dht.community.MAX_CRAWL_TASKS = 4
latest.ipv8.dht.community.MAX_VALUES_IN_STORE = 8
latest.ipv8.dht.community.MAX_VALUES_IN_FIND = 8
latest.ipv8.dht.community.MAX_NODES_IN_FIND = 8
latest.ipv8.dht.community.TARGET_NODES = 8
async latest.ipv8.dht.community.gather_without_errors(*futures: asyncio.Future) list

Gather only the successful results from the given futures.

latest.ipv8.dht.community.FindResultType
latest.ipv8.dht.community.merge_results(results: tuple[list[FindResultType], Ellipsis]) tuple[FindResultType, Ellipsis]

Merge the results from a tuple of lists into a flat tuple.

class latest.ipv8.dht.community.Request(community: DHTCommunity, msg_type: str, node: latest.ipv8.dht.routing.Node, params: list | None = None, consume_errors: bool = False, timeout: float = 5.0)

Bases: latest.ipv8.requestcache.RandomNumberCache

This request cache keeps track of all outstanding requests within the DHTCommunity.

msg_type
node
params = None
future: asyncio.Future
start_time
consume_errors = False
timeout = 5.0
property timeout_delay: float

The time in seconds after which this request is to be timed out.

on_timeout() None

Cancel our completion future, if it’s not already done.

on_complete() None

Update our associated node’s success metrics.

class latest.ipv8.dht.community.Crawl(target: bytes, routing_table: latest.ipv8.dht.routing.RoutingTable, force_nodes: bool = False, offset: int = 0)

Class to manage crawls in the Community, in search of a specific target key.

class TodoNode

Node that still need to be contacted.

node_to_contact: latest.ipv8.dht.routing.Node
node_to_puncture: latest.ipv8.dht.routing.Node | None
__iter__() collections.abc.Iterator

Make this dataclass iterable.

target
routing_table
nodes_todo: list[Crawl]
nodes_tried: set[latest.ipv8.dht.routing.Node]
responses: list[tuple[latest.ipv8.dht.routing.Node, dict[str, list[bytes] | list[latest.ipv8.dht.routing.Node]]]] = []
force_nodes = False
offset = 0
add_response(sender: latest.ipv8.dht.routing.Node, response: dict[str, list[bytes] | list[latest.ipv8.dht.routing.Node]]) None

Register a response by the given node.

property done: bool

Check if we exhausted our tries, or we have no nodes left to query.

property cache_candidate: latest.ipv8.dht.routing.Node | None

Return closest node to the target that did not respond with values.

property values: list[bytes]

Get the currently known values for our crawl.

property nodes: list[latest.ipv8.dht.routing.Node]

Get the nodes we have already contacted to retrieve values.

class latest.ipv8.dht.community.DHTCommunity(settings: latest.ipv8.community.CommunitySettings)

Bases: latest.ipv8.community.Community

Community for storing/finding key-value pairs.

community_id
network
storages: dict[type[latest.ipv8.messaging.interfaces.udp.endpoint.Address], latest.ipv8.dht.storage.Storage]
routing_tables: dict[type[latest.ipv8.messaging.interfaces.udp.endpoint.Address], latest.ipv8.dht.routing.RoutingTable]
request_cache
tokens: dict[bytes, tuple[float, bytes]]
token_secrets: collections.deque[bytes]
get_serializer() latest.ipv8.messaging.serialization.Serializer

Extend our serializer with a node list packer.

get_available_strategies() dict[str, type[latest.ipv8.peerdiscovery.discovery.DiscoveryStrategy]]

Extend our available strategies with our maintenance strategy.

async unload() None

Shut down our request cache and then unload the overlay.

get_address_class(node: latest.ipv8.peer.Peer) type[latest.ipv8.messaging.interfaces.udp.endpoint.Address]

Get the class of the given node’s address.

get_routing_table(node: latest.ipv8.peer.Peer) latest.ipv8.dht.routing.RoutingTable

Get the routing table that the given node belongs to.

get_storage(node: latest.ipv8.dht.routing.Node) latest.ipv8.dht.storage.Storage

Get or create a Storage object for the given node.

get_my_node_id(node: latest.ipv8.peer.Peer) bytes

Get our own node id to share with the given node.

get_requesting_node(peer: latest.ipv8.peer.Peer) latest.ipv8.dht.routing.Node | None

Add the given peer to the appropriate routing table and return it or None if it is already blocked.

introduction_request_callback(peer: latest.ipv8.peer.Peer, dist: latest.ipv8.messaging.payload_headers.GlobalTimeDistributionPayload, payload: latest.ipv8.messaging.payload.IntroductionRequestPayload | latest.ipv8.messaging.payload.NewIntroductionRequestPayload) None

Call our node discovery logic when an introduction request is received from it.

introduction_response_callback(peer: latest.ipv8.peer.Peer, dist: latest.ipv8.messaging.payload_headers.GlobalTimeDistributionPayload, payload: latest.ipv8.messaging.payload.IntroductionResponsePayload | latest.ipv8.messaging.payload.NewIntroductionResponsePayload) None

Call our node discovery logic when an introduction response is received from it.

on_node_discovered(public_key_bin: bytes, source_address: latest.ipv8.messaging.interfaces.udp.endpoint.Address) None

Handler for potentially new nodes.

ping(node: latest.ipv8.dht.routing.Node) asyncio.Future

Send a ping to the given node.

on_ping_request(peer: latest.ipv8.peer.Peer, payload: latest.ipv8.dht.payload.PingRequestPayload, data: bytes) None

When we receive a ping request through a valid node, send a response.

on_ping_response(peer: latest.ipv8.peer.Peer, payload: latest.ipv8.dht.payload.PingResponsePayload, data: bytes) None

When receive a response to our ping, update the node’s metrics.

serialize_value(data: bytes, sign: bool = True) bytes

Serialize the given bytes.

unserialize_value(value: bytes) tuple[bytes, bytes | None, int] | None

Unserialize data from the given serialized value.

add_value(key: bytes, value: bytes, storage: latest.ipv8.dht.storage.Storage, max_age: float = MAX_ENTRY_AGE) None

Add a serialized value under the given key into a storage.

async store_value(key: bytes, data: bytes, sign: bool = False) list[latest.ipv8.dht.routing.Node]

Attempt to store a value at the given key and return the nodes that store it.

async _store(key: bytes, value: bytes) list[latest.ipv8.dht.routing.Node]

Attempt to store a serialized value at the given key and return the nodes that store it.

async store_on_nodes(key: bytes, values: list[bytes], nodes: list[latest.ipv8.dht.routing.Node]) list[latest.ipv8.dht.routing.Node]

Store the given values under a given key on the given nodes.

on_store_request(peer: latest.ipv8.peer.Peer, payload: latest.ipv8.dht.payload.StoreRequestPayload) None

Store or forward the given requested value.

on_store_response(peer: latest.ipv8.peer.Peer, payload: latest.ipv8.dht.payload.StoreResponsePayload) None

We got confirmation of storage.

_send_find_request(node: latest.ipv8.dht.routing.Node, target: bytes, force_nodes: bool, offset: int = 0) asyncio.Future
async _contact_node(crawl: Crawl, node: latest.ipv8.dht.routing.Node, puncture_node: latest.ipv8.dht.routing.Node) None
async _find(crawl: Crawl, debug: bool = False) list[latest.ipv8.dht.routing.Node] | list[DHTValue] | tuple[list[DHTValue], Crawl]
post_process_values(values: list[bytes]) list[DHTValue]

Unpack signed and unsigned values and filter out duplicates.

async find(target: bytes, force_nodes: bool, offset: int, debug: Literal[False]) tuple[DHTValue, Ellipsis] | tuple[latest.ipv8.dht.routing.Node, Ellipsis]
async find(target: bytes, force_nodes: bool, offset: int, debug: Literal[True]) tuple[tuple[DHTValue, Ellipsis], list[Crawl]]
async find(target: bytes, force_nodes: bool, offset: int, debug: bool) tuple[DHTValue, Ellipsis] | tuple[latest.ipv8.dht.routing.Node, Ellipsis] | tuple[tuple[DHTValue, Ellipsis], list[Crawl]]

Get the values belonging to the given target key.

async find_values(target: bytes, offset: int = 0, debug: bool = False) tuple[DHTValue, Ellipsis] | tuple[latest.ipv8.dht.routing.Node, Ellipsis] | tuple[tuple[DHTValue, Ellipsis], list[Crawl]]

Find the values belonging to the target key.

async find_nodes(target: bytes, debug: bool = False) collections.abc.Sequence[latest.ipv8.dht.routing.Node]

Find the values belonging to the target key.

on_find_request(peer: latest.ipv8.peer.Peer, payload: latest.ipv8.dht.payload.FindRequestPayload) None

Try to perform a search for the requested target.

on_find_response(peer: latest.ipv8.peer.Peer, payload: latest.ipv8.dht.payload.FindResponsePayload) None

We got a response for our find requests.

async node_maintenance() None

Refresh old result values.

value_maintenance() None

Remove expired values from the storage objects.

token_maintenance() None

Make sure tokens are periodically refreshed.

generate_token(node: latest.ipv8.dht.routing.Node) bytes

Generate a token for the given node.

check_token(node: latest.ipv8.dht.routing.Node, token: bytes) bool

Check if the presented token is valid for the given node.