Source code for passengersim.rm.systems

from __future__ import annotations

import importlib
from collections.abc import Sequence
from typing import TYPE_CHECKING, Literal

import dill

from ._common import RmAction, RmActionFactory
from ._system_options import RmSysOption  # noqa: F401

if TYPE_CHECKING:
    from passengersim.config import Config
    from passengersim.driver import Simulation


def _make_factory_queue(
    actions: Sequence[RmActionFactory | type[RmAction]], check_parameters: dict | None = None
) -> list[RmActionFactory]:
    """Get a list of RmActionFactory instances for this sequence of actions.

    For actions that are not already RmActionFactory instances, this method
    will call their `configure()` class method to create a factory that has
    no configurable parameters.
    """
    factory_queue = []
    for i in actions:
        if isinstance(i, RmActionFactory):
            factory_queue.append(i)
        elif isinstance(i, type) and issubclass(i, RmAction):
            factory_queue.append(i.configure())
        else:
            raise TypeError(
                f"Every item in RmSys.actions must be an RmAction class or a RmActionFactory instance; "
                f"found {type(i).__name__} instead."
            )
    return factory_queue


[docs] class RmSys: """A revenue management (RM) system that executes a sequence of RM actions.""" priority: int = -1 """Priority of this RM system when scheduled as an event callback. The default setting is -1, which means it will run just before other daily callbacks with the default priority of 0. """ availability_control: Literal["leg", "cabin", "bp", "bp_loose", "classless", None] = None """The type of availability control used in this RM system. The selected availability control is injected into the Carrier object at the time it is created in the simulation, as this setting is used during the passenger arrival simulation loop, and instead of during the RM system steps run from the action queue each day. Subclasses of `RmSys` *must* set this class variable to one of the allowed values (other than None) to indicate the type of availability control used by that RM system. """ actions: list[RmActionFactory | type[RmAction]] = [] """List of RM actions (or action factories) to execute in this RM system. Each item in this list should be either an `RmActionFactory` instance, or an `RmAction` subclass. If an `RmAction` subclass is provided, its `configure()` class method will be called to create an `RmActionFactory` with no configurable parameters. Subclasses of `RmSys` *must* set this class variable to a non-empty list of actions to be executed in order when this RM system runs. """ def _check_action_parameters(self, factory_queue: Sequence[RmActionFactory], provided_parameters: dict) -> None: """Check that all provided parameters match at least one action factory.""" for k in provided_parameters: if not any(f.has_option(k) for f in factory_queue): raise ValueError( f"Keyword argument {k!r} does not match any configuration parameter in {self.__class__.__name__!r}." )
[docs] def __init__(self, carrier: str, cfg: Config | None = None, **kwargs) -> None: """ Initialize the RM system. Parameters ---------- carrier : str The carrier code for which this RM system is configured. cfg : Config, optional The top level configuration object for the simulation, which is also used to help initialize this system. For example, the collection of data collection points (DCPs) given as days prior to departure, is used for steps that have a frequency of "DCP" to identify which days to run. **kwargs Additional keyword arguments to configure the RM actions in this system. Raises ------ ValueError If `availability_control` is not defined, or if no actions are defined, or if any provided keyword argument does not match any action's configuration options. """ if self.availability_control is None: raise ValueError(f"availability_control is not defined on {self.__class__.__name__}.") if not self.actions: raise ValueError(f"no actions are defined on {self.__class__.__name__}.") # prepare a factory queue, which we will use to create action instances factory_queue = _make_factory_queue(self.actions) # check that all provided kwargs are valid for at least one action factory self._check_action_parameters(factory_queue, kwargs) self.action_queue: list[RmAction] = [f.make_action(carrier=carrier, cfg=cfg, **kwargs) for f in factory_queue] """List of RM actions to be executed in order."""
[docs] def run(self, sim: Simulation, days_prior: int) -> None: """Run all actions in the RM system's action queue. This will call all the actions in the action queue in order, passing the simulation and days prior to each action's run method. Each action should handle its own logic for whether it should execute based on the current simulation state and days prior. """ for action in self.action_queue: action.run(sim, days_prior)
def __call__(self, sim: Simulation, days_prior: int) -> None: """Run all actions in the RM system's action queue. This method allows the RM system instance to be called directly (i.e., as a callback), which internally invokes the run method. See `run()` for details. """ self.run(sim, days_prior)
[docs] @classmethod def get_name(cls) -> str: """Get the name of this RM system class.""" n = getattr(cls, "_name", None) if n is None: n = cls.__name__ return n
def __repr__(self) -> str: return ( f"<RmSys {self.get_name()!r}: {len(self.action_queue)} actions, " f"availability_control={self.availability_control}>" )
### RM SYSTEM REGISTRATION ### _REGISTERED_SYSTEMS: dict[str, type[RmSys]] = {} # Decorator to register a revenue management system class with a given name
[docs] def register_rm_system(rm_system: type[RmSys]) -> type[RmSys]: """Register an RM system class. This can be used as a decorator on an RmSys subclass. By default, the name used for registration is the class name, unless the class defines a class variable `_name`, in which case that is used instead. The registered RM system can then be retrieved by name using `get_registered_rm_system(name)`. You can also use registered RM systems by name in configuration files, to attach them to carriers. Parameters ---------- rm_system : type[RmSys] The RM system class to register. Returns ------- type[RmSys] The same RM system class that was registered. This makes it possible to use this function as a decorator to register RM system but otherwise act transparently. """ global _REGISTERED_SYSTEMS name = rm_system.get_name() if name in _REGISTERED_SYSTEMS: raise ValueError(f"RM system {name!r} is already registered.") _REGISTERED_SYSTEMS[name] = rm_system return rm_system
[docs] def get_registered_rm_system(name: str) -> type[RmSys]: """Retrieve a registered RM system class by name.""" global _REGISTERED_SYSTEMS if name not in _REGISTERED_SYSTEMS: raise KeyError(f"RM system {name!r} is not registered.") return _REGISTERED_SYSTEMS[name]
[docs] def check_registered_rm_system(name: str) -> bool: """Check if an RM system with the given name is registered.""" global _REGISTERED_SYSTEMS return name in _REGISTERED_SYSTEMS
[docs] def list_registered_rm_systems() -> list[str]: """List the names of all registered RM systems.""" global _REGISTERED_SYSTEMS return list(_REGISTERED_SYSTEMS.keys())
[docs] def export_registered_rm_systems() -> bytes: """Serialize and export the dictionary of registered RM systems. Returns ------- bytes The serialized data containing the registered RM systems. """ global _REGISTERED_SYSTEMS return dill.dumps(_REGISTERED_SYSTEMS)
[docs] def restore_registered_rm_systems(data: bytes) -> None: """Restore the dictionary of registered RM systems from serialized data. This will update the existing registered RM systems with those found in the provided data. Existing registrations with the same names will be overwritten, but others will be preserved. Parameters ---------- data : bytes The serialized data containing the registered RM systems. """ global _REGISTERED_SYSTEMS _REGISTERED_SYSTEMS.update(dill.loads(data))
[docs] def make_rm_system_variant(new_cls: type[RmSys]) -> type[RmSys]: """Define a named variant of an existing RM system, with particular options. Use this decorator on a class which defines alternative default values for RmSys options, and which inherits from an existing RmSys class. This will create a new RM system class with these defined values as the defaults, and register it with the name of the new class. Raises ------ ValueError The an RM system class with the given name is already registered. Examples -------- To start with the existing `Qu` system, but change the default fare adjustment algorithm to `ki` and the fare adjustment to 0.25, do >>> from passengersim.rm.standard_systems import Qu >>> @make_rm_system_variant ... class Qu25(Qu): ... fare_adjustment = "ki" ... fare_adjustment_scale = 0.25 """ # the new class must be defined with exactly one base class, so we can easily determine which kwargs are new if len(new_cls.__bases__) != 1: raise ValueError( f"A new variant RmSys must have exactly one base class, but {new_cls} has {len(new_cls.__bases__)}" ) base_cls = new_cls.__bases__[0] # the base_cls must be an RmSys if not issubclass(base_cls, RmSys): raise ValueError(f"The base class of a new variant RmSys must be a subclass of RmSys, but {base_cls} is not.") variant_defines = { k: v for k, v in new_cls.__dict__.items() if not k.startswith("__") # Exclude magic methods/docs } def _new_init(self, *args, **kwargs): for k, v in variant_defines.items(): if k not in kwargs: kwargs[k] = v super(base_cls, self).__init__(*args, **kwargs) final_cls = type( new_cls.__name__, (base_cls,), {"__init__": _new_init, "_variant_defines": variant_defines, "_name": new_cls.__name__}, ) return register_rm_system(final_cls)
[docs] def describe_rm_systems(cfg: Config | None = None) -> dict: """Describe the RM systems used in the configuration. If no configuration is provided, a description of all available systems is returned. """ rm_systems_setup = {} if cfg is None: for k in list_registered_rm_systems(): s = get_registered_rm_system(k) if hasattr(s, "_variant_defines"): rm_systems_setup[k] = { "base_class_name": s.__base__.get_name(), "variant_defines": s._variant_defines, } else: v = {"module": s.__module__} class_name = s.get_name() if class_name != k: v["class_name"] = class_name if s.__name__ != k: v["import_name"] = s.__name__ rm_systems_setup[k] = v else: for carrier in cfg.carriers.values(): k = carrier.rm_system s = get_registered_rm_system(k) while hasattr(s, "_variant_defines"): rm_systems_setup[k] = { "base_class_name": s.__base__.get_name(), "variant_defines": s._variant_defines, } s = s.__base__ k = s.get_name() if get_registered_rm_system(k) is not s: s = None if s is not None: v = {"module": s.__module__} class_name = s.get_name() if class_name != k: v["class_name"] = class_name if s.__name__ != k: v["import_name"] = s.__name__ rm_systems_setup[k] = v return rm_systems_setup
[docs] def reload_rm_systems(description: dict) -> None: """Reload RM systems used in the configuration, based on the provided description.""" # step 1 systems are RmSys classes that are "root" systems, defined with # `availability_control` and `actions` attributes. These systems are tagged # with their module so we can import them directly. They also give a class # name if that name differs from the registered name. step1 = [] # step 2 systems are variants, which take an existing RM system and modify # only the default setting for some (or none) of its options. These systems # are tagged with a `base_class_name` and `variant_defines` which gives the # settings they modify step2 = [] # To begin, we parse the description into step1 and step2 systems. for k, v in description.items(): if "variant_defines" in v: step2.append([k, v]) else: step1.append([k, v]) # step 1 systems are then imported. The modules where these systems live must # have already been imported, or be findable on `sys.path`. These systems should # register themselves on import. for k, v in step1: _module = importlib.import_module(v["module"]) _import_name = v.get("import_name", k) _obj = getattr(_module, _import_name) if get_registered_rm_system(k) is not _obj: raise ValueError(f"tried and failed to load {_import_name!r} as {k!r} from {v['module']!r}") # Now we handle step 2 systems. Since they may inherit from each other, we need # to processes them as a re-entrant queue, in case a system wants to inherit from # something we didn't handle yet. n = 0 while len(step2): (k, v) = step2.pop(0) try: base_cls = get_registered_rm_system(v["base_class_name"]) except (ValueError, KeyError): step2.append([k, v]) n += 1 if n > 1000: raise RecursionError( f"recursion limit reached, cannot reload {k!r} as a variant of {v['base_class_name']!r}" ) variant_defines = v["variant_defines"] # if this system is already registered, check that it matches try: existing_system = get_registered_rm_system(k) except KeyError: existing_system = None if existing_system is not None: if existing_system.__base__.get_name() != v["base_class_name"]: raise ValueError( f"existing system {k!r} has different base class ({existing_system.__base__.get_name()!r}) " f"than expected ({v['base_class_name']!r})" ) existing_variant_defines = getattr(existing_system, "_variant_defines", {}) if existing_variant_defines != variant_defines: _missing_keys_from_expected = variant_defines.keys() - existing_variant_defines.keys() _missing_keys_from_existing = existing_variant_defines.keys() - variant_defines.keys() _common_keys = set(variant_defines.keys()).intersection(set(existing_variant_defines.keys())) _mismatch_values = { k: dict(expected=variant_defines[k], existing=existing_variant_defines[k]) for k in _common_keys if variant_defines[k] != existing_variant_defines[k] } _errmsg = f"existing system {k!r} has different settings than expected\n" if _missing_keys_from_existing: _errmsg += f" - existing system missing {_missing_keys_from_existing}\n" if _missing_keys_from_expected: _errmsg += f" - expected system missing {_missing_keys_from_expected}\n" for k, v in _mismatch_values.items(): _errmsg += ( f" - setting {k!r} has value {v['existing']!r} in existing system " f"but expected {v['expected']!r}\n" ) raise ValueError(_errmsg) continue # existing_system is None, so we need to create and register it _new_cls = make_rm_system_variant(type(k, (base_cls,), variant_defines)) # we don't need to do anything else with _new_cls, it should now be registered if get_registered_rm_system(k) is not _new_cls: raise ValueError("unexpected error")