Source code for passengersim.rm.linear_program

#
# Several LP approaches to RM optimization
# - Uses Google OR-Tools (open source optimizer)
#
# Alan W
# (c) PassengerSim LLC, January 2026
#

from __future__ import annotations

import random
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from statistics import NormalDist
from typing import TYPE_CHECKING

from ortools.linear_solver import pywraplp
from passengersim_core import DynamicProgram

from ._common import RmAction

if TYPE_CHECKING:
    from passengersim.config import Config
    from passengersim.core import SimulationEngine
    from passengersim.driver import Simulation


[docs] class LinearProgramming(RmAction): """ Uses various LP implementations as an RM step, either for displacement computations or for bid price. """ requires: set[str] = {"path_forecast"} frequency = "dcp" lp = {} def __init__( self, *, carrier: str = "", cfg: Config | None = None, algorithm: str = "piecewise", epsilon: float = 0.01, num_pieces: float = 25, max_std_dev: float = 2.5, debug: bool = False, ): super().__init__( carrier=carrier, cfg=cfg, ) self.algorithm = algorithm self.epsilon = epsilon self.carrier = carrier self.epsilon = epsilon self.num_pieces = num_pieces self.max_std_dev = max_std_dev self.debug = debug
[docs] def run(self, sim: Simulation, days_prior: int): if not self.should_run(sim, days_prior): return if self.carrier not in self.lp: if self.algorithm.lower() in ["deterministic", "dlp"]: s = DLP(self.carrier, self.debug) elif self.algorithm.lower() in ["piecewise", "pnlp"]: self.frequency = "daily_pre_dep" s = LpPiecewiseSolver(self.carrier, self.debug) elif self.algorithm.lower() in ["piecewise2", "rithvik"]: self.frequency = "daily_pre_dep" s = LpPiecewise2(self.carrier, 0.01, self.debug) elif self.algorithm.lower() == "ssa": s = StochasticSampleSolver(self.carrier, self.debug) elif self.algorithm.lower() == "ssblp": raise ValueError("Not implemented yet") else: raise ValueError("Unknown algorithm") s.initialize(sim.eng) self.lp[self.carrier] = s else: self.lp[self.carrier].update(sim.eng) self.lp[self.carrier].solve(sim.eng)
# ##########################################################################################
[docs] class LpBase(ABC): """Some common code, used by each implementation. Also forces each implementation to have a standard interface"""
[docs] @abstractmethod def initialize(self, eng: SimulationEngine): pass
[docs] @abstractmethod def update(self, eng: SimulationEngine): pass
[docs] @abstractmethod def solve(self, sim: SimulationEngine, debug=False): pass
[docs] def test_status(self, status): if status in (pywraplp.Solver.OPTIMAL, pywraplp.Solver.FEASIBLE): return True elif status == pywraplp.Solver.INFEASIBLE: msg = "LP Problem is infeasible" elif status == pywraplp.Solver.UNBOUNDED: msg = "LP Problem is unbounded" elif status == pywraplp.Solver.ABNORMAL: msg = "LP Problem is abnormal" elif status == pywraplp.Solver.MODEL_INVALID: msg = "LP Displacement Problem is invalid" else: msg = "LP Unknown status - this should never happen !!!" warnings.warn(f"{msg}: carrier={self.carrier}, sample={self.sim.sample}", stacklevel=2) return False
# ##########################################################################################
[docs] class DLP(LpBase): """Deterministic LP, can be used to get displacement for UDP Can also set bid-prices as a demo of why deterministic LP isn't used for this purpose in any real RM system as the answer will suck :-)""" __slots__ = ("carrier", "solver", "objective", "lp_vars", "constraints", "cabin_code") def __init__(self, sim: SimulationEngine, carrier: str, cabin_code: str): self.carrier = carrier self.cabin_code = cabin_code self.solver = pywraplp.Solver.CreateSolver("GLOP") pywraplp.Solver.SetSolverSpecificParametersAsString(self.solver, "use_dual_simplex:1") self.objective = self.solver.Objective() self.objective.SetMaximization() # create LP decision variables, which are how many passengers to accept for each path class self.lp_vars = {} for path in sim.paths.set_filters(carrier=carrier): for pc in path.pathclasses: if self.cabin_code != "" and pc.cabin_code != self.cabin_code: continue name = pc.identifier fare = pc.fare_price pc_dmd = pc.fcst_mean self.lp_vars[name] = x = self.solver.NumVar(0, pc_dmd, name) self.objective.SetCoefficient(x, fare) # Create capacity constraints self.constraints = {} for leg in sim.legs.set_filters(carrier=carrier): base = leg if self.cabin_code != "": for cab in leg.cabins: if cab.name == self.cabin_code: base = cab break rem_cap = max(self.get_remaining_capacity(base), 0) ct = self.solver.Constraint(0, rem_cap, f"Leg-{leg.flt_no}") for pc_id in base.pathclass_identifiers: ct.SetCoefficient(self.lp_vars[pc_id], 1) self.constraints[leg.flt_no] = ct
[docs] def update(self, eng: SimulationEngine): # update LP decision variables, which are how many passengers to accept for each path class for path in eng.paths.set_filters(carrier=self.carrier): for pc in path.pathclasses: self.lp_vars[pc.identifier].SetUb(pc.fcst_mean) # Create capacity constraints for leg in eng.legs.set_filters(carrier=self.carrier): base = leg if self.cabin_code != "": for cab in leg.cabins: if cab.name == self.cabin_code: base = cab break rem_cap = max(self.get_remaining_capacity(base), 0) self.constraints[leg.flt_no].SetUb(rem_cap)
[docs] def get_remaining_capacity(self, base): remaining_cap = base.capacity - base.sold return remaining_cap
[docs] def solve(self, sim: SimulationEngine): status = self.solver.Solve() if self.test_status(status): # The bid prices used to find displacement adjusted fares are the dual # values of the capacity constraints num_zero = 0 for leg in sim.legs.set_filters(carrier=self.carrier): # the duals of the contraints are the displacement costs. # In theory they should be non-negative by construction, but we # enforce that here because the solver can sometimes return tiny # negative values due to numerical imprecision. leg.displacement = max(self.constraints[leg.flt_no].dual_value(), 0) if leg.displacement < 0.01: num_zero += 1 # print("Num zero displacements = ", num_zero) return # If the LP is infeasible, we need to set the displacement costs to zero for leg in sim.legs.set_filters(carrier=self.carrier): leg.displacement = 0
# ##########################################################################################
[docs] class LpPiecewiseSolver(LpBase): """Just fiddling with an approach to approximate the non-linear stochastic problem. Break each demand into pieces, and price them at the expected marginal revenue NOTES: - We don't need an "allocation <= demand" constraint, as demand is effectively unbounded but the marginal revenue is monotonic descending, asymptotic to zero and so the pieces will get chosen in order until leg capacity is filled. """ __slots__ = ( "carrier", "solver", "objective", "lp_vars", "constraints", "epsilon", "num_pieces", "max_std_dev", "set_bp", "debug", ) def __init__(self, carrier: str, _debug=False): self.epsilon = 0.01 self.carrier = carrier self.num_pieces = 25 self.max_std_dev = 2.5 self.set_bp = False self.debug = _debug
[docs] def initialize(self, eng: SimulationEngine): self.solver = pywraplp.Solver.CreateSolver("GLOP") self.objective = self.solver.Objective() self.objective.SetMaximization() tmp = DynamicProgram(eng, self.carrier) tmp.initialize() # This step sets up the PcPtr structures, will refactor later to make this a separate call # create LP decision variables, which are how many passengers to accept for each path class self.lp_vars = {} if self.debug: print("********** Setup decision variables **********") for path in eng.paths: # .set_filters(carrier=self.carrier): if path.carrier_name != self.carrier: continue if self.debug: print("Path:", path) for pc in path.pathclasses: fare = pc.fare_price pc_dmd = pc.fcst_mean pc_std_dev = pc.fcst_std_dev var_max = pc_dmd + self.max_std_dev * pc_std_dev piece_size = var_max / self.num_pieces lh_rev = fare for piece in range(self.num_pieces): # What the probability that demand is at least (piece * piece_size)? if pc_dmd < self.epsilon or pc_std_dev < self.epsilon: prob = 0.0 else: prob = 1.0 - NormalDist(mu=pc_dmd, sigma=pc_std_dev).cdf(piece * piece_size) # As we step along the EMSR curve, compute the LH and RH of the piece, then take the average rh_rev = fare * prob avg_rev = (lh_rev + rh_rev) / 2.0 lh_rev = rh_rev name = f"{pc.identifier}-{piece}" self.lp_vars[name] = x = self.solver.NumVar(0, piece_size, name) if self.debug: print(f" Var: {name}, size={piece_size}, rev={avg_rev}") self.objective.SetCoefficient(x, avg_rev) # Create capacity constraints if self.debug: print("CAPACITY CONSTRAINTS") self.constraints = {} for leg in eng.legs: if leg.carrier_name != self.carrier: continue ct = self.solver.Constraint(0, leg.capacity - leg.sold, f"Leg-{leg.flt_no}") for pc_id in leg.pathclass_identifiers: # Add up all the pieces on the leg for piece in range(self.num_pieces): name = f"{pc_id}-{piece}" ct.SetCoefficient(self.lp_vars[name], 1) if self.debug: print(f" Leg={leg}, name={name}") self.constraints[leg.flt_no] = ct
[docs] def update(self, eng: SimulationEngine): """I didn't have a simple way to just update the LP, so we recreate it from scratch""" self.initialize(eng)
[docs] def solve(self, sim: SimulationEngine, debug=False): _result_status = self.solver.Solve() if self.test_status(_result_status): # The bid prices are the dual values of the capacity constraints for leg in sim.legs: if leg.carrier_name != self.carrier: continue # The duals of the constraints are the displacement costs. # In theory, they should be non-negative by construction, but we # enforce that here because the solver can sometimes return tiny # negative values due to numerical imprecision. leg.displacement = max(self.constraints[leg.flt_no].dual_value(), 0) if self.set_bp: leg.bid_price = leg.displacement if debug: print(f"Leg: {leg}, bp={leg.displacement}")
# ##########################################################################################
[docs] class StochasticSampleSolver(LpBase): """Testing a sample-based LP solver. The x variables correspond to each sample The y variables are the actual (nested) controls We have constraints so that the x variables nest in the y variables Based on [Kleywegt 2002] """ __slots__ = ( "carrier", "solver", "objective", "lp_vars", "constraints", "num_samples", "set_bp", "debug", ) def __init__(self, carrier: str, num_samples=5, debug=False): self.carrier = carrier self.num_samples = num_samples self.set_bp = True self.debug = debug
[docs] def initialize(self, sim: SimulationEngine): self.solver = pywraplp.Solver.CreateSolver("GLOP") self.objective = self.solver.Objective() self.objective.SetMaximization() tmp = DynamicProgram(sim, self.carrier) tmp.initialize(False) # Sets up the PcPtr structures, will refactor later to make this a separate call # create LP decision variables, which are how many passengers to accept for each path class self.lp_vars = {} if self.debug: print(f"********** Sample = {sim.sample}, setup decision variables **********") for path in sim.paths: # .set_filters(carrier=self.carrier): if path.carrier_name != self.carrier: continue if self.debug: print("Path:", path) p_id = f"{path.carrier_name}_{path.orig}_{path.dest}_{path.path_id}" c_name = f"PathRev-{p_id}" rev_c = self.solver.Constraint(0, self.solver.infinity(), c_name) pc_vars = [] sample_vars = defaultdict(list) for pc in path.pathclasses: y_name = "y_" + pc.identifier fare = pc.fare_price pc_dmd = pc.fcst_mean self.lp_vars[y_name] = y = self.solver.NumVar(0, pc_dmd, y_name) if self.debug: print(" Set Obj:", y, fare) self.objective.SetCoefficient(y, 0.0) rev_c.SetCoefficient(y, fare) pc_vars.append(y) # Sample variables rev = fare / self.num_samples for s in range(self.num_samples): sample_demand = max(random.normalvariate(mu=pc_dmd, sigma=pc.fcst_std_dev), 0.0) x_name = f"x_{s}_{pc.identifier}" self.lp_vars[x_name] = x = self.solver.NumVar(0, sample_demand, x_name) sample_vars[s].append(x) if self.debug: print(" Set Obj", x, rev, pc_dmd, round(sample_demand, 2)) self.objective.SetCoefficient(x, rev) if self.debug: print(" Set Constraint", c_name, x, -1.0 * rev) rev_c.SetCoefficient(x, -1.0 * rev) # Nesting constraints # Suppose we have Y0, Y1 & Y2, we do this for each sample: # x_Y2 <= y_Y2 # x_Y2 + x_Y1 <= y_Y2 + y_Y1 # x_Y2 + x_Y1 + x_Y0 <= y_Y2 + y_Y1 + y_Y0 for s in range(self.num_samples): for j in range(len(pc_vars)): nest_name = f"SampleNest-{p_id}-{s}-{j}" if self.debug: print(" Create sample nest constraint:", nest_name) nest_c = self.solver.Constraint(0, self.solver.infinity(), nest_name) for i in range(j, len(pc_vars)): x = sample_vars[s][i] y = pc_vars[i] if self.debug: print(" Coeffs:", x, y) nest_c.SetCoefficient(x, -1.0) nest_c.SetCoefficient(y, 1.0) # Create capacity constraints self.constraints = {} for leg in sim.legs.set_filters(carrier=self.carrier): rem_cap = leg.capacity - leg.sold ct_name = f"Leg-{leg.flt_no}" if self.debug: print("Capacity constraint", ct_name, rem_cap) ct = self.solver.Constraint(0, rem_cap, ct_name) for pc_id in leg.pathclass_identifiers: name = "y_" + pc_id if self.debug: print(" Adding variable", name) ct.SetCoefficient(self.lp_vars[name], 1) self.constraints[leg.flt_no] = ct
[docs] def update(self, eng: SimulationEngine): self.initialize(eng)
[docs] def solve(self, sim: SimulationEngine): status = self.solver.Solve() if self.test_status(status): for leg in sim.legs.set_filters(carrier=self.carrier): dual = self.constraints[leg.flt_no].dual_value() leg.displacement = max(dual, 0) if self.set_bp: leg.bid_price = leg.displacement print(f"Dual for {leg} = {round(dual, 2)}") for k, var in self.lp_vars.items(): if k[0] != "x": print(k, round(var.solution_value(), 2))
# ##########################################################################################
[docs] class LpPiecewise2(LpBase): """ Piecewise Linear Program from Talluri & Van Ryzin (3.8). Formulation and code by Rithvik For each product j and each unit of capacity d = 1, ..., M_j, define variable z_{j,d} in [0, 1] representing the d-th unit of capacity allocated to product j. The objective coefficient is p_j * P(D_j >= d), the marginal expected revenue of that unit. Because P(D_j >= d) is non-increasing in d, the LP automatically fills lower-indexed (higher probability) units first. For continuous normal demand: P(D_j >= d) = 1 - Phi((d - mu_j) / sigma_j) The resulting LP is: max sum_j p_j * sum_{d=1}^{M_j} P(D_j >= d) * z_{j,d} s.t. sum_{j uses leg l} sum_d z_{j,d} <= cap_l - sold_l 0 <= z_{j,d} <= 1 M_j is the minimum remaining capacity across all legs used by product j, which is the tightest feasible upper bound on allocations to that product. The dual of each leg capacity constraint is the displacement cost (bid price), written back to leg.displacement. Parameters ---------- sim : SimulationEngine carrier : str epsilon : float Treat demand mean/sigma below this as effectively zero. _debug : bool """ __slots__ = ( "carrier", "solver", "objective", "lp_vars", "constraints", "epsilon", "set_bp", "debug", ) def __init__( self, carrier: str, epsilon: float = 0.01, _debug: bool = False, ): self.carrier = carrier self.epsilon = epsilon self.set_bp = False self.debug = _debug # Build (or rebuild) the LP for the current simulation state
[docs] def initialize(self, eng: SimulationEngine): """ Construct the LP from current path-class forecasts and remaining leg capacities. Called once per DCP. Implements (3.8): - Variable z_{j,d} in [0,1] for each product j, unit d = 1..M_j - Objective coefficient: p_j * P(D_j >= d) - Capacity constraint per leg: sum of all z_{j,d} for products on that leg <= remaining capacity Requires sim.build_connections() to have been called so that path.num_legs() and path.get_leg_fltno() are populated. """ self.solver = pywraplp.Solver.CreateSolver("GLOP") self.objective = self.solver.Objective() self.objective.SetMaximization() tmp = DynamicProgram(eng, self.carrier) tmp.initialize() # This step sets up the PcPtr structures, will refactor later to make this a separate call self.lp_vars = {} if self.debug: print(f"********** Sample={eng.sample} PNLP setup — carrier={self.carrier} **********") # Collect remaining capacity per leg leg_remaining: dict[int, int] = {} for leg in eng.legs: if leg.carrier_name != self.carrier: continue leg_remaining[leg.flt_no] = int(leg.capacity - leg.sold) _std = NormalDist() # Decision variables: z_{pc_id, d} for d = 1, ..., M_j # # M_j = min remaining capacity over all legs on this path, or the # tightest upper bound on how many seats this product can consume. for path in eng.paths.set_filters(carrier=self.carrier): legs_on_path = [path.get_leg_fltno(i) for i in range(path.num_legs())] M_j = int( min( (leg_remaining[f] for f in legs_on_path if f in leg_remaining), default=0, ) ) for pc in path.pathclasses: fare = pc.decision_fare mu = pc.fcst_mean sigma = pc.fcst_std_dev if self.debug: print(f" PathClass {pc.identifier}: fare={fare:.2f}, mu={mu:.3f}, sigma={sigma:.3f}, M_j={M_j}") for d in range(1, M_j + 1): # P(D_j >= d): probability that demand meets or exceeds # this unit, or the marginal value of accepting one more seat if mu < self.epsilon or sigma < self.epsilon: # prob = 1.0 if d <= mu else 0.0 prob = 0.0 else: prob = 1.0 - _std.cdf((d - mu) / sigma) name = f"{pc.identifier}-{d}" var = self.solver.NumVar(0.0, 1.0, name) self.lp_vars[name] = var self.objective.SetCoefficient(var, fare * prob) if self.debug: print(f" d={d}: P(D>={d})={prob:.4f} coeff={fare * prob:.4f}") # Capacity constraints: one per leg # sum_{j uses leg l} sum_{d=1}^{M_j} z_{j,d} <-- remaining_l self.constraints = {} for leg in eng.legs: if leg.carrier_name != self.carrier: continue ct = self.solver.Constraint(0.0, float(leg_remaining[leg.flt_no]), f"Leg-{leg.flt_no}") self.constraints[leg.flt_no] = ct if self.debug: print(" CAPACITY CONSTRAINTS") for path in eng.paths.set_filters(carrier=self.carrier): legs_on_path = [path.get_leg_fltno(i) for i in range(path.num_legs())] M_j = int( min( (leg_remaining[f] for f in legs_on_path if f in leg_remaining), default=0, ) ) for pc in path.pathclasses: for d in range(1, M_j + 1): name = f"{pc.identifier}-{d}" var = self.lp_vars[name] for flt_no in legs_on_path: if flt_no in self.constraints: self.constraints[flt_no].SetCoefficient(var, 1.0) if self.debug: print(f" Leg {flt_no}: var={name}")
[docs] def update(self, eng: SimulationEngine): self.initialize(eng)
# Solve and write displacement costs (bid prices) back to the engine
[docs] def solve(self, eng: SimulationEngine, debug: bool = False): """ Solve the LP and write dual values (displacement costs / bid prices) back to each leg. """ status = self.solver.Solve() if self.test_status(status): for leg in eng.legs: if leg.carrier_name != self.carrier: continue # Dual of the capacity constraint = displacement cost. # Clamp to zero; tiny negatives can arise from numerical noise. dual = max(self.constraints[leg.flt_no].dual_value(), 0.0) leg.displacement = dual if self.set_bp: leg.bid_price = dual if debug: print(f" Leg {leg.flt_no}: displacement={dual:.4f}")
# ##########################################################################################
[docs] class SSBLP(LpBase): """Stochastic Sales-Based LP, following [Ratliff 2025] Currrently a placeholder, with planned implementation in June 2026""" pass