Unit Testing Overlays
This document assumes you have a basic understanding of network overlays in IPv8, as documented in the overlay tutorial.
You will learn how to use the IPv8’s
TestBase class to unit test your overlays.
This tutorial will place all of its files in the
You are free to choose whatever directory you want, to place your files in.
This tutorial uses the following files in the working directory:
We will use the following
community.py in this tutorial:
import os import unittest from typing import TYPE_CHECKING, Any from ipv8.community import Community, CommunitySettings from ipv8.requestcache import NumberCache, RequestCache from ipv8.test.base import TestBase from ipv8.test.mocking.ipv8 import MockIPv8 if TYPE_CHECKING: from ipv8.messaging.payload import IntroductionRequestPayload from ipv8.messaging.payload_headers import GlobalTimeDistributionPayload from ipv8.types import Peer class MyCache(NumberCache): def __init__(self, request_cache: RequestCache, overlay: MyCommunity) -> None: super().__init__(request_cache, "", 0) self.overlay = overlay def on_timeout(self) -> None: self.overlay.timed_out = True class MyCommunitySettings(CommunitySettings): some_constant: int | None = None class MyCommunity(Community): community_id = os.urandom(20) settings_class = MyCommunitySettings def __init__(self, settings: MyCommunitySettings) -> None: super().__init__(settings) self.request_cache = RequestCache() self._some_constant = 42 if settings.some_constant is None else settings.some_constant self.last_peer = None self.timed_out = False async def unload(self) -> None: await self.request_cache.shutdown() await super().unload() def some_constant(self) -> int: return self._some_constant def introduction_request_callback(self, peer: Peer, dist: GlobalTimeDistributionPayload, payload: IntroductionRequestPayload) -> None: self.last_peer = peer def add_cache(self) -> None: self.request_cache.add(MyCache(self.request_cache, self))
You’re encouraged to fill
test_community.py yourself as you read through this tutorial.
Why and How?
After playing around with your first overlay, you may have discovered that running multiple processes
and configuring your communities to test functionality is not very easy or reproducible.
We certainly have.
Therefore, we have created the
TestBase class with all the tools you need to mock the Internet and
make beautiful unit tests.
TestBase is a subclass of
unittest.TestCase you can use common unit testing convenience methods, like
This also means that
TestBase can be used with just about any test runner out there
The way we will run our unit tests in this tutorial is with:
python3 -m unittest test_community.py
If you have custom logic in your subclass, please make sure to call your
Here’s an example of custom
class MyTests(TestBase[MyCommunity]): def setUp(self) -> None: super().setUp() # Insert your setUp logic here async def tearDown(self) -> None: await super().tearDown() # Insert your tearDown logic here
Before you start testing, you need to be warned about
TestBase.MAX_TEST_TIME is set to 10 seconds.
This means that if your testing class takes more than 10 seconds,
TestBase will terminate it.
We should probably mention that in proper software engineering a unit test case should never take 10 seconds.
However, we’re not here to judge.
If you want this timeout increased, simply overwrite the value of
MAX_TEST_TIME in your subclass.
class MyTests(TestBase): MAX_TEST_TIME = 30.0 # Now this class can take 30 seconds
initialize() method takes care of initializing your
Community subclass for you.
It’s as easy as this:
async def test_call(self) -> None: """ Create a MyCommunity and check the output of some_constant(). """ # Create 1 MyCommunity self.initialize(MyCommunity, 1) # Nodes are 0-indexed value = self.overlay(0).some_constant() self.assertEqual(42, value)
What happened here?
First, we instructed
TestBase to create 1 instance of
As a side note: the raw information needed to make this happen (the mocking of the Internet and the interconnection
of overlays) is actually stored in the
nodes list of
Second, we ask our
TestBase to give us the overlay instance of node 0, which is our only node.
overlay() method is one of the many convenience methods in
TestBase to access common data in overlays.
We’ll provide a complete list of these convenience methods later in this document.
Last, we use a common
unittest.TestCase assertion to check if our
some_constant() overlay method returned 42.
In some cases, you might need to give additional parameters to your
In these cases, you can simply add additional keyword arguments to
async def test_call2(self) -> None: """ Create a MyCommunity with a custom some_constant. """ self.initialize(MyCommunity, 1, MyCommunitySettings(some_constant=7)) value = self.overlay(0).some_constant() self.assertEqual(7, value)
In yet more advanced use cases, you may want to provide your own
This will usually be the case if your
Community instance only supports specific keys.
Community instances may choose to only support
curve25519 keys, which you can do as follows:
def create_node(self, *args: Any, **kwargs) -> MockIPv8: return MockIPv8("curve25519", self.overlay_class, *args, **kwargs)
You should now be able to create
Community instances and call their methods.
However, these instances are not communicating with each other yet.
Take note of this code in our
Community instance that stores the last peer that sent us an introduction request:
def introduction_request_callback(self, peer: Peer, dist: GlobalTimeDistributionPayload, payload: IntroductionRequestPayload) -> None: self.last_peer = peer
This code simply stores whatever
Peer object last sent us a request.
We’ll create a unit test to test whether this happened:
async def test_intro_called(self) -> None: """ Check if we got a request from another MyCommunity. """ self.initialize(MyCommunity, 2) # We have the overlay of Peer 0 send a message to Peer 1. self.overlay(0).send_introduction_request(self.peer(1)) # Our test is running in the asyncio main thread! # Let's yield to allow messages to be passed. await self.deliver_messages() # Peer 1 should have received the message from Peer 0. self.assertEqual(self.peer(0), self.overlay(1).last_peer)
Let’s run through this example.
First we create two instances of
Second, we instruct the first node in our test to send a message to our second node.
send_introduction_request() creates and sends a message to another peer and
allows it to be received.
Lastly, we assert that our second node received a message from our first node.
Note that the
asyncio programming model of Python executes its events on the main thread (the event loop),
including this test case and the communication that is caused by it.
In other words, since the test itself is occupying the main thread, the messaging will only happen
after our test is finished!
By the time it is allowed to execute, the communication is already cancelled.
deliver_messages() method backs off for a given amount of time and then waits for the main thread to be freed.
Now comes the caveat.
The main thread being freed may not mean your
Community is actually done doing stuff.
It is possible to schedule asyncio events in such a way that deliver_messages() can’t detect them.
This commonly happens when you use threading or hardware (like sockets).
In these exceptional cases, you can use
asyncio.sleep() or, better yet, await a custom
Future in the test.
Piggybacking on Introductions
Community instances prefer to piggyback information onto introductions.
TestBase simply adds peers to each other directly, this piggybacked information is not sent.
introduce_nodes() method allows you to send these introductions anyway, used as follows
(note the absence of
async def test_intro_called2(self) -> None: """ Check if we got a request from another MyCommunity. """ self.initialize(MyCommunity, 2) await self.introduce_nodes() self.assertEqual(self.peer(0), self.overlay(1).last_peer) self.assertEqual(self.peer(1), self.overlay(0).last_peer)
Using the RequestCache
Community instances, you will have many timeouts and lots of timeout logic in caches.
To make it easier to trigger these timeouts in the
RequestCache, we use the
Here’s an example:
async def test_passthrough(self) -> None: """ Check if a cache time out is properly handled. """ self.initialize(MyCommunity, 1) with self.overlay(0).request_cache.passthrough(): self.overlay(0).add_cache() await self.deliver_messages() self.assertTrue(self.overlay(0).timed_out)
In this example we use the
passthrough() contextmanager while we invoke a function that adds a cache.
This causes the timeout of the
MyCache cache we add inside
add_cache to be nullified and instantly fire.
Do note that this timeout occurs in the
asyncio event loop and we need to allow it to fire.
To yield the main thread we use
deliver_messages() again (though in this case
await asyncio.sleep(0.0) would have also
done the trick).
In some complex cases you may have more than one type of cache being added.
In these cases you can add a filter to
passthrough() to make it only nullify some particular classes
(simply add these classes as arguments to
async def test_passthrough2(self) -> None: """ Check if a cache time out is properly handled. """ self.initialize(MyCommunity, 1) with self.overlay(0).request_cache.passthrough(MyCache): self.overlay(0).add_cache() await self.deliver_messages() self.assertTrue(self.overlay(0).timed_out)
Fragile Packet Handling
IPv8 adds a general exception handler in
to disallow external messages crashing you.
However, when testing, this exception handler is removed by
If you want to enable the general exception handler again, you can either add your class to the
production_overlay_classes list or overwrite
def patch_overlays(self, i: int) -> None: if i == 1: pass # We'll run the general exception handler for Peer 1 else: super().patch_overlays(i)
In some cases, you may require temporary files in your unit tests.
TestBase exposes the
temporary_directory() method to generate directories for these files.
This method is
TestBase will clean up these files automatically for you.
However, if you hard-crash
TestBase before its
tearDown is invoked,
the temporary directories will not be cleaned up.
The temporary directory names are prefixed with
_temp_ and use a
uuid as a unique name.
The temporary directories will be created in the current working directory
for the mechanism to work on all supported platforms (Windows, Mac and Linux) even with limited permissions.
Asserting Message Delivery
You may want to assert that an overlay receives certain messages after a function.
TestBase exposes the
assertReceivedBy() function to do just that.
We’ll run through its functionality by example.
Most of the time, you will want to check if a peer received certain messages. In the following example peer 0 first sends message 1 and then sends message 2 to peer 1. The following construction asserts this:
with self.assertReceivedBy(1, [Message1, Message2]): self.overlay(0).send_msg_to(self.peer(1), 1) self.overlay(0).send_msg_to(self.peer(1), 2) await self.deliver_messages()
Sometimes, you can’t be sure in what order messages are sent.
In these cases you can use
with self.assertReceivedBy(1, [Message1, Message2, Message2], ordered=False): messages = [2, 1, 2] shuffle(messages) self.overlay(0).send_msg_to(self.peer(1), messages) self.overlay(0).send_msg_to(self.peer(1), messages) self.overlay(0).send_msg_to(self.peer(1), messages) await self.deliver_messages()
In other cases, your overlay may be sending messages which you cannot control and/or which you don’t care about. In these cases you can set a filter to only include the messages you want:
with self.assertReceivedBy(1, [Message1, Message2], message_filter=[Message1, Message2]): self.overlay(0).send_msg_to(self.peer(1), 1) if random() > 0.5: self.overlay(0).send_msg_to(self.peer(1), 3) self.overlay(0).send_msg_to(self.peer(1), 2) await self.deliver_messages()
It may also be helpful to inspect the contents of each payload. You can simply use the return value of the assert function to perform further inspection:
with self.assertReceivedBy(1, [Message1, Message2]) as received_messages: self.overlay(0).send_msg_to(self.peer(1), 1) self.overlay(0).send_msg_to(self.peer(1), 2) await self.deliver_messages() message1, message2 = received_messages self.assertEqual(1, message1.value) self.assertEqual(2, message2.value)
If you want to use
assertReceivedBy(), make sure that:
Your overlay message handlers only handle a single payload.
Your messages specify a
Your messages are compatible with
As usual, there is an easy and a hard way to do everything in IPv8.
You are welcome to call
every time you wish to access the public key of node
Or, instead, you may use the available shortcut
You may find your unit test become a lot more readable if you use the available
TestBase shortcuts though.
The IPv4 address of peer i.
The Endpoint instance of peer i.
The serialized public key (bytes) of peer i.
The serialized private key (bytes) of peer i.
The SHA-1 of the public key of peer i.
The private my_peer Peer instance of peer i.
The Network instance of peer i.
The MockIPv8 instance of peer i.
The Community instance of peer i.
The public Peer instance of peer i.
The private key instance of peer i.
The public key instance of peer i.
You are encouraged to add shortcuts that may be relevant to your own
Community instance in your own test class.