Storing states in IPv8

This document assumes you have a basic understanding of network overlays in IPv8, as documented in the overlay tutorial. You will learn how to use IPv8's RequestCache class to store the state of message flows.

When you need a state

More often than not, messages come in flows. For example, one peer sends out a request and another peer provides a response. Or, as another example, your message is too big to fit into a single UDP packet and you need to keep track of multiple smaller messages that belong together. In these cases you need to keep state. The RequestCache class keeps track of states and natively includes a timeout mechanism to make sure you don't end up with a memory leak.

Typically, you will use one RequestCache per network overlay, to which you add the caches that store states.

The hard way

The most straightforward way of interacting with the RequestCache is by adding NumberCache instances to it directly. Normally, you will use add() to register new caches with the RequestCache and pop() to remove existing caches from it. This is a bare-bones example of how states can be stored and retrieved:

from asyncio import create_task, run, sleep

from ipv8.requestcache import NumberCacheWithName, RequestCache


class MyState(NumberCacheWithName):

    name = "my-state"

    def __init__(self, request_cache: RequestCache,
                 identifier: int, state: int) -> None:
        super().__init__(request_cache, self.name, identifier)
        self.state = state


async def foo(request_cache: RequestCache) -> None:
    """
    Add a new MyState cache to the global request cache.
    The state variable is set to 42 and the identifier of this cache is 0.
    """
    cache = MyState(request_cache, 0, 42)
    request_cache.add(cache)


async def bar() -> None:
    """
    Wait until a MyState cache with identifier 0 is added.
    Then, remove this cache from the global request cache and print its state.
    """
    # Normally, you would add this to a network overlay instance.
    request_cache = RequestCache()

    _ = create_task(foo(request_cache))

    while not request_cache.has(MyState, 0):
        await sleep(0.1)
    cache = request_cache.pop(MyState, 0)
    print("I found a cache with the state:", cache.state)


run(bar())

In the previous example we assumed that a matching cache would eventually be added. This will almost never be guaranteed in practice. You can override the on_timeout method of your NumberCache instances to clean up when a cache times out. In the following example we print an error message when the cache times out:

from asyncio import run, sleep

from ipv8.requestcache import NumberCacheWithName, RequestCache


class MyState(NumberCacheWithName):

    name = "my-state"

    def __init__(self, request_cache: RequestCache,
                 identifier: int, state: int) -> None:
        super().__init__(request_cache, self.name, identifier)
        self.state = state

    def on_timeout(self) -> None:
        print("Oh no! I never received a response!")

    @property
    def timeout_delay(self) -> float:
        # We will timeout after 3 seconds (default is 10 seconds)
        return 3.0


async def foo() -> None:
    # Normally, you would add this to a network overlay instance.
    request_cache = RequestCache()
    cache = MyState(request_cache, 0, 42)
    request_cache.add(cache)
    # Nobody ever pops our cache: on_timeout() will fire after 3 seconds.
    await sleep(4)
    # Clean up the RequestCache once we are done with it.
    await request_cache.shutdown()


run(foo())

You may have noticed an inconvenient property of these caches: you need to generate a unique identifier and manually keep track of it, as illustrated in the sketch below. This is why there is an easier way to interact with the RequestCache.
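
To make this concrete, here is a minimal sketch of the manual bookkeeping the hard way forces on you. It reuses MyState and a request_cache from the examples above; the next_free_identifier helper is hypothetical and not part of the ipv8 API:

def next_free_identifier(request_cache: RequestCache) -> int:
    # Scan for the first identifier that is not yet in use under the
    # "my-state" name. Avoiding collisions is entirely your responsibility.
    identifier = 0
    while request_cache.has(MyState, identifier):
        identifier += 1
    return identifier


# Pick a free identifier, register a cache under it, and remember the
# identifier yourself so you can pop() the cache again later.
identifier = next_free_identifier(request_cache)
request_cache.add(MyState(request_cache, identifier, 42))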

The easier way

Let’s look at the complete Community code for two peers that use each other to count to 10. For this toy example we define two messages and a single cache. Unlike when doing things the hard way, we now use a RandomNumberCacheWithName to have IPv8 select a message identifier for us. Both the identifier fields in the messages and the name for the cache are required. Please read through this code attentively:

import os
from asyncio import sleep

from ipv8.community import Community, CommunitySettings
from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
# Note: depending on your ipv8 version, ``retrieve_cache`` may live in
# ``ipv8.lazy_community`` instead of ``ipv8.requestcache``.
from ipv8.requestcache import RandomNumberCacheWithName, RequestCache, retrieve_cache
from ipv8.types import Peer

# We'll use this global variable to keep track of the IPv8 instances that finished.
DONE = []


@vp_compile
class MyRequest(VariablePayload):
    msg_id = 1
    format_list = ['I', 'I']
    names = ["value", "identifier"]


@vp_compile
class MyResponse(VariablePayload):
    msg_id = 2
    format_list = ['I', 'I']
    names = ["value", "identifier"]


class MyCache(RandomNumberCacheWithName):
    name = "my-cache"

    def __init__(self, request_cache: RequestCache, value: int) -> None:
        super().__init__(request_cache, self.name)
        self.value = value


class MyCommunity(Community):
    community_id = os.urandom(20)

    def __init__(self, settings: CommunitySettings) -> None:
        super().__init__(settings)
        self.add_message_handler(1, self.on_request)
        self.add_message_handler(2, self.on_response)

        # This is where the magic starts: add a ``request_cache`` variable.
        self.request_cache = RequestCache()

    async def unload(self) -> None:
        # Don't forget to shut down the RequestCache when you unload the Community!
        await self.request_cache.shutdown()
        await super().unload()

    def started(self) -> None:
        self.register_task("wait for peers and send a request", self.send)

    async def send(self) -> None:
        # Wait for our local peers to connect to each other.
        while not self.get_peers():
            await sleep(0.1)
        # Then, create and register our cache.
        cache = self.request_cache.add(MyCache(self.request_cache, 0))
        # If the overlay is shutting down, the cache will be None.
        if cache is not None:
            # Finally, send the overlay message to the other peer.
            for peer in self.get_peers():
                self.ez_send(peer, MyRequest(cache.value, cache.number))

    @lazy_wrapper(MyRequest)
    def on_request(self, peer: Peer, payload: MyRequest) -> None:
        # Our service is to increment the value of the request and send this in the response.
        self.ez_send(peer, MyResponse(payload.value + 1, payload.identifier))

    @lazy_wrapper(MyResponse)
    @retrieve_cache(MyCache)
    def on_response(self, peer: Peer, payload: MyResponse, cache: MyCache) -> None:
        print(peer, "responded to:", cache.value, "with:", payload.value)

        # Stop the experiment if both peers reach a value of 10.
        if payload.value == 10:
            DONE.append(True)
            return

        # Otherwise, do the same thing over again and ask for another increment.
        cache = self.request_cache.add(MyCache(self.request_cache, payload.value))
        if cache is not None:
            for peer in self.get_peers():
                self.ez_send(peer, MyRequest(payload.value, cache.number))
                # To spice things up, we'll perform a replay attack.
                # The RequestCache causes this second duplicate message to be ignored.
                self.ez_send(peer, MyRequest(payload.value, cache.number))

You are encouraged to play around with this code. Also, note that this example includes a replay attack: the same request is sent twice, and because the cache is popped when the first response arrives, the duplicate response is ignored (try removing the cache and see what happens).
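
If you want to run this example end to end, you will need to start two IPv8 instances that host MyCommunity. The following is a minimal sketch assuming the ConfigBuilder setup from the overlay tutorial; the key file names and walker parameters are illustrative:

from asyncio import run, sleep

from ipv8.configuration import ConfigBuilder, Strategy, WalkerDefinition, default_bootstrap_defs
from ipv8_service import IPv8


async def start_communities() -> None:
    # Start two local IPv8 instances that both host MyCommunity.
    instances = []
    for i in [1, 2]:
        builder = ConfigBuilder().clear_keys().clear_overlays()
        builder.add_key("my peer", "medium", f"ec{i}.pem")
        builder.add_overlay("MyCommunity", "my peer",
                            [WalkerDefinition(Strategy.RandomWalk, 10, {'timeout': 3.0})],
                            default_bootstrap_defs, {}, [('started',)])
        instance = IPv8(builder.finalize(), extra_communities={'MyCommunity': MyCommunity})
        await instance.start()
        instances.append(instance)

    # Wait until both peers have counted to 10, then shut down.
    while len(DONE) < 2:
        await sleep(0.1)
    for instance in instances:
        await instance.stop()


run(start_communities())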