Unit Testing Overlays
This document assumes you have a basic understanding of network overlays in IPv8, as documented in the overlay tutorial.
You will learn how to use the IPv8’s TestBase
class to unit test your overlays.
Files
This tutorial will place all of its files in the ~/Documents/ipv8_tutorial
directory.
You are free to choose whatever directory you want, to place your files in.
This tutorial uses the following files in the working directory:
community.py
test_community.py
We will use the following community.py
in this tutorial:
import os
import unittest
from typing import TYPE_CHECKING, Any
from ipv8.community import Community, CommunitySettings
from ipv8.requestcache import NumberCache, RequestCache
from ipv8.test.base import TestBase
from ipv8.test.mocking.ipv8 import MockIPv8
if TYPE_CHECKING:
from ipv8.messaging.payload import IntroductionRequestPayload
from ipv8.messaging.payload_headers import GlobalTimeDistributionPayload
from ipv8.types import Peer
class MyCache(NumberCache):
def __init__(self, request_cache: RequestCache, overlay: MyCommunity) -> None:
super().__init__(request_cache, "", 0)
self.overlay = overlay
def on_timeout(self) -> None:
self.overlay.timed_out = True
class MyCommunitySettings(CommunitySettings):
some_constant: int | None = None
class MyCommunity(Community):
community_id = os.urandom(20)
settings_class = MyCommunitySettings
def __init__(self, settings: MyCommunitySettings) -> None:
super().__init__(settings)
self.request_cache = RequestCache()
self._some_constant = 42 if settings.some_constant is None else settings.some_constant
self.last_peer = None
self.timed_out = False
async def unload(self) -> None:
await self.request_cache.shutdown()
await super().unload()
def some_constant(self) -> int:
return self._some_constant
def introduction_request_callback(self, peer: Peer,
dist: GlobalTimeDistributionPayload,
payload: IntroductionRequestPayload) -> None:
self.last_peer = peer
def add_cache(self) -> None:
self.request_cache.add(MyCache(self.request_cache, self))
You’re encouraged to fill test_community.py
yourself as you read through this tutorial.
Why and How?
After playing around with your first overlay, you may have discovered that running multiple processes
and configuring your communities to test functionality is not very easy or reproducible.
We certainly have.
Therefore, we have created the TestBase
class with all the tools you need to mock the Internet and
make beautiful unit tests.
Because TestBase
is a subclass of unittest.TestCase
you can use common unit testing convenience methods, like testEqual
, testTrue
, setUp
, setUpClass
, etc.
This also means that TestBase
can be used with just about any test runner out there
(like unittest
, nosetests
or pytest
).
The way we will run our unit tests in this tutorial is with:
python3 -m unittest test_community.py
If you have custom logic in your subclass, please make sure to call your super()
methods.
Here’s an example of custom setUp
and tearDown
methods:
class MyTests(TestBase[MyCommunity]):
def setUp(self) -> None:
super().setUp()
# Insert your setUp logic here
async def tearDown(self) -> None:
await super().tearDown()
# Insert your tearDown logic here
Deadlock Detection
Before you start testing, you need to be warned about TestBase.MAX_TEST_TIME
.
By default, TestBase.MAX_TEST_TIME
is set to 10 seconds.
This means that if your testing class takes more than 10 seconds, TestBase
will terminate it.
We should probably mention that in proper software engineering a unit test case should never take 10 seconds.
However, we’re not here to judge.
If you want this timeout increased, simply overwrite the value of MAX_TEST_TIME
in your subclass.
For example:
class MyTests(TestBase):
MAX_TEST_TIME = 30.0 # Now this class can take 30 seconds
Creating Instances
The initialize()
method takes care of initializing your Community
subclass for you.
It’s as easy as this:
async def test_call(self) -> None:
"""
Create a MyCommunity and check the output of some_constant().
"""
# Create 1 MyCommunity
self.initialize(MyCommunity, 1)
# Nodes are 0-indexed
value = self.overlay(0).some_constant()
self.assertEqual(42, value)
What happened here?
First, we instructed TestBase
to create 1 instance of MyCommunity
using initialize()
.
As a side note: the raw information needed to make this happen (the mocking of the Internet and the interconnection
of overlays) is actually stored in the nodes
list of TestBase
.
Second, we ask our TestBase
to give us the overlay instance of node 0, which is our only node.
The overlay()
method is one of the many convenience methods in TestBase
to access common data in overlays.
We’ll provide a complete list of these convenience methods later in this document.
Last, we use a common unittest.TestCase
assertion to check if our some_constant()
overlay method returned 42.
In some cases, you might need to give additional parameters to your Community
’s __init__()
method.
In these cases, you can simply add additional keyword arguments to initialize()
.
async def test_call2(self) -> None:
"""
Create a MyCommunity with a custom some_constant.
"""
self.initialize(MyCommunity, 1, MyCommunitySettings(some_constant=7))
value = self.overlay(0).some_constant()
self.assertEqual(7, value)
In yet more advanced use cases, you may want to provide your own MockIPv8
instances.
This will usually be the case if your Community
instance only supports specific keys.
Commonly, Community
instances may choose to only support curve25519
keys, which you can do as follows:
def create_node(self, *args: Any, **kwargs) -> MockIPv8:
return MockIPv8("curve25519", self.overlay_class, *args, **kwargs)
Communication
You should now be able to create Community
instances and call their methods.
However, these instances are not communicating with each other yet.
Take note of this code in our Community
instance that stores the last peer that sent us an introduction request:
def introduction_request_callback(self, peer: Peer,
dist: GlobalTimeDistributionPayload,
payload: IntroductionRequestPayload) -> None:
self.last_peer = peer
This code simply stores whatever Peer
object last sent us a request.
We’ll create a unit test to test whether this happened:
async def test_intro_called(self) -> None:
"""
Check if we got a request from another MyCommunity.
"""
self.initialize(MyCommunity, 2)
# We have the overlay of Peer 0 send a message to Peer 1.
self.overlay(0).send_introduction_request(self.peer(1))
# Our test is running in the asyncio main thread!
# Let's yield to allow messages to be passed.
await self.deliver_messages()
# Peer 1 should have received the message from Peer 0.
self.assertEqual(self.peer(0), self.overlay(1).last_peer)
Let’s run through this example.
First we create two instances of MyCommunity
using initialize()
.
Second, we instruct the first node in our test to send a message to our second node.
Here send_introduction_request()
creates and sends a message to another peer and deliver_messages()
allows it to be received.
Lastly, we assert that our second node received a message from our first node.
Note that the asyncio
programming model of Python executes its events on the main thread (the event loop),
including this test case and the communication that is caused by it.
In other words, since the test itself is occupying the main thread, the messaging will only happen
after our test is finished!
By the time it is allowed to execute, the communication is already cancelled.
The deliver_messages()
method backs off for a given amount of time and then waits for the main thread to be freed.
Now comes the caveat.
The main thread being freed may not mean your Community
is actually done doing stuff.
It is possible to schedule asyncio events in such a way that deliver_messages() can’t detect them.
This commonly happens when you use threading or hardware (like sockets).
In these exceptional cases, you can use asyncio.sleep()
or, better yet, await a custom Future
in the test.
Piggybacking on Introductions
Some Community
instances prefer to piggyback information onto introductions.
As TestBase
simply adds peers to each other directly, this piggybacked information is not sent.
The introduce_nodes()
method allows you to send these introductions anyway, used as follows
(note the absence of deliver_messages()
):
async def test_intro_called2(self) -> None:
"""
Check if we got a request from another MyCommunity.
"""
self.initialize(MyCommunity, 2)
await self.introduce_nodes()
self.assertEqual(self.peer(0), self.overlay(1).last_peer)
self.assertEqual(self.peer(1), self.overlay(0).last_peer)
Using the RequestCache
In real Community
instances, you will have many timeouts and lots of timeout logic in caches.
To make it easier to trigger these timeouts in the RequestCache
, we use the passthrough()
method.
Here’s an example:
async def test_passthrough(self) -> None:
"""
Check if a cache time out is properly handled.
"""
self.initialize(MyCommunity, 1)
with self.overlay(0).request_cache.passthrough():
self.overlay(0).add_cache()
await self.deliver_messages()
self.assertTrue(self.overlay(0).timed_out)
In this example we use the passthrough()
contextmanager while we invoke a function that adds a cache.
This causes the timeout of the MyCache
cache we add inside add_cache
to be nullified and instantly fire.
Do note that this timeout occurs in the asyncio
event loop and we need to allow it to fire.
To yield the main thread we use deliver_messages()
again (though in this case await asyncio.sleep(0.0)
would have also
done the trick).
In some complex cases you may have more than one type of cache being added.
In these cases you can add a filter to passthrough()
to make it only nullify some particular classes
(simply add these classes as arguments to passthrough()
):
async def test_passthrough2(self) -> None:
"""
Check if a cache time out is properly handled.
"""
self.initialize(MyCommunity, 1)
with self.overlay(0).request_cache.passthrough(MyCache):
self.overlay(0).add_cache()
await self.deliver_messages()
self.assertTrue(self.overlay(0).timed_out)
Fragile Packet Handling
By default IPv8
adds a general exception handler in Community
instances,
to disallow external messages crashing you.
However, when testing, this exception handler is removed by TestBase
.
If you want to enable the general exception handler again, you can either add your class to the
production_overlay_classes
list or overwrite TestBase.patch_overlays()
.
For example:
def patch_overlays(self, i: int) -> None:
if i == 1:
pass # We'll run the general exception handler for Peer 1
else:
super().patch_overlays(i)
Temporary Files
In some cases, you may require temporary files in your unit tests.
TestBase
exposes the temporary_directory()
method to generate directories for these files.
This method is random.seed()
compatible.
Normally, TestBase
will clean up these files automatically for you.
However, if you hard-crash TestBase
before its tearDown
is invoked,
the temporary directories will not be cleaned up.
The temporary directory names are prefixed with _temp_
and use a uuid
as a unique name.
The temporary directories will be created in the current working directory
for the mechanism to work on all supported platforms (Windows, Mac and Linux) even with limited permissions.
Asserting Message Delivery
You may want to assert that an overlay receives certain messages after a function.
The TestBase
exposes the assertReceivedBy()
function to do just that.
We’ll run through its functionality by example.
Most of the time, you will want to check if a peer received certain messages. In the following example peer 0 first sends message 1 and then sends message 2 to peer 1. The following construction asserts this:
with self.assertReceivedBy(1, [Message1, Message2]):
self.overlay(0).send_msg_to(self.peer(1), 1)
self.overlay(0).send_msg_to(self.peer(1), 2)
await self.deliver_messages()
Sometimes, you can’t be sure in what order messages are sent.
In these cases you can use ordered=False
:
with self.assertReceivedBy(1, [Message1, Message2, Message2], ordered=False):
messages = [2, 1, 2]
shuffle(messages)
self.overlay(0).send_msg_to(self.peer(1), messages[0])
self.overlay(0).send_msg_to(self.peer(1), messages[1])
self.overlay(0).send_msg_to(self.peer(1), messages[2])
await self.deliver_messages()
In other cases, your overlay may be sending messages which you cannot control and/or which you don’t care about. In these cases you can set a filter to only include the messages you want:
with self.assertReceivedBy(1, [Message1, Message2], message_filter=[Message1, Message2]):
self.overlay(0).send_msg_to(self.peer(1), 1)
if random() > 0.5:
self.overlay(0).send_msg_to(self.peer(1), 3)
self.overlay(0).send_msg_to(self.peer(1), 2)
await self.deliver_messages()
It may also be helpful to inspect the contents of each payload. You can simply use the return value of the assert function to perform further inspection:
with self.assertReceivedBy(1, [Message1, Message2]) as received_messages:
self.overlay(0).send_msg_to(self.peer(1), 1)
self.overlay(0).send_msg_to(self.peer(1), 2)
await self.deliver_messages()
message1, message2 = received_messages
self.assertEqual(1, message1.value)
self.assertEqual(2, message2.value)
If you want to use assertReceivedBy()
, make sure that:
Your overlay message handlers only handle a single payload.
Your messages specify a
msg_id
.Your messages are compatible with
ez_send()
andlazy_wrapper_*()
.
Shortcut Reference
As usual, there is an easy and a hard way to do everything in IPv8.
You are welcome to call self.nodes[i].my_peer.public_key.key_to_bin()
manually
every time you wish to access the public key of node i
.
Or, instead, you may use the available shortcut self.key_bin(i)
.
You may find your unit test become a lot more readable if you use the available TestBase
shortcuts though.
method |
description |
---|---|
address(i) |
The IPv4 address of peer i. |
endpoint(i) |
The Endpoint instance of peer i. |
key_bin(i) |
The serialized public key (bytes) of peer i. |
key_bin_private(i) |
The serialized private key (bytes) of peer i. |
mid(i) |
The SHA-1 of the public key of peer i. |
my_peer(i) |
The private my_peer Peer instance of peer i. |
network(i) |
The Network instance of peer i. |
node(i) |
The MockIPv8 instance of peer i. |
overlay(i) |
The Community instance of peer i. |
peer(i) |
The public Peer instance of peer i. |
private_key(i) |
The private key instance of peer i. |
public_key(i) |
The public key instance of peer i. |
You are encouraged to add shortcuts that may be relevant to your own Community
instance in your own test class.