latest.ipv8.test.messaging.interfaces.dispatcher.test_endpoint

Module Contents

Classes

DummyEndpointListener

This class simply listens on an endpoint and stores incoming packets in a list.

DummyEndpoint

Non-functional endpoint for manual staging and inspection.

TestDispatcherEndpoint

This class contains various tests for the DispatcherEndpoint.

class latest.ipv8.test.messaging.interfaces.dispatcher.test_endpoint.DummyEndpointListener(endpoint: latest.ipv8.messaging.interfaces.endpoint.Endpoint)

Bases: latest.ipv8.messaging.interfaces.endpoint.EndpointListener

This class simply listens on an endpoint and stores incoming packets in a list.

on_packet(packet: tuple[latest.ipv8.types.Address, bytes]) None

Callback for incoming packets.

class latest.ipv8.test.messaging.interfaces.dispatcher.test_endpoint.DummyEndpoint

Bases: latest.ipv8.messaging.interfaces.endpoint.Endpoint

Non-functional endpoint for manual staging and inspection.

assert_open() None

Assert open state as usual.

is_open() bool

Check if opened as usual.

get_address() latest.ipv8.types.Address

Give a fake localhost IPv4 address.

send(socket_address: latest.ipv8.types.Address, packet: bytes) None

Intercept and store sends without actually sending.

async open() bool

Do a fake open.

close() None

Close as usual.

reset_byte_counters() None

Reset counters as usual.

notify_listeners(packet: tuple[latest.ipv8.types.Address, bytes]) None

Perform usual notification.

class latest.ipv8.test.messaging.interfaces.dispatcher.test_endpoint.TestDispatcherEndpoint(methodName: str = 'runTest')

Bases: latest.ipv8.test.base.TestBase

This class contains various tests for the DispatcherEndpoint.

RANDOM_DATA = b'data'
async static _produce_dummy() tuple[latest.ipv8.messaging.interfaces.dispatcher.endpoint.DispatcherEndpoint, DummyEndpoint, DummyEndpointListener]

Create and open a DispatcherEndpoint, dispatching to a dummy endpoint and listener.

async test_initialize_no_interfaces() None

Check if the DispatcherEndpoint can initialize and “send” without interfaces.

This is black-hole functionality and should not crash (though we can’t really assert anything here).

async test_dispatch_receive() None

Check if packet reception is correctly propagated from children.

async test_dispatch_send() None

Check if packet sending is correctly propagated to children.

async test_dispatch_send_specific() None

Check if packet sending is correctly propagated to children, with specific interface.

async test_is_open() None

Check if is_open is correctly propagated from children.

async test_remove_listener() None

Check if remove_listener is correctly propagated to children.

async test_byte_counters() None

Check if byte counters are correctly propagated from children.

async test_get_address() None

Check if get_address is correctly propagated from children.

async test_get_address_specific() None

Check if get_address is correctly propagated from children, with specific interface.

async test_add_prefix_listener() None

Check if add_prefix_listener is correctly propagated to children.

async test_guess_interface_ipv4() None

Check if guess_interface guesses IPv4 interfaces correctly.

async test_guess_interface_ipv6() None

Check if guess_interface guesses IPv6 interfaces correctly.

async test_guess_interface_ipv6_short() None

Check if guess_interface guesses shortened IPv6 interfaces correctly.

async test_guess_interface_ipv6_very_short() None

Check if guess_interface guesses zero-omitted IPv6 interfaces correctly.

async test_guess_interface_ipv6_ipv4map() None

Check if guess_interface guesses IPv4 mapped onto IPv6 interfaces correctly.

async test_guess_interface_unknown() None

Check if guess_interface guesses None if the interface is invalid.