Task Management
This document assumes you have a basic understanding of network overlays in IPv8, as documented in the overlay tutorial.
You will learn how to use IPv8’s TaskManager class to manage asyncio tasks and how to use NumberCache to manage Future instances.
What is a task?
Essentially, a task is a way for asyncio to point to code that should be executed at some point.
The asyncio library checks if it has something to execute and executes it until it has no more tasks to execute.
However, there are many intricacies when dealing with tasks.
Consider the following example:
import asyncio

COMPLETED = []


async def execute_me_too(i: int) -> None:
    await asyncio.sleep(0.5)
    COMPLETED.append(i)


async def execute_me() -> None:
    execute_me_too(1)  # 1
    await execute_me_too(2)  # 2
    COMPLETED.append(3)  # 3
    _ = asyncio.ensure_future(execute_me_too(4))  # 4
    COMPLETED.append(5)  # 5
    await asyncio.sleep(1)  # 6


asyncio.run(execute_me())
print(COMPLETED)
Can you guess what will be printed?
The correct answer is [2, 3, 5, 4] and equally important is that the answer changes to [2, 3, 5] if you omit the final await asyncio.sleep(1).
Feel free to skip to the “The TaskManager” section if both of these answers are obvious to you.
Let’s run through execute_me() to explain this behavior:
1. The execute_me_too(1) call will not be executed, though your application will not crash. You will be informed of this through the "RuntimeWarning: coroutine 'execute_me_too' was never awaited" warning message. You should have awaited this execute_me_too call; we do this properly in the next line.
2. By awaiting execute_me_too(2), the execute_me() call will wait until execute_me_too(2) has finished executing. This adds the value 2 to the COMPLETED list.
3. After waiting for execute_me_too(2) to finish, COMPLETED.append(3) is allowed to append the value 3 to the COMPLETED list.
4. By creating a future for execute_me_too(4), we allow execute_me() to continue executing.
5. While we wait for execute_me_too(4) to finish, COMPLETED.append(5) can already access the COMPLETED list and insert its value 5.
6. While execute_me() waits for another second, execute_me_too(4) is allowed to add to the COMPLETED list.
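The first step above can be checked in isolation. The following is a minimal sketch using only the standard library; the names record and RAN are made up for this illustration. It shows that calling a coroutine function merely creates a coroutine object and that its body only runs once it is awaited:

```python
import asyncio
import inspect

RAN = []


async def record(i: int) -> None:
    RAN.append(i)


async def main() -> tuple:
    coro = record(1)  # only creates a coroutine object, nothing runs yet
    is_coroutine = inspect.iscoroutine(coro)
    ran_before_await = list(RAN)  # still empty at this point
    await coro  # now the body of record() actually executes
    return is_coroutine, ran_before_await, list(RAN)


is_coroutine, ran_before_await, ran_after_await = asyncio.run(main())
print(is_coroutine, ran_before_await, ran_after_await)  # True [] [1]
```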
Note that if you had omitted await asyncio.sleep(1), the execute_me() call would no longer be waiting for anything and would return (outputting [2, 3, 5]).
Instead, the future returned by asyncio.ensure_future() should have been awaited, as follows:
async def execute_me() -> None:
    await execute_me_too(2)
    COMPLETED.append(3)
    fut = asyncio.ensure_future(execute_me_too(4))  # store future
    COMPLETED.append(5)
    await fut  # await future before exiting
As an added bonus, this is also faster than the previous method.
This new example will finish when the execute_me_too(4) call completes, instead of after waiting a full second.
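The difference in running time can be measured directly. This is a rough sketch, assuming shortened delays so it runs quickly; the helper names work, fixed_sleep, await_future and timed are hypothetical and not part of the original example:

```python
import asyncio
import time


async def work(delay: float) -> None:
    await asyncio.sleep(delay)


async def fixed_sleep(delay: float) -> None:
    # Schedule the work, then sleep for a fixed (overly long) period.
    _ = asyncio.ensure_future(work(delay))
    await asyncio.sleep(2 * delay)


async def await_future(delay: float) -> None:
    # Schedule the work and wait exactly until it is done.
    fut = asyncio.ensure_future(work(delay))
    await fut


def timed(coro_func, delay: float) -> float:
    # Run one variant in a fresh event loop and return the elapsed seconds.
    start = time.perf_counter()
    asyncio.run(coro_func(delay))
    return time.perf_counter() - start


slow = timed(fixed_sleep, 0.2)
fast = timed(await_future, 0.2)
print(f"fixed sleep: {slow:.2f}s, awaited future: {fast:.2f}s")
```

Exact timings depend on your machine, but the awaited variant should finish in roughly half the time of the fixed sleep.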
This concludes the basics of asyncio tasks.
Now, we’ll show you how to make your life easier by using IPv8’s TaskManager class.
The TaskManager
Managing Future instances and coroutine instances is complex and error-prone. To help you with managing these, IPv8 has the TaskManager class.
The TaskManager takes care of managing all of your calls that have not completed yet and allows you to cancel them on demand.
You can even find this class as a separate package on PyPI: https://pypi.org/project/ipv8-taskmanager/
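To build some intuition for this kind of bookkeeping, here is a deliberately tiny sketch using only asyncio. The TinyTaskRegistry class and its methods are hypothetical and are not IPv8's implementation; they only illustrate the idea of tracking unfinished calls by name so that they can be canceled or awaited later:

```python
import asyncio


class TinyTaskRegistry:
    """Hypothetical stand-in for illustration, NOT IPv8's TaskManager."""

    def __init__(self) -> None:
        self._tasks = {}

    def _forget(self, name: str, task: asyncio.Future) -> None:
        # Drop the bookkeeping entry once the task is done.
        if self._tasks.get(name) is task:
            del self._tasks[name]

    def register(self, name: str, coro_func, *args) -> asyncio.Future:
        task = asyncio.ensure_future(coro_func(*args))
        self._tasks[name] = task
        task.add_done_callback(lambda t: self._forget(name, t))
        return task

    def cancel(self, name: str) -> None:
        task = self._tasks.pop(name, None)
        if task is not None:
            task.cancel()

    async def wait(self) -> None:
        # Wait for everything that is still registered.
        if self._tasks:
            await asyncio.gather(*self._tasks.values(),
                                 return_exceptions=True)


COMPLETED = []


async def execute_me_too(i: int) -> None:
    await asyncio.sleep(0.1)
    COMPLETED.append(i)


async def main() -> list:
    registry = TinyTaskRegistry()
    registry.register("first", execute_me_too, 1)
    registry.register("second", execute_me_too, 2)
    registry.cancel("first")  # canceled before it gets a chance to run
    await registry.wait()
    return list(COMPLETED)


result = asyncio.run(main())
print(result)  # [2]
```

The real class does considerably more than this sketch, so prefer it over rolling your own.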
Adding tasks
To add tasks to your TaskManager you can call register_task().
You register a task with a name, which you can later use to inspect the state of a task or cancel it.
For example:
import asyncio

from ipv8.taskmanager import TaskManager

COMPLETED = []


async def execute_me_too(i: int) -> None:
    await asyncio.sleep(0.5)
    COMPLETED.append(i)


async def main() -> None:
    task_manager = TaskManager()

    task_manager.register_task("execute_me_too1", execute_me_too, 1)
    task_manager.register_task("execute_me_too2", execute_me_too, 2)
    task_manager.cancel_pending_task("execute_me_too1")
    await task_manager.wait_for_tasks()

    await task_manager.shutdown_task_manager()
    print(COMPLETED)


asyncio.run(main())
This example prints [2].
If you had not called wait_for_tasks() before shutdown_task_manager() in this example, all of your registered tasks would have been canceled, printing [].
In some cases you may also not be interested in canceling a particular task.
For this use case the register_anonymous_task() call exists, for example:
for i in range(20):
    task_manager.register_anonymous_task("execute_me_too",
                                         execute_me_too, i)
await task_manager.wait_for_tasks()
Note that this example takes just over half a second to execute: all 20 calls to execute_me_too() are scheduled at (almost) the same time!
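The same concurrency effect can be reproduced with plain asyncio. The sketch below is an approximation that does not use the TaskManager; it assumes a shortened sleep of 0.1 seconds and passes the result list as an argument, both choices made only for this illustration:

```python
import asyncio
import time


async def execute_me_too(i: int, completed: list) -> None:
    await asyncio.sleep(0.1)
    completed.append(i)


async def main() -> tuple:
    completed = []
    start = time.perf_counter()
    # All 20 calls are scheduled immediately, so their sleeps overlap.
    tasks = [asyncio.ensure_future(execute_me_too(i, completed))
             for i in range(20)]
    await asyncio.gather(*tasks)
    return completed, time.perf_counter() - start


completed, elapsed = asyncio.run(main())
print(f"{len(completed)} calls finished in {elapsed:.2f}s")
```

Run sequentially, the 20 calls would need about 2 seconds in total; scheduled together they should finish in little more than a single sleep.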
Periodic and delayed tasks
Next to simply adding tasks, the TaskManager also allows you to invoke calls after a delay or periodically.
The following example will add the value 1 to the COMPLETED list periodically and inject a single value of 2 after half a second:
async def execute_me_too(i: int, task_manager: TaskManager) -> None:
    if len(COMPLETED) == 20:
        task_manager.cancel_pending_task("keep adding 1")
        return
    COMPLETED.append(i)


async def main() -> None:
    task_manager = TaskManager()

    task_manager.register_task("keep adding 1", execute_me_too,
                               1, task_manager, interval=0.1)
    task_manager.register_task("sneaky inject", execute_me_too,
                               2, task_manager, delay=0.5)
    await task_manager.wait_for_tasks()

    await task_manager.shutdown_task_manager()
    print(COMPLETED)
This example prints [1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1].
Note that you can also create a task with both an interval and a delay.
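Conceptually, combining a delay with an interval means "wait once, then repeat". The following plain asyncio sketch illustrates that idea; run_periodically and tick are hypothetical helpers written for this example and should not be read as IPv8's actual implementation:

```python
import asyncio

TICKS = []


async def tick(value: int) -> None:
    TICKS.append(value)


async def run_periodically(delay: float, interval: float,
                           func, *args) -> None:
    # Wait for the initial delay once, then call func every interval.
    await asyncio.sleep(delay)
    while True:
        await func(*args)
        await asyncio.sleep(interval)


async def main() -> list:
    task = asyncio.ensure_future(run_periodically(0.05, 0.01, tick, 1))
    while len(TICKS) < 3:
        await asyncio.sleep(0.01)
    task.cancel()  # a periodic task never finishes by itself
    try:
        await task
    except asyncio.CancelledError:
        pass
    return list(TICKS)


ticks = asyncio.run(main())
print(ticks)
```

As in the example with the TaskManager, the periodic call keeps running until something cancels it.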
Community Integration
For your convenience, every Community is also a TaskManager.
However, try to avoid spawning any tasks in the __init__() of your Community.
Specifically, you could end up scheduling a task in between the initialization of your Community and that of other Community instances.
Working with a half-initialized IPv8 may lead to problems.
This practice also makes your Community more difficult to test.
If you do end up in a situation where you absolutely must schedule a task from the __init__(), you can use the TaskManager’s cancel_all_pending_tasks() method to cancel all registered tasks.
This is an example of how to deal with this in your unit tests:
class TestMyCommunity(TestBase):

    def create_node(self, *args: Any, **kwargs) -> MockIPv8:
        mock_ipv8 = super().create_node(*args, **kwargs)
        mock_ipv8.overlay.cancel_all_pending_tasks()
        return mock_ipv8

    async def test_something(self) -> None:
        self.initialize(MyCommunity, 1)  # Will not run tasks
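One way to follow the advice of not spawning tasks during construction is to defer scheduling to an explicit method that is called once everything is initialized. The sketch below shows this pattern with a hypothetical Service class in plain asyncio; it is not an IPv8 class and the start() method is an assumption made for this illustration:

```python
import asyncio


class Service:
    """Hypothetical stand-in for a Community, not an IPv8 class."""

    def __init__(self) -> None:
        self.tasks = []  # nothing is scheduled during construction
        self.heartbeats = 0

    async def _heartbeat(self) -> None:
        self.heartbeats += 1

    def start(self) -> None:
        # Schedule background work only after initialization is complete.
        self.tasks.append(asyncio.ensure_future(self._heartbeat()))


async def main() -> tuple:
    service = Service()
    scheduled_in_init = len(service.tasks)
    service.start()
    await asyncio.gather(*service.tasks)
    return scheduled_in_init, service.heartbeats


scheduled_in_init, heartbeats = asyncio.run(main())
print(scheduled_in_init, heartbeats)  # 0 1
```

With this structure a test can construct the object without any tasks running and decide for itself whether to call start().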
Futures and caches
Sometimes you may find that certain tasks belong to a message context.
In other words, you may have a task that belongs to a cache (see the storing states tutorial).
If you register a Future instance in your NumberCache subclass, it will automatically be canceled when the NumberCache gets canceled or times out.
You can do so using the register_future() method.
This is a complete example:
import asyncio

from ipv8.requestcache import NumberCacheWithName, RequestCache


class MyCache(NumberCacheWithName):

    name = "my cache"

    def __init__(self,
                 request_cache: RequestCache,
                 number: int) -> None:
        super().__init__(request_cache, self.name, number)
        self.awaitable = asyncio.Future()
        self.register_future(self.awaitable, on_timeout=False)

    @property
    def timeout_delay(self) -> float:
        return 1.0

    def finish(self) -> None:
        self.awaitable.set_result(True)


async def main() -> None:
    rq = RequestCache()

    rq.add(MyCache(rq, 0))
    with rq.passthrough():
        rq.add(MyCache(rq, 1))  # Overwritten timeout = 0.0
    rq.add(MyCache(rq, 2))

    future0 = rq.get(MyCache, 0).awaitable
    future1 = rq.get(MyCache, 1).awaitable
    future2 = rq.get(MyCache, 2).awaitable

    rq.get(MyCache, 0).finish()
    await future0
    print(f"future0.result()={future0.result()}")
    rq.pop(MyCache, 0)

    await future1
    print(f"future1.result()={future1.result()}")

    await rq.shutdown()

    print(f"future2.cancelled()={future2.cancelled()}")


asyncio.run(main())
This example prints:
future0.result()=True
future1.result()=False
future2.cancelled()=True
This example showcases the three states your Future may find itself in.
First, it may have been called and received an explicit result (True in this case).
Second, the future may have timed out.
By default a timed-out Future will have its result set to None, but you can even give register_future() an exception class to have it raise an exception for you.
In this example we set the value to False.
Third and last is the case where your future was cancelled, which will raise an asyncio.exceptions.CancelledError if you await it.
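The three states can also be reproduced with bare asyncio futures, without any cache involved. In this rough sketch the timeout is imitated with loop.call_later() and a hypothetical expire() helper, which is an assumption made for this example and not how the NumberCache works internally:

```python
import asyncio


def expire(future: asyncio.Future, value: bool) -> None:
    # Imitate a timeout: only set a result if nobody else did.
    if not future.done():
        future.set_result(value)


async def main() -> list:
    loop = asyncio.get_running_loop()
    answered = loop.create_future()
    timed_out = loop.create_future()
    cancelled = loop.create_future()

    answered.set_result(True)  # explicit result
    loop.call_later(0.05, expire, timed_out, False)  # result on "timeout"
    cancelled.cancel()  # cancelled before any result was set

    results = [await answered, await timed_out]
    try:
        await cancelled
    except asyncio.CancelledError:
        results.append("cancelled")
    return results


results = asyncio.run(main())
print(results)  # [True, False, 'cancelled']
```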