Source code for passengersim.rm.dynamic_prog

from __future__ import annotations

import io
import pathlib
import warnings
from typing import TYPE_CHECKING, Literal

import numpy as np
from ortools.linear_solver import pywraplp
from passengersim_core import (
    DynamicProgram,
    SimulationEngine,
)

from passengersim.snapshot.filtering import LegSnapshotFilter

from ._common import RmAction

if TYPE_CHECKING:
    from passengersim.config import Config
    from passengersim.driver import Simulation


[docs] class WarnSome: def __init__(self, max_times: int = 10, title: str = "") -> None: self.n = 0 self.max_times = max_times self.title = title
[docs] def warn(self, message: str, stacklevel: int = 2, **kwargs) -> None: if self.n < self.max_times: warnings.warn(message, stacklevel=stacklevel + 1, **kwargs) self.n += 1
def __del__(self) -> None: if self.title and self.n > self.max_times: warnings.warn( f"On WarnSome {self.title!r}: {self.n - self.max_times} additional warnings were suppressed", stacklevel=2, )
[docs] class LpDisplacementSolver: __slots__ = ("carrier", "solver", "objective", "lp_vars", "constraints", "cabin_code", "warnings", "error_log") def __init__(self, sim: SimulationEngine, carrier: str, cabin_code: str, error_log: pathlib.Path | None = None): self.carrier = carrier self.cabin_code = cabin_code self.solver = pywraplp.Solver.CreateSolver("GLOP") pywraplp.Solver.SetSolverSpecificParametersAsString(self.solver, "use_dual_simplex:1") self.objective = self.solver.Objective() self.objective.SetMaximization() self.warnings = WarnSome() self.error_log = pathlib.Path(error_log) if error_log else None # create LP decision variables, which are how many passengers to accept for each path class self.lp_vars = {} for path in sim.paths.set_filters(carrier=carrier): for pc in path.pathclasses: if self.cabin_code != "" and pc.cabin_code != self.cabin_code: continue name = pc.identifier fare = pc.current_tf_adjusted_fare_price(floor=0.0001, nan_to_zero=True) # fare = pc.fare.price pc_dmd = pc.fcst_mean self.lp_vars[name] = x = self.solver.NumVar(0, pc_dmd, name) self.objective.SetCoefficient(x, fare) # Create capacity constraints self.constraints = {} for leg in sim.legs.set_filters(carrier=carrier): base = leg if self.cabin_code != "": for cab in leg.cabins: if cab.name == self.cabin_code: base = cab break # we set the remaining capacity equal to an epsilon above zero to avoid numerical issues with the solver # when legs are sold out, as numerical imprecision can lead the solver to declare the problem infeasible. # This should not have a material effect on the results, as the displacement cost should then be very high # when the remaining capacity is effectively zero. rem_cap = max(self.get_remaining_capacity(base), 0.000001) ct = self.solver.Constraint(0, rem_cap, f"Leg-{leg.flt_no}") for pc_id in base.pathclass_identifiers: ct.SetCoefficient(self.lp_vars[pc_id], 1) self.constraints[leg.flt_no] = ct # Set a hint that all zeros is feasible self.solver.SetHint(self.lp_vars.values(), [0.0] * len(self.lp_vars))
[docs] def update(self, sim: SimulationEngine): # update LP decision variables, which are how many passengers to accept for each path class # also update LP coefficients, which is the marginal value of taking one more passenger in this pathclass for path in sim.paths.set_filters(carrier=self.carrier): for pc in path.pathclasses: v = self.lp_vars[pc.identifier] v.SetUb(pc.fcst_mean) self.objective.SetCoefficient(v, pc.current_tf_adjusted_fare_price(floor=0.0001, nan_to_zero=True)) # Update capacity constraints for leg in sim.legs.set_filters(carrier=self.carrier): base = leg if self.cabin_code != "": for cab in leg.cabins: if cab.name == self.cabin_code: base = cab break rem_cap = max(self.get_remaining_capacity(base), 0) self.constraints[leg.flt_no].SetUb(rem_cap)
def _write_log( self, filename: pathlib.Path | io.TextIOBase | None = None, with_solution: bool = False, with_pickle: bool = False, ): if isinstance(filename, io.TextIOBase): where = filename elif isinstance(filename, pathlib.Path | str): filename = pathlib.Path(filename) filename.parent.mkdir(parents=True, exist_ok=True) where = open(filename, "a") else: return try: print("## PROBLEM ##", file=where) problem_statement = self.solver.ExportModelAsLpFormat(False) print(problem_statement.replace("+", "\n +"), file=where) print("\n\n", file=where) if with_solution: print("## SOLUTION ##", file=where) print(f"Objective Value: {self.solver.Objective().Value()}", file=where) print("Decision Variables: ", file=where) # Loop through all variables registered in the solver for k, v in self.lp_vars.items(): print(f" {k}: {v.solution_value()} ({v.lb()} < {v.ub()})", file=where) print("Constraint Dual Values: ", file=where) for k, v in self.constraints.items(): print(f" {k}: {v.dual_value()}", file=where) print("\n\n", file=where) finally: if isinstance(filename, pathlib.Path): where.close() if with_pickle and isinstance(filename, pathlib.Path): pkl_filename = pathlib.Path(filename).with_suffix(".pkl") import dill with open(pkl_filename, "wb") as f: dill.dump(self.solver, f)
[docs] def get_remaining_capacity(self, base): remaining_cap = base.capacity - base.sold return remaining_cap
[docs] def solve(self, sim: SimulationEngine, days_prior: int) -> int: status = self.solver.Solve() if status == pywraplp.Solver.INFEASIBLE: self.warnings.warn( f"LP Displacement Problem is infeasible: " f"carrier={self.carrier}, trial={sim.trial}, sample={sim.sample}, days_prior={days_prior}", stacklevel=2, ) if self.error_log: self._write_log( self.error_log / f"trial{sim.trial}" / f"sample{sim.sample}" / f"carrier{self.carrier}_cabin{self.cabin_code}_daysprior{days_prior}_infeasible.log" ) elif status == pywraplp.Solver.UNBOUNDED: self.warnings.warn( f"LP Displacement Problem is unbounded: " f"carrier={self.carrier}, trial={sim.trial}, sample={sim.sample}, days_prior={days_prior}", stacklevel=2, ) if self.error_log: self._write_log( self.error_log / f"trial{sim.trial}" / f"sample{sim.sample}" / f"carrier{self.carrier}_cabin{self.cabin_code}_daysprior{days_prior}_unbounded.log" ) elif status == pywraplp.Solver.ABNORMAL: self.warnings.warn( f"LP Displacement Problem is abnormal: " f"carrier={self.carrier}, trial={sim.trial}, sample={sim.sample}, days_prior={days_prior}", stacklevel=2, ) if self.error_log: self._write_log( self.error_log / f"trial{sim.trial}" / f"sample{sim.sample}" / f"carrier{self.carrier}_cabin{self.cabin_code}_daysprior{days_prior}_abnormal.log" ) elif status == pywraplp.Solver.MODEL_INVALID: self.warnings.warn( f"LP Displacement Problem is invalid: " f"carrier={self.carrier}, trial={sim.trial}, sample={sim.sample}, days_prior={days_prior}", stacklevel=2, ) if self.error_log: self._write_log( self.error_log / f"trial{sim.trial}" / f"sample{sim.sample}" / f"carrier{self.carrier}_cabin{self.cabin_code}_daysprior{days_prior}_invalid.log" ) elif status in (pywraplp.Solver.OPTIMAL, pywraplp.Solver.FEASIBLE): # The bid prices used to find displacement adjusted fares are the dual # values of the capacity constraints num_zero = 0 for leg in sim.legs.set_filters(carrier=self.carrier): # the duals of the constraints are the displacement costs. # In theory they should be non-negative by construction, but we # enforce that here because the solver can sometimes return tiny # negative values due to numerical imprecision. leg.displacement = max(self.constraints[leg.flt_no].dual_value(), 0) if leg.displacement < 0.01: num_zero += 1 # print("Num zero displacements = ", num_zero) # If the LP is somehow insolvable, we will not change the existing displacement costs. # This generally happens closer to departure, probably due to numerical issues with the # solver when remaining capacities are tight or sometimes zero. return status
[docs] class UnbucketedDynamicProgram(RmAction): """UDP (Unbucketed Dynamic Programming) is a path-based RM optimization algorithm.""" requires: set[str] = {"path_forecast"} frequency = "daily_pre_dep" snapshot_filter_type = LegSnapshotFilter def __init__( self, *, carrier: str = "", cfg: Config | None = None, store_bid_prices: Literal["daily", "dcp"] = "daily", reoptimize: Literal["once", "dcp"] = "dcp", arrivals_per_time_slice: float = 0.5, min_time_slices_per_dcp: int = 1, use_current_bp: bool = False, minimum_sample: int = 10, normalization_method: int[0, 1, 2, 3, 4] = 0, cabins: str | list[str] | None = None, capacity_sharing: bool | None = False, capacity_sharing_start_dcp_index: int | None = 0, capacity_sharing_start_lf: float | None = 0.0, snapshot_filters: LegSnapshotFilter | list[LegSnapshotFilter] | None = None, error_log: pathlib.Path | None = None, ): super().__init__( carrier=carrier, minimum_sample=minimum_sample, cfg=cfg, snapshot_filters=snapshot_filters, ) self.error_log = error_log """ If problems are encountered, write about them in a log file at this location. If not provided, no error log is written. """ self.store_bid_prices: Literal["daily", "dcp"] = store_bid_prices """ The `store_bid_prices` parameter determines how bid prices are used. If `store_bid_prices` is set to `daily`, then the bid prices are stored for each day prior to departure by dividing each DCP into its component days, and then the bid prices are stored for each day, so they can vary for the same level of remaining capacity over the days in the DCP. If set to `dcp`, then the bid prices are stored only for each DCP, and are the same for each day within the DCP. Note that if `store_bid_prices` is set to `daily`, then there must be a call to the `udpupdate` RM step in the DAILY section of the RM configuration, which updates the bid price algorithm about which day's bid prices to be using in offer generation. """ self.reoptimize: Literal["once", "dcp"] = reoptimize """ The `reoptimize` parameter determines how often the UDP is re-run. If `reoptimize` is set to `once`, then the UDP is run once for each sample, based only the forecast sales by DCP over the entire booking curve. If set to `dcp`, then the UDP is re-run for each DCP, and information on prior sales in this sample is used to adjust the displacement costs. """ self.arrivals_per_time_slice: float = arrivals_per_time_slice """ The number of arrivals in each time slice. The number of DP simulation time slices in each time period is scaled so the total arrivals over all path-classes in one time slice is approximately equal to this value. It should not be set to a value greater than 1.0, as this will cause the DP to be under-populated. """ if self.arrivals_per_time_slice <= 0.0 or self.arrivals_per_time_slice > 1.0: raise ValueError("arrivals_per_time_slice must be in the range (0.0, 1.0]") self.min_time_slices_per_dcp: int = min_time_slices_per_dcp """ The minimum number of time slices in each DCP. This minimum overrides the `arrivals_per_time_slice` parameter, and ensures that each DCP has at least this many time slices. """ self.use_current_bp: bool = use_current_bp """ Experimental. Turning this on will assume that bid prices have been set by a prior step and just copy them as the displacement values. """ self.normalization_method = normalization_method """Normalization is used when the adjusted fares don't sum to the fare value 0 = No normalization 1 = Normalize when SUM(adj_fares) > Fare 2 = Normalize when SUM(adj_fares) < Fare 3 = Normalize when SUM(adj_fares) != Fare 4 = Multiply by bid price ratios (based on Juhasz 2023) """ self.cabins = cabins if cabins is not None else [] """Optional list of cabin codes to optimize. If not provided, this step will optimize on the leg as a whole.""" self.capacity_sharing = capacity_sharing """ Capacity sharing flag between cabins. When set to True, will use method 3 from Peter Belobaba's presentation. Higher cabin(s) will get max of combined cabins or itself alone. Lower cabin(s) will get min of combined cabins or itself alone.""" self.capacity_sharing_start_dcp_index = capacity_sharing_start_dcp_index self.capacity_sharing_start_lf = capacity_sharing_start_lf """We can optionally turn on capacity sharing when the coach cabin reaches a specified load factor. Based on a suggestion by Darius (PROS)""" # init some internal variables self._udp_engine = None self._lp_engine = {} self._max_days_prior = None self._dcp_days = None dcps = sorted(self.dcps, reverse=True) self._max_days_prior = max(dcps) self._dcps = dcps if self.store_bid_prices == "daily": self._dcp_days = -np.diff(dcps, append=0) if self._dcp_days.min() <= 0: raise ValueError("DCPs must be strictly descending") @property def udp(self): return self._udp
[docs] def run(self, sim: Simulation, days_prior: int): if not self.should_run(sim, days_prior): return if days_prior in self.dcps: self._run_dcp(sim, days_prior) else: self._set_bp_indices(sim, days_prior)
def _run_dcp(self, sim: Simulation, days_prior: int): snapshot_instruction = None if sim.snapshot_filters is not None: snapshot_filters = sim.snapshot_filters for sf in snapshot_filters: if sf.type != "udp": continue snapshot_instruction = sf.run(sim, carrier=self.carrier) # if self._max_days_prior is None: # self.set_dcps(sim.config.dcps) # We keep a map of core objects, as the CoreDynamicProgram code caches # the data structures it needs for each iteration if self._udp_engine is None: # TODO: check that we don't need to recreate the engine if the sim changes self._udp_engine = DynamicProgram(sim.eng, self.carrier) self._udp_engine.initialize() dcp_index = self.get_dcp_index(days_prior, allow_between=True) if self.reoptimize == "dcp" or (days_prior == self._max_days_prior): # Go through the legs and see where we turn on capacity sharing for leg in sim.eng.legs.set_filters(carrier=self.carrier): if self.capacity_sharing: if self.capacity_sharing_start_lf > 0.01: leg.capacity_sharing = False for cab in leg.cabins: if cab.name == "Y" and cab.sold / cab.capacity >= self.capacity_sharing_start_lf: leg.capacity_sharing = True elif self.capacity_sharing_start_dcp_index >= dcp_index: leg.capacity_sharing = True else: leg.capacity_sharing = False # todo: cache the setup for the LP if needed if not self.use_current_bp: if len(self.cabins) > 0: for cabin_code in self.cabins: self.solve_carrier_lp_for_leg_fare_displacements(sim.eng, cabin_code, days_prior) else: self.solve_carrier_lp_for_leg_fare_displacements(sim.eng, "", days_prior) for leg in sim.eng.legs.set_filters(carrier=self.carrier): snapshot_instruction = self.apply_snapshot_filters(sim, days_prior, leg) if self.capacity_sharing: if self.capacity_sharing_start_lf > 0.01: leg.capacity_sharing = False for cab in leg.cabins: if cab.name == "Y" and cab.sold / cab.capacity >= self.capacity_sharing_start_lf: leg.capacity_sharing = True elif self.capacity_sharing_start_dcp_index >= dcp_index: leg.capacity_sharing = True else: leg.capacity_sharing = False if len(self.cabins) == 0 or self.capacity_sharing: self._run_udp_optimizer(leg, "", snapshot_instruction) for cabin_code in self.cabins: self._run_udp_optimizer(leg, cabin_code, snapshot_instruction) # leg needs to know current BP index to pick the column when getting bid price if self.store_bid_prices == "daily": leg.bp_index = self._max_days_prior - days_prior else: leg.bp_index = dcp_index else: self._set_bp_indices(sim, days_prior) def _run_udp_optimizer(self, leg, cabin_code, snapshot_instruction): dp = self._udp_engine dp.solve_for_leg( leg, n_dcps=len(self._dcps), snapshot_instruction=snapshot_instruction, dcp_days=self._dcp_days, arrivals_per_time_slice=self.arrivals_per_time_slice, min_time_slices_per_dcp=self.min_time_slices_per_dcp, normalization_method=self.normalization_method, cabin=cabin_code, )
[docs] def solve_carrier_lp_for_leg_fare_displacements(self, eng: SimulationEngine, cabin_code="", days_prior: int = -1): if cabin_code in self._lp_engine: try: # it is faster to update the LP than to recreate it self._lp_engine[cabin_code].update(eng) except KeyError: # If we didn't set up the LP correctly with all the pathclasses, # then we need to recreate it. # TODO: this should not be needed if all pathclasses are initialized correctly self._lp_engine[cabin_code] = LpDisplacementSolver( eng, self.carrier, cabin_code, error_log=self.error_log ) else: self._lp_engine[cabin_code] = LpDisplacementSolver(eng, self.carrier, cabin_code, error_log=self.error_log) self._lp_engine[cabin_code].solve(eng, days_prior)
def _set_bp_indices(self, sim: Simulation, days_prior: int): dcp_index = self.get_dcp_index(days_prior, allow_between=True) if self.store_bid_prices == "daily": new_bp_index = self._max_days_prior - days_prior else: new_bp_index = dcp_index for leg in sim.eng.legs.set_filters(carrier=self.carrier): if self.store_bid_prices == "daily": leg.bp_index = new_bp_index for cab in leg.cabins: if cab.name in self.cabins: cab.bp_index = new_bp_index
[docs] class LegDynamicProgram(UnbucketedDynamicProgram): def _run_udp_optimizer(self, leg, cabin_code, snapshot_instruction, starting_dcp_index: int): dp = self._udp_engine dp.solve_for_leg( leg, # starting_dcp_index=starting_dcp_index, n_dcps=len(self._dcps), snapshot_instruction=snapshot_instruction, dcp_days=self._dcp_days, arrivals_per_time_slice=self.arrivals_per_time_slice, min_time_slices_per_dcp=self.min_time_slices_per_dcp, normalization_method=self.normalization_method, cabin=cabin_code, using_buckets=True, ) def _run_dcp(self, sim: Simulation, days_prior: int): # We keep a map of core objects, as the CoreDynamicProgram code caches # the data structures it needs for each iteration if self._udp_engine is None: # TODO: check that we don't need to recreate the engine if the sim changes self._udp_engine = DynamicProgram(sim.eng, self.carrier) self._udp_engine.initialize() dcp_index = self.get_dcp_index(days_prior, allow_between=True) if self.reoptimize == "dcp" or (days_prior == self._max_days_prior): # note: there are no displacement costs when using LegDP. starting_dcp_index = self.get_dcp_index(days_prior, allow_between=True) for leg in sim.eng.legs.set_filters(carrier=self.carrier): snapshot_instruction = self.apply_snapshot_filters(sim, days_prior, leg) if len(self.cabins) > 0: for cabin_code in self.cabins: self._run_udp_optimizer(leg, cabin_code, snapshot_instruction, starting_dcp_index) else: self._run_udp_optimizer(leg, "", snapshot_instruction, starting_dcp_index) # leg needs to know current BP index to pick the column when getting bid price if self.store_bid_prices == "daily": leg.bp_index = self._max_days_prior - days_prior else: leg.bp_index = dcp_index else: self._set_bp_indices(sim, days_prior)