Running simulations with IPv8

In this tutorial we describe how to run simulations with IPv8. Simulations are convenient when a developer wants to quickly test overlay communication without consuming local resources (e.g., network sockets), or when replaying longitudinal data traces. They provide granular control over the experiment and allow for advanced customization, e.g., simulating network latencies between peers.

All code associated with simulations is provided in the simulation package. IPv8 simulations use a customized asyncio event loop so that time-dependent commands (e.g., asyncio.sleep(2)) advance the loop's clock in discrete time steps without actually waiting for the same period in real time. The source code for this functionality can be found in the discrete_loop.py file in the simulation package.
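
The idea behind a discrete event loop can be illustrated without asyncio. The following is a minimal conceptual sketch, not the code in discrete_loop.py: the VirtualClock class and its call_later and run_until methods are hypothetical names introduced here for illustration. Pending callbacks are kept in a heap ordered by due time, and the clock jumps straight to the next due time instead of sleeping.

```python
import heapq
import itertools


class VirtualClock:
    """Toy discrete-event scheduler (hypothetical, not IPv8's DiscreteLoop)."""

    def __init__(self):
        self.now = 0.0
        self._queue = []               # heap of (due_time, sequence, callback)
        self._seq = itertools.count()  # tie-breaker: equal due times keep insertion order

    def call_later(self, delay, callback):
        """Schedule callback to run `delay` virtual seconds from now."""
        heapq.heappush(self._queue, (self.now + delay, next(self._seq), callback))

    def run_until(self, end_time):
        """Run every callback due at or before end_time, without real waiting."""
        while self._queue and self._queue[0][0] <= end_time:
            due, _, callback = heapq.heappop(self._queue)
            self.now = due             # jump the clock to the event's due time
            callback()
        self.now = end_time


clock = VirtualClock()
fired = []


def ping():
    fired.append(clock.now)
    clock.call_later(2.0, ping)        # reschedule every two virtual seconds


clock.call_later(0.0, ping)
clock.run_until(10.0)
print(fired)  # [0.0, 2.0, 4.0, 6.0, 8.0, 10.0]
```

Ten virtual seconds pass in a negligible amount of real time, which is the property that makes long simulations cheap to run.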

Code Example

Below we provide a Python snippet that runs a simple simulation. In this example, we start two IPv8 instances, each with a SimulationEndpoint as endpoint. As in the unit tests, the LAN/WAN addresses of peers are randomly generated, and any communication to a peer that is not part of our simulation will raise an exception. Each instance loads a single PingPongCommunity overlay, and peers are introduced to each other after IPv8 has started. Each peer sends a ping message to the other peer every two seconds. The sending and reception of ping and pong messages are logged, together with the current time of the event loop. The simulation ends after ten seconds.

import os
from asyncio import ensure_future, get_event_loop, set_event_loop, sleep

from pyipv8.ipv8.community import Community
from pyipv8.ipv8.configuration import ConfigBuilder
from pyipv8.ipv8.lazy_community import lazy_wrapper
from pyipv8.ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from pyipv8.ipv8_service import IPv8
from pyipv8.simulation.discrete_loop import DiscreteLoop
from pyipv8.simulation.simulation_endpoint import SimulationEndpoint


@vp_compile
class PingMessage(VariablePayload):
    msg_id = 1


@vp_compile
class PongMessage(VariablePayload):
    msg_id = 2


class PingPongCommunity(Community):
    """
    This basic community sends ping messages to other known peers every two seconds.
    """
    community_id = os.urandom(20)

    def __init__(self, my_peer, endpoint, network):
        super().__init__(my_peer, endpoint, network)
        self.add_message_handler(1, self.on_ping_message)
        self.add_message_handler(2, self.on_pong_message)

    def started(self):
        self.register_task("send_ping", self.send_ping, interval=2.0, delay=0)

    def send_ping(self):
        self.logger.info("🔥 <t=%.1f> peer %s sending ping", get_event_loop().time(), self.my_peer.address)
        for peer in self.network.verified_peers:
            self.ez_send(peer, PingMessage())

    @lazy_wrapper(PingMessage)
    def on_ping_message(self, peer, payload):
        self.logger.info("🔥 <t=%.1f> peer %s received ping", get_event_loop().time(), self.my_peer.address)
        self.logger.info("🧊 <t=%.1f> peer %s sending pong", get_event_loop().time(), self.my_peer.address)
        self.ez_send(peer, PongMessage())

    @lazy_wrapper(PongMessage)
    def on_pong_message(self, peer, payload):
        self.logger.info("🧊 <t=%.1f> peer %s received pong", get_event_loop().time(), self.my_peer.address)


async def start_communities():
    instances = []
    for i in [1, 2]:
        builder = ConfigBuilder().clear_keys().clear_overlays()
        builder.add_key("my peer", "medium", f"ec{i}.pem")
        builder.add_overlay("PingPongCommunity", "my peer", [], [], {}, [('started',)])

        endpoint = SimulationEndpoint()
        instance = IPv8(builder.finalize(), endpoint_override=endpoint,
                        extra_communities={'PingPongCommunity': PingPongCommunity})
        await instance.start()
        instances.append(instance)

    # Introduce peers to each other
    for from_instance in instances:
        for to_instance in instances:
            if from_instance == to_instance:
                continue
            from_instance.overlays[0].walk_to(to_instance.endpoint.wan_address)


async def run_simulation():
    await start_communities()
    await sleep(10)
    get_event_loop().stop()

# We use a discrete event loop to enable quick simulations.
loop = DiscreteLoop()
set_event_loop(loop)

ensure_future(run_simulation())

loop.run_forever()

The endpoint used in the above code assumes that packets arrive immediately at the recipient. This is unrealistic for deployed peer-to-peer networks, where link latency can be considerable, especially between peers on different continents. Developers can simulate link latencies by modifying the latencies class variable, or by overriding the get_link_latency method. Link latencies can, for example, be set to random values or be determined by a latency matrix.
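
As an illustration of the latency-matrix idea, the sketch below computes a per-link latency from a symmetric lookup table with an optional random jitter. LatencyModel, set_latency and latency are hypothetical names that are not part of IPv8. Since the exact signature of get_link_latency is not shown in this tutorial, the sketch only produces the value that an override of that method could return.

```python
import random


class LatencyModel:
    """Hypothetical helper: symmetric per-link latencies in seconds."""

    def __init__(self, default=0.05, seed=None):
        self.default = default           # latency used for links without an entry
        self._matrix = {}                # {frozenset({addr_a, addr_b}): seconds}
        self._rng = random.Random(seed)  # private RNG so runs are reproducible

    @staticmethod
    def _key(addr_a, addr_b):
        # An unordered pair, so (a, b) and (b, a) share one entry.
        return frozenset((addr_a, addr_b))

    def set_latency(self, addr_a, addr_b, seconds):
        self._matrix[self._key(addr_a, addr_b)] = seconds

    def latency(self, addr_a, addr_b, jitter=0.0):
        """Return the base latency plus a uniform random jitter in [0, jitter]."""
        base = self._matrix.get(self._key(addr_a, addr_b), self.default)
        return base + self._rng.uniform(0.0, jitter)


# Example addresses are made up; peers are identified by (ip, port) tuples.
amsterdam = ("1.1.1.1", 8090)
sydney = ("2.2.2.2", 8090)
delft = ("3.3.3.3", 8090)

model = LatencyModel(default=0.05, seed=42)
model.set_latency(amsterdam, sydney, 0.12)

same_link = model.latency(sydney, amsterdam)    # symmetric lookup of the stored entry
unknown_link = model.latency(amsterdam, delft)  # falls back to the default
jittered = model.latency(amsterdam, sydney, jitter=0.01)
```

Seeding the random generator keeps repeated simulation runs comparable, which matters when latencies are the variable under study.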

Using the Bootstrap Server

In the example above, we explicitly introduce peers to each other by sending introduction requests between all pairs of peers. While this ensures full connectivity, it might not reflect the behaviour of deployed networks, where peer discovery is a gradual and dynamic process. Peer discovery in IPv8 is assisted by bootstrap servers that keep track of the currently active peers in the network and introduce a few of them to newly joined peers to get them started.

Below we show a Python snippet running a simple simulation with two simulated bootstrap servers and 20 peers. The experiment simulates a period of two minutes, and every two seconds each peer reports how many other peers it has discovered. When the simulation ends, each peer knows around ten other peers. Note that we also add a RandomWalk strategy to each initialized community.

import os
from asyncio import ensure_future, get_event_loop, set_event_loop, sleep

from pyipv8.ipv8.community import Community
from pyipv8.ipv8.configuration import Bootstrapper, BootstrapperDefinition, ConfigBuilder, Strategy, WalkerDefinition
from pyipv8.ipv8_service import IPv8
from pyipv8.scripts.tracker_service import EndpointServer
from pyipv8.simulation.discrete_loop import DiscreteLoop
from pyipv8.simulation.simulation_endpoint import SimulationEndpoint


class SimpleCommunity(Community):
    """
    Very basic community that just prints the number of known peers every two seconds.
    """
    community_id = os.urandom(20)

    def __init__(self, my_peer, endpoint, network, **kwargs):
        super().__init__(my_peer, endpoint, network)
        self.id = kwargs.pop('id')

    def started(self):
        self.register_task("print_peers", self.print_peers, interval=2.0, delay=0)

    def print_peers(self):
        self._logger.info("I am peer %d, I know %d peers...", self.id, len(self.network.verified_peers))


async def start_communities():
    bootstrap_ips = []
    for i in range(2):  # We start two bootstrap nodes
        bootstrap_endpoint = SimulationEndpoint()
        await bootstrap_endpoint.open()
        bootstrap_overlay = EndpointServer(bootstrap_endpoint)

        # We assume all peers in our simulation have a public WAN IP (to avoid conflicts with our SimulationEndpoint)
        bootstrap_overlay.my_estimated_lan = ("0.0.0.0", 0)
        bootstrap_overlay.my_estimated_wan = bootstrap_endpoint.wan_address
        bootstrap_ips.append(bootstrap_endpoint.wan_address)

    instances = []
    for i in range(20):
        builder = ConfigBuilder().clear_keys().clear_overlays()
        builder.add_key("my peer", "medium", f"ec{i}.pem")
        bootstrappers = [BootstrapperDefinition(Bootstrapper.DispersyBootstrapper,
                                                {"ip_addresses": bootstrap_ips, "dns_addresses": []})]
        walkers = [WalkerDefinition(Strategy.RandomWalk, 10, {'timeout': 3.0})]
        builder.add_overlay("SimpleCommunity", "my peer", walkers, bootstrappers, {'id': i}, [('started',)])

        endpoint = SimulationEndpoint()
        instance = IPv8(builder.finalize(), endpoint_override=endpoint,
                        extra_communities={'SimpleCommunity': SimpleCommunity})
        # We assume all peers in our simulation have a public WAN IP (to avoid conflicts with our SimulationEndpoint)
        instance.overlays[0].my_estimated_lan = ("0.0.0.0", 0)
        instance.overlays[0].my_estimated_wan = endpoint.wan_address
        await instance.start()
        instances.append(instance)


async def run_simulation():
    await start_communities()
    await sleep(120)
    get_event_loop().stop()

# We use a discrete event loop to enable quick simulations.
loop = DiscreteLoop()
set_event_loop(loop)

ensure_future(run_simulation())

loop.run_forever()
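
The gradual discovery process described in this section can also be pictured with a toy model that does not use IPv8 at all. The sketch below is an assumption-laden simplification, not IPv8's actual protocol: simulate_discovery and its parameters are hypothetical, the bootstrap server introduces exactly one active peer to each joining peer, and in every round each peer that knows fewer than a target number of peers asks a random acquaintance for one new introduction.

```python
import random


def simulate_discovery(num_peers=20, target=10, rounds=60, seed=1):
    """Toy model of bootstrap plus random-walk discovery (not IPv8's wire protocol)."""
    rng = random.Random(seed)
    known = {peer: set() for peer in range(num_peers)}
    joined = []  # peers the bootstrap server has seen so far

    # Joining: the bootstrap server introduces one already-active peer.
    for peer in range(num_peers):
        if joined:
            other = rng.choice(joined)
            known[peer].add(other)
            known[other].add(peer)
        joined.append(peer)

    # Walking: peers below their target ask a random acquaintance for an introduction.
    for _ in range(rounds):
        for peer in range(num_peers):
            if not known[peer] or len(known[peer]) >= target:
                continue
            via = rng.choice(sorted(known[peer]))
            candidates = sorted(known[via] - known[peer] - {peer})
            if candidates:
                new = rng.choice(candidates)
                known[peer].add(new)
                known[new].add(peer)  # an introduction makes both sides aware
    return known


known = simulate_discovery()
counts = [len(peers) for peers in known.values()]
print(min(counts), max(counts))
```

Because being introduced also adds the introducing side to a peer's list, some peers can end up knowing more than the target number of peers.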