Source code for passengersim.driver._firehose
from __future__ import annotations
from typing import TYPE_CHECKING, TextIO
if TYPE_CHECKING:
from passengersim.core import SimulationEngine
[docs]
class Firehose:
"""Mixin for attaching and retrieving simulation firehose outputs.
This helper expects subclasses to provide an ``eng`` attribute with the
core firehose logger methods. Writable destinations are cached locally so
buffered output can be retrieved after the engine flushes pending messages.
"""
eng: SimulationEngine
def __init__(self):
self._firehose_buffers = {}
"""A mapping of firehose topic to buffer, for topics with a writable destination."""
[docs]
def attach_firehose(self, topic: str, destination: str | bool | TextIO | None) -> None:
"""Attach a firehose logger destination for a topic.
Parameters
----------
topic : str
Firehose topic name to route from the simulation engine.
destination : str or bool or file-like or None
Destination passed through to the engine. Writable destinations are
also stored locally so they can be returned by :meth:`get_firehose`.
"""
if not isinstance(destination, str | bool | type(None)):
# when adding a file-like object, attach here so we can get it back later if needed
self._firehose_buffers[topic] = destination
self.eng.attach_firehose_logger(topic, destination)
[docs]
def get_firehose(self, topic: str) -> TextIO | None:
"""Return the cached writable firehose destination for a topic.
Parameters
----------
topic : str
Firehose topic name to look up.
Returns
-------
TextIO or None
The cached writable destination for ``topic``, if one was attached.
Notes
-----
Pending firehose log output is flushed from the engine before the
cached destination is returned.
"""
self.eng.flush_firehoses()
return self._firehose_buffers.get(topic)