Unit Testing Overlays

This document assumes you have a basic understanding of network overlays in IPv8, as documented in the overlay tutorial. You will learn how to use the IPv8’s TestBase class to unit test your overlays.

Files

This tutorial will place all of its files in the ~/Documents/ipv8_tutorial directory. You are free to choose whatever directory you want, to place your files in. This tutorial uses the following files in the working directory:

community.py
test_community.py

We will use the following community.py in this tutorial:

import os
import unittest
from typing import TYPE_CHECKING, Any

from ipv8.community import Community, CommunitySettings
from ipv8.requestcache import NumberCache, RequestCache
from ipv8.test.base import TestBase
from ipv8.test.mocking.ipv8 import MockIPv8

if TYPE_CHECKING:
    from ipv8.messaging.payload import IntroductionRequestPayload
    from ipv8.messaging.payload_headers import GlobalTimeDistributionPayload
    from ipv8.types import Peer


class MyCache(NumberCache):

    def __init__(self, request_cache: RequestCache, overlay: MyCommunity) -> None:
        super().__init__(request_cache, "", 0)
        self.overlay = overlay

    def on_timeout(self) -> None:
        self.overlay.timed_out = True


class MyCommunitySettings(CommunitySettings):
    some_constant: int | None = None


class MyCommunity(Community):
    community_id = os.urandom(20)
    settings_class = MyCommunitySettings

    def __init__(self, settings: MyCommunitySettings) -> None:
        super().__init__(settings)
        self.request_cache = RequestCache()

        self._some_constant = 42 if settings.some_constant is None else settings.some_constant
        self.last_peer = None
        self.timed_out = False

    async def unload(self) -> None:
        await self.request_cache.shutdown()
        await super().unload()

    def some_constant(self) -> int:
        return self._some_constant

    def introduction_request_callback(self, peer: Peer,
                                      dist: GlobalTimeDistributionPayload,
                                      payload: IntroductionRequestPayload) -> None:
        self.last_peer = peer

    def add_cache(self) -> None:
        self.request_cache.add(MyCache(self.request_cache, self))

You’re encouraged to fill test_community.py yourself as you read through this tutorial.

Why and How?

After playing around with your first overlay, you may have discovered that running multiple processes and configuring your communities to test functionality is not very easy or reproducible. We certainly have. Therefore, we have created the TestBase class with all the tools you need to mock the Internet and make beautiful unit tests.

Because TestBase is a subclass of unittest.TestCase you can use common unit testing convenience methods, like testEqual, testTrue, setUp, setUpClass, etc. This also means that TestBase can be used with just about any test runner out there (like unittest, nosetests or pytest).

The way we will run our unit tests in this tutorial is with:

python3 -m unittest test_community.py

If you have custom logic in your subclass, please make sure to call your super() methods. Here’s an example of custom setUp and tearDown methods:

class MyTests(TestBase[MyCommunity]):

    def setUp(self) -> None:
        super().setUp()
        # Insert your setUp logic here

    async def tearDown(self) -> None:
        await super().tearDown()
        # Insert your tearDown logic here

Deadlock Detection

Before you start testing, you need to be warned about TestBase.MAX_TEST_TIME. By default, TestBase.MAX_TEST_TIME is set to 10 seconds. This means that if your testing class takes more than 10 seconds, TestBase will terminate it.

We should probably mention that in proper software engineering a unit test case should never take 10 seconds. However, we’re not here to judge. If you want this timeout increased, simply overwrite the value of MAX_TEST_TIME in your subclass. For example:

class MyTests(TestBase):

    MAX_TEST_TIME = 30.0  # Now this class can take 30 seconds

Creating Instances

The initialize() method takes care of initializing your Community subclass for you. It’s as easy as this:

    async def test_call(self) -> None:
        """
        Create a MyCommunity and check the output of some_constant().
        """
        # Create 1 MyCommunity
        self.initialize(MyCommunity, 1)

        # Nodes are 0-indexed
        value = self.overlay(0).some_constant()

        self.assertEqual(42, value)

What happened here? First, we instructed TestBase to create 1 instance of MyCommunity using initialize(). As a side note: the raw information needed to make this happen (the mocking of the Internet and the interconnection of overlays) is actually stored in the nodes list of TestBase. Second, we ask our TestBase to give us the overlay instance of node 0, which is our only node. The overlay() method is one of the many convenience methods in TestBase to access common data in overlays. We’ll provide a complete list of these convenience methods later in this document. Last, we use a common unittest.TestCase assertion to check if our some_constant() overlay method returned 42.

In some cases, you might need to give additional parameters to your Community’s __init__() method. In these cases, you can simply add additional keyword arguments to initialize().

    async def test_call2(self) -> None:
        """
        Create a MyCommunity with a custom some_constant.
        """
        self.initialize(MyCommunity, 1, MyCommunitySettings(some_constant=7))

        value = self.overlay(0).some_constant()

        self.assertEqual(7, value)

In yet more advanced use cases, you may want to provide your own MockIPv8 instances. This will usually be the case if your Community instance only supports specific keys. Commonly, Community instances may choose to only support curve25519 keys, which you can do as follows:

    def create_node(self, *args: Any, **kwargs) -> MockIPv8:
        return MockIPv8("curve25519", self.overlay_class, *args, **kwargs)

Communication

You should now be able to create Community instances and call their methods. However, these instances are not communicating with each other yet. Take note of this code in our Community instance that stores the last peer that sent us an introduction request:

    def introduction_request_callback(self, peer: Peer,
                                      dist: GlobalTimeDistributionPayload,
                                      payload: IntroductionRequestPayload) -> None:
        self.last_peer = peer

This code simply stores whatever Peer object last sent us a request. We’ll create a unit test to test whether this happened:

    async def test_intro_called(self) -> None:
        """
        Check if we got a request from another MyCommunity.
        """
        self.initialize(MyCommunity, 2)

        # We have the overlay of Peer 0 send a message to Peer 1.
        self.overlay(0).send_introduction_request(self.peer(1))
        # Our test is running in the asyncio main thread!
        # Let's yield to allow messages to be passed.
        await self.deliver_messages()

        # Peer 1 should have received the message from Peer 0.
        self.assertEqual(self.peer(0), self.overlay(1).last_peer)

Let’s run through this example. First we create two instances of MyCommunity using initialize(). Second, we instruct the first node in our test to send a message to our second node. Here send_introduction_request() creates and sends a message to another peer and deliver_messages() allows it to be received. Lastly, we assert that our second node received a message from our first node.

Note that the asyncio programming model of Python executes its events on the main thread (the event loop), including this test case and the communication that is caused by it. In other words, since the test itself is occupying the main thread, the messaging will only happen after our test is finished! By the time it is allowed to execute, the communication is already cancelled. The deliver_messages() method backs off for a given amount of time and then waits for the main thread to be freed.

Now comes the caveat. The main thread being freed may not mean your Community is actually done doing stuff. It is possible to schedule asyncio events in such a way that deliver_messages() can’t detect them. This commonly happens when you use threading or hardware (like sockets). In these exceptional cases, you can use asyncio.sleep() or, better yet, await a custom Future in the test.

Piggybacking on Introductions

Some Community instances prefer to piggyback information onto introductions. As TestBase simply adds peers to each other directly, this piggybacked information is not sent. The introduce_nodes() method allows you to send these introductions anyway, used as follows (note the absence of deliver_messages()):

    async def test_intro_called2(self) -> None:
        """
        Check if we got a request from another MyCommunity.
        """
        self.initialize(MyCommunity, 2)

        await self.introduce_nodes()

        self.assertEqual(self.peer(0), self.overlay(1).last_peer)
        self.assertEqual(self.peer(1), self.overlay(0).last_peer)

Using the RequestCache

In real Community instances, you will have many timeouts and lots of timeout logic in caches. To make it easier to trigger these timeouts in the RequestCache, we use the passthrough() method. Here’s an example:

    async def test_passthrough(self) -> None:
        """
        Check if a cache time out is properly handled.
        """
        self.initialize(MyCommunity, 1)

        with self.overlay(0).request_cache.passthrough():
            self.overlay(0).add_cache()
        await self.deliver_messages()

        self.assertTrue(self.overlay(0).timed_out)

In this example we use the passthrough() contextmanager while we invoke a function that adds a cache. This causes the timeout of the MyCache cache we add inside add_cache to be nullified and instantly fire. Do note that this timeout occurs in the asyncio event loop and we need to allow it to fire. To yield the main thread we use deliver_messages() again (though in this case await asyncio.sleep(0.0) would have also done the trick).

In some complex cases you may have more than one type of cache being added. In these cases you can add a filter to passthrough() to make it only nullify some particular classes (simply add these classes as arguments to passthrough()):

    async def test_passthrough2(self) -> None:
        """
        Check if a cache time out is properly handled.
        """
        self.initialize(MyCommunity, 1)

        with self.overlay(0).request_cache.passthrough(MyCache):
            self.overlay(0).add_cache()
        await self.deliver_messages()

        self.assertTrue(self.overlay(0).timed_out)

Fragile Packet Handling

By default IPv8 adds a general exception handler in Community instances, to disallow external messages crashing you. However, when testing, this exception handler is removed by TestBase. If you want to enable the general exception handler again, you can either add your class to the production_overlay_classes list or overwrite TestBase.patch_overlays(). For example:

    def patch_overlays(self, i: int) -> None:
        if i == 1:
            pass  # We'll run the general exception handler for Peer 1
        else:
            super().patch_overlays(i)

Temporary Files

In some cases, you may require temporary files in your unit tests. TestBase exposes the temporary_directory() method to generate directories for these files. This method is random.seed() compatible. Normally, TestBase will clean up these files automatically for you. However, if you hard-crash TestBase before its tearDown is invoked, the temporary directories will not be cleaned up.

The temporary directory names are prefixed with _temp_ and use a uuid as a unique name. The temporary directories will be created in the current working directory for the mechanism to work on all supported platforms (Windows, Mac and Linux) even with limited permissions.

Asserting Message Delivery

You may want to assert that an overlay receives certain messages after a function. The TestBase exposes the assertReceivedBy() function to do just that. We’ll run through its functionality by example.

Most of the time, you will want to check if a peer received certain messages. In the following example peer 0 first sends message 1 and then sends message 2 to peer 1. The following construction asserts this:

    with self.assertReceivedBy(1, [Message1, Message2]):
        self.overlay(0).send_msg_to(self.peer(1), 1)
        self.overlay(0).send_msg_to(self.peer(1), 2)
        await self.deliver_messages()

Sometimes, you can’t be sure in what order messages are sent. In these cases you can use ordered=False:

    with self.assertReceivedBy(1, [Message1, Message2, Message2], ordered=False):
        messages = [2, 1, 2]
        shuffle(messages)
        self.overlay(0).send_msg_to(self.peer(1), messages[0])
        self.overlay(0).send_msg_to(self.peer(1), messages[1])
        self.overlay(0).send_msg_to(self.peer(1), messages[2])
        await self.deliver_messages()

In other cases, your overlay may be sending messages which you cannot control and/or which you don’t care about. In these cases you can set a filter to only include the messages you want:

    with self.assertReceivedBy(1, [Message1, Message2], message_filter=[Message1, Message2]):
        self.overlay(0).send_msg_to(self.peer(1), 1)
        if random() > 0.5:
            self.overlay(0).send_msg_to(self.peer(1), 3)
        self.overlay(0).send_msg_to(self.peer(1), 2)
        await self.deliver_messages()

It may also be helpful to inspect the contents of each payload. You can simply use the return value of the assert function to perform further inspection:

    with self.assertReceivedBy(1, [Message1, Message2]) as received_messages:
        self.overlay(0).send_msg_to(self.peer(1), 1)
        self.overlay(0).send_msg_to(self.peer(1), 2)
        await self.deliver_messages()

    message1, message2 = received_messages
    self.assertEqual(1, message1.value)
    self.assertEqual(2, message2.value)

If you want to use assertReceivedBy(), make sure that:

  1. Your overlay message handlers only handle a single payload.

  2. Your messages specify a msg_id.

  3. Your messages are compatible with ez_send() and lazy_wrapper_*().

Shortcut Reference

As usual, there is an easy and a hard way to do everything in IPv8. You are welcome to call self.nodes[i].my_peer.public_key.key_to_bin() manually every time you wish to access the public key of node i. Or, instead, you may use the available shortcut self.key_bin(i). You may find your unit test become a lot more readable if you use the available TestBase shortcuts though.

Available TestBase shortcuts

method

description

address(i)

The IPv4 address of peer i.

endpoint(i)

The Endpoint instance of peer i.

key_bin(i)

The serialized public key (bytes) of peer i.

key_bin_private(i)

The serialized private key (bytes) of peer i.

mid(i)

The SHA-1 of the public key of peer i.

my_peer(i)

The private my_peer Peer instance of peer i.

network(i)

The Network instance of peer i.

node(i)

The MockIPv8 instance of peer i.

overlay(i)

The Community instance of peer i.

peer(i)

The public Peer instance of peer i.

private_key(i)

The private key instance of peer i.

public_key(i)

The public key instance of peer i.

You are encouraged to add shortcuts that may be relevant to your own Community instance in your own test class.