3.0.ipv8.REST.asyncio_endpoint

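Classes

DequeLogHandler: Log handler that stores the past log records.
DriftMeasurementStrategy: Strategy to measure how far off the invocation time of strategies is from the IPv8 tick rate.
AsyncioEndpoint: This endpoint helps with the monitoring of the Asyncio thread.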

Module Contents

class 3.0.ipv8.REST.asyncio_endpoint.DequeLogHandler(maxlen: int = 50)

Bases: logging.Handler

Log handler that stores the past log records.

deque: collections.deque[str]
emit(record: logging.LogRecord) -> None

Callback for when a log record is written.
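The following is a minimal sketch of how a bounded log handler of this kind could work. It is not the IPv8 implementation: the class name `RecentLogHandler` and the logger name `recent-log-demo` are hypothetical, and storing the formatted message is an assumption based on the `collections.deque[str]` attribute type.

```python
import logging
from collections import deque


class RecentLogHandler(logging.Handler):
    """Hypothetical handler that keeps only the most recent formatted records."""

    def __init__(self, maxlen: int = 50) -> None:
        super().__init__()
        # A bounded deque silently discards the oldest entry once it is full.
        self.deque = deque(maxlen=maxlen)

    def emit(self, record: logging.LogRecord) -> None:
        # Store the formatted message instead of writing it to a stream.
        self.deque.append(self.format(record))


logger = logging.getLogger("recent-log-demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False
handler = RecentLogHandler(maxlen=2)
logger.addHandler(handler)

for i in range(3):
    logger.info("message %d", i)

# Only the last two messages survive because maxlen is 2.
recent = list(handler.deque)
```

A bounded deque keeps memory use constant no matter how many records are logged, which is why it suits a "past log records" buffer.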

class 3.0.ipv8.REST.asyncio_endpoint.DriftMeasurementStrategy(core_update_rate: float)

Bases: 3.0.ipv8.peerdiscovery.discovery.DiscoveryStrategy

Strategy that measures how far the actual invocation times of strategies drift from the expected IPv8 tick rate.

last_measurement
history: collections.deque[tuple[float, float]]
core_update_rate
take_step() -> None

Measure the time since our last invocation and compare it to the expected update rate.
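The comparison described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the IPv8 code: the class `DriftMeter`, the `now` parameter, the history bound of 100 entries, and the choice to floor drift at zero are all hypothetical. Only the attribute names `last_measurement` and `history` mirror the documented ones.

```python
from __future__ import annotations

import time
from collections import deque


class DriftMeter:
    """Hypothetical stand-in for a periodic strategy that records its own lateness."""

    def __init__(self, expected_interval: float) -> None:
        self.expected_interval = expected_interval
        self.last_measurement = time.monotonic()
        # Each entry is (timestamp, drift in seconds); the bound is an assumption.
        self.history: deque[tuple[float, float]] = deque(maxlen=100)

    def take_step(self, now: float | None = None) -> float:
        now = time.monotonic() if now is None else now
        # Drift: how much later than expected this call arrived, floored at zero.
        drift = max(0.0, now - self.last_measurement - self.expected_interval)
        self.history.append((now, drift))
        self.last_measurement = now
        return drift


# Explicit timestamps make the example deterministic.
meter = DriftMeter(expected_interval=0.5)
meter.last_measurement = 10.0
late = meter.take_step(now=10.7)      # called roughly 0.2 s later than expected
on_time = meter.take_step(now=11.0)   # called early, so drift is floored at 0.0
```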

class 3.0.ipv8.REST.asyncio_endpoint.AsyncioEndpoint

Bases: 3.0.ipv8.REST.base_endpoint.BaseEndpoint[3.0.ipv8.types.IPv8]

This endpoint helps monitor the asyncio thread.

strategy: DriftMeasurementStrategy | None = None
asyncio_log_handler: DequeLogHandler | None = None
setup_routes() -> None

Register the names to make this endpoint callable.

enable() -> bool

Create a new measurement strategy and hook it into IPv8.

disable() -> bool

Destroy the previously registered measurement strategy.

async retrieve_drift(_: aiohttp.abc.Request) -> 3.0.ipv8.REST.base_endpoint.Response

Calculate the current drift.

async enable_measurements(request: aiohttp.abc.Request) -> 3.0.ipv8.REST.base_endpoint.Response

Enable or disable drift measurements.

async get_asyncio_tasks(_: aiohttp.abc.Request) -> 3.0.ipv8.REST.base_endpoint.Response

Return all currently running Asyncio tasks.
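As a rough illustration of where such task data can come from, the standard library exposes `asyncio.all_tasks()`. The sketch below uses only the standard library; the helper `describe_tasks` and the dictionary layout are hypothetical and do not reflect the actual response format of this endpoint.

```python
import asyncio


async def describe_tasks() -> list:
    """Summarize every unfinished task known to the running event loop."""
    summary = []
    for task in asyncio.all_tasks():
        coro = task.get_coro()
        summary.append({
            "name": task.get_name(),
            # Fall back to repr() for awaitables without a qualified name.
            "coro": getattr(coro, "__qualname__", repr(coro)),
            "done": task.done(),
        })
    return summary


async def main() -> list:
    # A named background task so that the listing has something recognizable in it.
    sleeper = asyncio.create_task(asyncio.sleep(10), name="sleeper")
    try:
        return await describe_tasks()
    finally:
        sleeper.cancel()


task_summary = asyncio.run(main())
```

The listing also contains the task that runs `main()` itself, since `asyncio.all_tasks()` returns every task of the loop that has not finished yet.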

async set_asyncio_debug(request: aiohttp.abc.Request) -> 3.0.ipv8.REST.base_endpoint.Response

Set asyncio debug options.
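Which options this handler accepts is not stated here. As a hedged sketch, the standard event loop offers at least two debug-related knobs, `loop.set_debug()` and `loop.slow_callback_duration`, which can be toggled as shown below. The function `configure_debug` and its parameters are hypothetical.

```python
import asyncio


async def configure_debug(enable: bool, slow_callback_duration: float) -> tuple:
    """Toggle debug mode on the running loop and report the resulting state."""
    loop = asyncio.get_running_loop()
    # Debug mode enables extra checks, such as logging of slow callbacks.
    loop.set_debug(enable)
    # Callbacks that run longer than this many seconds are logged in debug mode.
    loop.slow_callback_duration = slow_callback_duration
    return loop.get_debug(), loop.slow_callback_duration


debug_state = asyncio.run(configure_debug(True, 0.25))
```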

async get_asyncio_debug(_: aiohttp.abc.Request) -> 3.0.ipv8.REST.base_endpoint.Response

Return Asyncio log messages.
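One way asyncio log messages could be collected is by attaching a handler to the standard `asyncio` logger, which the event loop uses for its debug-mode warnings. The sketch below is an assumption about the general mechanism, not the endpoint's actual code; `CaptureHandler`, `captured`, and `blocking_step` are hypothetical names.

```python
import asyncio
import logging
import time
from collections import deque

# Bounded buffer for the most recent asyncio log messages.
captured = deque(maxlen=50)


class CaptureHandler(logging.Handler):
    """Hypothetical handler that records messages instead of printing them."""

    def emit(self, record: logging.LogRecord) -> None:
        captured.append(record.getMessage())


async def blocking_step() -> None:
    # Lower the threshold so that a short blocking call counts as "slow".
    asyncio.get_running_loop().slow_callback_duration = 0.01
    # Deliberately block the event loop; debug mode reports this as a slow callback.
    time.sleep(0.05)


asyncio_logger = logging.getLogger("asyncio")
handler = CaptureHandler(level=logging.WARNING)
asyncio_logger.addHandler(handler)
try:
    asyncio.run(blocking_step(), debug=True)
finally:
    # Detach the handler so later code is not affected.
    asyncio_logger.removeHandler(handler)

messages = list(captured)
```

Because the records still propagate to the root logger, the same warning may also appear on standard error.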