Source code for passengersim.database.common_queries

import logging
import sqlite3
from typing import Literal

import numpy as np
import pandas as pd

from .database import Database

# Module-level logger, registered under the package-level name
# "passengersim.database" rather than this module's own ``__name__``.
logger = logging.getLogger("passengersim.database")


def fare_class_mix(cnx: Database, scenario: str, burn_samples: int = 100) -> pd.DataFrame:
    """
    Summarize the fare class mix for each carrier.

    The simulation must have been run while recording final fare details
    (i.e. with the `fare` or `fare_final` flags set on
    `Config.db.write_items`).

    Parameters
    ----------
    cnx : Database
    scenario : str
    burn_samples : int, default 100
        Samples numbered below this value are excluded from the averages.

    Returns
    -------
    pandas.DataFrame
        Indexed by `carrier` and `booking_class`, with these columns:

        - `avg_sold`: Average number of sales in this booking class.
        - `avg_revenue`: Average total revenue earned from customers booking
          in this booking class.
        - `avg_price`: Average price per ticket (`avg_revenue / avg_sold`).

    Notes
    -----
    A `fare_summary` table is created in the database on first use
    (``CREATE TABLE IF NOT EXISTS``) and reused by later calls.
    """
    # Step 1: materialize per-sample totals once; later calls reuse the table.
    summary_sql = """
    CREATE TABLE IF NOT EXISTS fare_summary AS
    SELECT
        trial, sample, scenario, carrier, booking_class,
        SUM(sold) AS sold,
        SUM(sold * price) AS revenue
    FROM
        fare_detail LEFT JOIN fare_defs USING (fare_id)
    WHERE
        days_prior = 0
    GROUP BY
        trial, sample, scenario, carrier, booking_class
    """
    cnx.execute(summary_sql)
    cnx._commit_raw()

    # Step 2: average the per-sample totals for the requested scenario.
    mix_sql = """
    SELECT
        carrier, booking_class,
        (AVG(sold)) AS avg_sold,
        (AVG(revenue)) AS avg_revenue,
        (AVG(revenue) / AVG(sold)) AS avg_price
    FROM
        fare_summary
    WHERE
        sample >= ?2
        AND scenario = ?1
    GROUP BY
        carrier, booking_class
    ORDER BY
        carrier, booking_class;
    """
    mix = cnx.dataframe(mix_sql, (scenario, burn_samples))
    return mix.set_index(["carrier", "booking_class"])
def od_fare_class_mix(cnx: Database, orig: str, dest: str, scenario: str, burn_samples: int = 100) -> pd.DataFrame:
    """
    Summarize the fare class mix by carrier within one origin-destination market.

    The simulation must have been run while recording final fare details
    (i.e. with the `fare` or `fare_final` flags set on
    `Config.db.write_items`).

    Parameters
    ----------
    cnx : Database
    orig, dest : str
        Origin and destination of the market to query.
    scenario : str
    burn_samples : int, default 100
        Samples numbered below this value are excluded from the averages.

    Returns
    -------
    pandas.DataFrame
        Indexed by `carrier` and `booking_class`, with these columns:

        - `avg_sold`: Average number of sales in this booking class.
        - `avg_revenue`: Average total revenue earned from customers booking
          in this booking class.
        - `avg_price`: Average price per ticket (`avg_revenue / avg_sold`).
    """
    market_sql = """
    SELECT
        carrier, booking_class,
        (AVG(sold)) AS avg_sold,
        (AVG(revenue)) AS avg_revenue,
        (AVG(revenue) / AVG(sold)) AS avg_price
    FROM (
        SELECT
            trial, scenario, carrier, booking_class,
            SUM(sold) AS sold,
            SUM(sold * price) AS revenue,
            COUNT(*) AS nobs
        FROM
            fare_detail LEFT JOIN fare_defs USING (fare_id)
        WHERE
            days_prior = 0
            AND sample >= ?2
            AND scenario = ?1
            AND orig = ?3
            AND dest = ?4
        GROUP BY
            trial, sample, carrier, booking_class
    ) tmp
    GROUP BY
        carrier, booking_class
    ORDER BY
        carrier, booking_class;
    """
    # Positional order matches the numbered placeholders ?1..?4.
    bindings = (scenario, burn_samples, orig, dest)
    mix = cnx.dataframe(market_sql, bindings)
    return mix.set_index(["carrier", "booking_class"])
def load_factors(cnx: Database, scenario: str, burn_samples: int = 100) -> pd.DataFrame:
    """
    Carrier-level load factor, revenue and yield averages.

    Final (`days_prior = 0`) rows of `leg_detail` are joined to `leg_defs`,
    summed per trial, sample and carrier, and then averaged per carrier.

    Parameters
    ----------
    cnx : Database
    scenario : str
    burn_samples : int, default 100
        Samples numbered below this value are excluded from the averages.

    Returns
    -------
    pandas.DataFrame
        One row per carrier (not indexed), with these columns:

        - `carrier`
        - `avg_legs_sold`: Rounded average of total leg-level sales.
        - `avg_leg_lf`: Average of `100 * sold / capacity`, 2 decimals.
        - `sys_lf`: Average of `100 * rpm / asm`, 2 decimals.
        - `avg_rev`: Average total revenue, 2 decimals.
        - `yield`: Average of `revenue / asm`, 3 decimals.
        - `avg_leg_price`: Rounded `AVG(revenue) / AVG(sold)`.
        - `n_obs`: Number of (trial, sample) observations averaged.
    """
    carrier_sql = """
    SELECT
        carrier,
        ROUND(AVG(sold)) AS avg_legs_sold,
        ROUND(AVG(100.0 * sold / cap), 2) AS avg_leg_lf,
        ROUND(AVG(100.0 * rpm / asm), 2) AS sys_lf,
        ROUND(AVG(revenue), 2) AS avg_rev,
        ROUND(AVG(revenue / asm), 3) AS yield,
        ROUND(AVG(revenue) / AVG(sold)) AS avg_leg_price,
        COUNT(*) AS n_obs
    FROM (
        SELECT
            trial, sample, carrier,
            SUM(sold) AS sold,
            SUM(capacity) AS cap,
            SUM(sold * distance) AS rpm,
            SUM(capacity * distance) AS asm,
            SUM(revenue) AS revenue
        FROM
            leg_detail JOIN leg_defs USING (leg_id)
        WHERE
            days_prior = 0
            AND sample >= ?2
            AND scenario = ?1
        GROUP BY
            trial, sample, carrier
    ) tmp
    GROUP BY
        carrier
    """
    bindings = (scenario, burn_samples)
    return cnx.dataframe(carrier_sql, bindings)
def load_factor_distribution(
    cnx: Database,
    scenario: str,
    burn_samples: int = 100,
    cutoffs=(0.5, 0.6, 0.7, 0.8, 0.85, 0.9, 0.95),
) -> pd.DataFrame:
    """
    Count legs falling in each load factor bin, by carrier.

    The load factor of each final (`days_prior = 0`) leg observation is
    computed as `sold / capacity` and tallied into bins delimited by
    `cutoffs`.

    Parameters
    ----------
    cnx : Database
    scenario : str
    burn_samples : int, default 100
        Samples numbered below this value are excluded.
    cutoffs : iterable of float
        Interior bin edges.  The edges 0.0 and 1.0 are always included.
        Edges are de-duplicated and sorted after 0.0 and 1.0 are added, so
        the bins are always contiguous and increasing, even if a cutoff
        lies outside the unit interval or is repeated.

    Returns
    -------
    pandas.DataFrame
        One row per carrier, with a `carrier` column followed by one count
        column per bin, labeled ``'<low> - <high>'``.  Every bin is
        half-open (``low <= lf < high``) except the last, which also
        includes its upper edge.
    """
    # Merge the mandatory edges in *before* sorting so ordering always holds.
    edges = sorted({float(c) for c in cutoffs} | {0.0, 1.0})
    bins = list(zip(edges[:-1], edges[1:]))
    final_bin = len(bins) - 1

    bin_columns = []
    for position, (low, high) in enumerate(bins):
        # Only the last bin is closed on the right.
        upper_op = "<=" if position == final_bin else "<"
        bin_columns.append(
            f"count(CASE WHEN lf>= {low} AND lf {upper_op} {high} THEN 1 END) "
            f"AS '{low} - {high}'"
        )
    # The edges are floats, so interpolating them into the SQL text is safe.
    bin_sql = ",\n        ".join(bin_columns)

    qry = f"""
    SELECT
        carrier,
        {bin_sql}
    FROM (
        SELECT carrier, sold, capacity, (1.0*sold)/capacity AS lf
        FROM leg_detail JOIN leg_defs USING (leg_id)
        WHERE days_prior = 0            -- only after all sales are recorded
          AND sample >= ?2              -- only after burn period
          AND scenario = ?1             -- only for this scenario
    )
    GROUP BY carrier
    """
    return cnx.dataframe(qry, (scenario, burn_samples))
def total_demand(cnx: Database, scenario: str, burn_samples: int = 100) -> float:
    """
    Compute the average total demand per sample.

    The simulation must have been run while recording final demand details
    (i.e. with the `demand` or `demand_final` flags set on
    `Config.db.write_items`).

    Parameters
    ----------
    cnx : Database
    scenario : str
    burn_samples : int, default 100
        Samples numbered below this value are excluded from the average.

    Returns
    -------
    float
    """
    demand_sql = """
    SELECT
        AVG(sample_demand)
    FROM (
        SELECT
            SUM(sample_demand) AS sample_demand
        FROM
            demand_detail
        WHERE
            days_prior = 0
            AND sample >= ?2
            AND scenario = ?1
        GROUP BY
            trial, sample) tmp;
    """
    result = cnx.dataframe(demand_sql, (scenario, burn_samples))
    # The query yields a single row with a single value.
    return result.iloc[0, 0]
def bookings_by_timeframe(
    cnx: Database,
    scenario: str,
    from_fare_detail: bool = False,
    burn_samples: int = 100,
) -> pd.DataFrame:
    """
    Average bookings and revenue by carrier, booking class, and timeframe.

    The simulation must have been run while recording supporting details
    (i.e. with the `bookings` or `fare` flags set on
    `Config.db.write_items`).

    Parameters
    ----------
    cnx : Database
    scenario : str
    from_fare_detail : bool, default False
        Rebuild the result from the `fare_detail` table instead of reading
        the `bookings_by_timeframe` table.  This is generally slower and
        requires the `fare` flag to have been set on
        `Config.db.write_items`.
    burn_samples : int, default 100
        Samples numbered below this value are excluded.  Only applied when
        `from_fare_detail` is true; the `bookings_by_timeframe` table is
        read as stored.

    Returns
    -------
    pandas.DataFrame
        Indexed by `trial`, `carrier`, `booking_class`, and `days_prior`,
        with these columns:

        - `avg_sold`: Average number of sales.
        - `avg_business`: Average number of sales to passengers in the
          business segment.
        - `avg_leisure`: Average number of sales to leisure passengers.
        - `avg_revenue`: Average total revenue earned from customers
          booking in this booking class in this time period.
        - `avg_price`: Average price per ticket from customers booking in
          this booking class in this time period.
        - `tot_sold`: Total sales (only when `from_fare_detail` is true).
    """
    index_levels = ["trial", "carrier", "booking_class", "days_prior"]

    if from_fare_detail:
        sql = """
        SELECT
            trial, carrier, booking_class, days_prior,
            (AVG(sold)) AS avg_sold,
            (AVG(sold_business)) AS avg_business,
            (AVG(sold_leisure)) AS avg_leisure,
            (AVG(revenue)) AS avg_revenue,
            (AVG(revenue) / AVG(sold)) AS avg_price,
            (SUM(sold)) AS tot_sold
        FROM (
            SELECT
                trial, scenario, carrier, booking_class, days_prior,
                SUM(sold) AS sold,
                SUM(sold_business) AS sold_business,
                SUM(sold - sold_business) AS sold_leisure,
                SUM(sold * price) AS revenue
            FROM
                fare_detail LEFT JOIN fare_defs USING (fare_id)
            WHERE
                sample >= ?2
                AND scenario = ?1
            GROUP BY
                trial, sample, carrier, booking_class, days_prior) a
        GROUP BY
            carrier, booking_class, days_prior, trial
        ORDER BY
            carrier, booking_class, days_prior, trial;
        """
        bindings = (scenario, burn_samples)
    else:
        sql = """
        SELECT
            trial, carrier, booking_class, days_prior,
            avg_sold, avg_business, avg_leisure, avg_revenue, avg_price
        FROM
            bookings_by_timeframe
        WHERE
            scenario = ?1
        GROUP BY
            carrier, booking_class, days_prior, trial
        ORDER BY
            carrier, booking_class, days_prior, trial;
        """
        bindings = (scenario,)

    return cnx.dataframe(sql, bindings).set_index(index_levels)
def leg_forecasts(cnx: Database, *, scenario: str = None, burn_samples: int = 100) -> pd.DataFrame:
    """
    Average demand forecasts by leg, bucket, and days to departure.

    The simulation must have been run while recording leg bucket details
    (i.e. with the `bucket` flag set on `Config.db.write_items`).

    Parameters
    ----------
    cnx : Database
    scenario : str, optional
        If omitted, no scenario filter is applied.
    burn_samples : int, default 100
        Samples numbered below this value are excluded.

    Returns
    -------
    pandas.DataFrame
        Indexed by `carrier`, `leg_id`, `bucket_number`, `booking_class`
        and `days_prior`, with these columns:

        - `forecast_mean`: Average forecast mean (mu).
        - `forecast_stdev`: Average forecast standard deviation (sigma).
        - `forecast_closed_in_tf`: Average fraction of time the timeframe
          was closed in the data used to make a forecast.
        - `forecast_closed_in_future`: Average fraction of time any future
          timeframe was closed in the data used to make a forecast.
    """
    # The scenario filter and its bound value are added only when requested.
    if scenario is None:
        scenario_filter = ""
        bindings = (burn_samples,)
    else:
        scenario_filter = "AND scenario = ?2"
        bindings = (burn_samples, scenario)

    forecast_sql = f"""
    SELECT
        carrier,
        leg_id,
        bucket_number,
        name as booking_class,
        days_prior,
        AVG(forecast_mean) as forecast_mean,
        AVG(forecast_stdev) as forecast_stdev,
        AVG(forecast_closed_in_tf) as forecast_closed_in_tf,
        AVG(forecast_closed_in_future) as forecast_closed_in_future
    FROM
        leg_bucket_detail
        LEFT JOIN leg_defs USING (leg_id)
    WHERE
        sample >= ?1
        {scenario_filter}
    GROUP BY
        carrier, leg_id, bucket_number, name, days_prior
    """
    index_levels = ["carrier", "leg_id", "bucket_number", "booking_class", "days_prior"]
    return cnx.dataframe(forecast_sql, bindings).set_index(index_levels)
def _leg_bucket_trace( target_cols: str, cnx: "Database", scenario: str | None = None, burn_samples: int = 100, carrier: str | None = None, leg_id: int | None = None, booking_class: str | None = None, days_prior: int | None = None, ) -> pd.DataFrame: """ Recorded forecast of demand by leg. This query requires that the simulation was run while recording leg bucket details (i.e. with the `bucket` flag set on `Config.db.write_items`). This function is provided primarily for testing and debugging purposes. Parameters ---------- cnx : Database scenario : str burn_samples : int, default 100 The forecasts will be analyzed ignoring this many samples from the beginning of each trial. carrier : str, optional If provided, only return forecasts for this carrier. leg_id : int, optional If provided, only return forecasts for this leg. booking_class : str, optional If provided, only return forecasts for this booking class. days_prior : int, optional If provided, only return forecasts for this many days prior to departure. Returns ------- pandas.DataFrame The resulting dataframe is indexed by any of `carrier`, `leg_id`, `booking_class`, and/or `days_prior` that were not filtered, and has these columns: - `forecast_mean`: Forecast mean (mu). - `forecast_stdev`: Forecast standard deviation (sigma). - `forecast_closed_in_tf`: Fraction of time the timeframe was closed in the data used to make a forecast. - `forecast_closed_in_tf`: Fraction of time any future timeframe was closed in the data used to make a forecast. 
""" conditions = [] indexers = [] columns = [] if scenario is not None: conditions.append("scenario = @scenario") if burn_samples is not None: conditions.append("sample >= @burn_samples") if carrier is not None: conditions.append("carrier = @carrier") elif leg_id is None: indexers.append("carrier") columns.append("carrier") if leg_id is not None: conditions.append("leg_id = @leg_id") else: indexers.append("leg_id") columns.append("leg_id") if booking_class is not None: conditions.append("name = @booking_class") else: indexers.append("booking_class") columns.append("name as booking_class") if days_prior is not None: conditions.append("days_prior = @days_prior") else: indexers.append("days_prior") columns.append("days_prior") qry = """ SELECT {cols}{comma} sample, {target_cols} FROM leg_bucket_detail LEFT JOIN leg_defs USING (leg_id) WHERE {conds} """.format( target_cols=target_cols, cols=", ".join(columns), conds=" AND ".join(conditions), comma="," if columns else "", ) return cnx.dataframe( qry, dict( scenario=scenario, burn_samples=burn_samples, carrier=carrier, leg_id=leg_id, booking_class=booking_class, days_prior=days_prior, ), ).set_index(indexers + ["sample"])
def leg_forecast_trace(
    cnx: "Database",
    scenario: str | None = None,
    burn_samples: int = 100,
    carrier: str | None = None,
    leg_id: int | None = None,
    booking_class: str | None = None,
    days_prior: int | None = None,
) -> pd.DataFrame:
    """
    Sample-by-sample trace of recorded leg demand forecasts.

    The simulation must have been run while recording leg bucket details
    (i.e. with the `bucket` flag set on `Config.db.write_items`).

    This function is provided primarily for testing and debugging purposes.

    Parameters
    ----------
    cnx : Database
    scenario : str, optional
        If provided, only return forecasts for this scenario.
    burn_samples : int, default 100
        Samples numbered below this value are excluded.
    carrier : str, optional
        If provided, only return forecasts for this carrier.
    leg_id : int, optional
        If provided, only return forecasts for this leg.
    booking_class : str, optional
        If provided, only return forecasts for this booking class.
    days_prior : int, optional
        If provided, only return forecasts for this many days prior to
        departure.

    Returns
    -------
    pandas.DataFrame
        Indexed by any of `carrier`, `leg_id`, `booking_class`, and/or
        `days_prior` that were not filtered, plus `sample`, with these
        columns:

        - `forecast_mean`: Forecast mean (mu).
        - `forecast_stdev`: Forecast standard deviation (sigma).
        - `forecast_closed_in_tf`: Fraction of time the timeframe was
          closed in the data used to make a forecast.
        - `forecast_closed_in_future`: Fraction of time any future
          timeframe was closed in the data used to make a forecast.
    """
    forecast_columns = (
        "forecast_mean",
        "forecast_stdev",
        "forecast_closed_in_tf",
        "forecast_closed_in_future",
    )
    filters = {
        "scenario": scenario,
        "burn_samples": burn_samples,
        "carrier": carrier,
        "leg_id": leg_id,
        "booking_class": booking_class,
        "days_prior": days_prior,
    }
    return _leg_bucket_trace(", ".join(forecast_columns), cnx, **filters)
def leg_sales_trace(
    cnx: "Database",
    scenario: str | None = None,
    burn_samples: int = 100,
    carrier: str | None = None,
    leg_id: int | None = None,
    booking_class: str | None = None,
    days_prior: int | None = None,
) -> pd.DataFrame:
    """
    Sample-by-sample trace of recorded leg bucket sales.

    The simulation must have been run while recording leg bucket details
    (i.e. with the `bucket` flag set on `Config.db.write_items`).

    This function is provided primarily for testing and debugging purposes.

    Parameters
    ----------
    cnx : Database
    scenario : str, optional
        If provided, only return rows for this scenario.
    burn_samples : int, default 100
        Samples numbered below this value are excluded.
    carrier : str, optional
        If provided, only return rows for this carrier.
    leg_id : int, optional
        If provided, only return rows for this leg.
    booking_class : str, optional
        If provided, only return rows for this booking class.
    days_prior : int, optional
        If provided, only return rows for this many days prior to
        departure.

    Returns
    -------
    pandas.DataFrame
        Indexed by any of `carrier`, `leg_id`, `booking_class`, and/or
        `days_prior` that were not filtered, plus `sample`, with the
        columns `sold`, `revenue`, and `auth`.
    """
    sales_columns = ("sold", "revenue", "auth")
    filters = {
        "scenario": scenario,
        "burn_samples": burn_samples,
        "carrier": carrier,
        "leg_id": leg_id,
        "booking_class": booking_class,
        "days_prior": days_prior,
    }
    return _leg_bucket_trace(", ".join(sales_columns), cnx, **filters)
def path_forecasts(cnx: Database, *, scenario: str = None, burn_samples: int = 100) -> pd.DataFrame:
    """
    Average demand forecasts by path, class, and days to departure.

    The simulation must have been run while recording path-class details
    (i.e. with the `pathclass` flag set on `Config.db.write_items`).

    Parameters
    ----------
    cnx : Database
    scenario : str, optional
        If omitted, no scenario filter is applied.
    burn_samples : int, default 100
        Samples numbered below this value are excluded.

    Returns
    -------
    pandas.DataFrame
        Indexed by `path_id`, `booking_class` and `days_prior`, with these
        columns:

        - `forecast_mean`: Average forecast mean (mu).
        - `forecast_stdev`: Average forecast standard deviation (sigma).
        - `forecast_closed_in_tf`: Average fraction of time the timeframe
          was closed in the data used to make a forecast.
        - `forecast_closed_in_future`: Average fraction of time any future
          timeframe was closed in the data used to make a forecast.
        - `adjusted_price`: Average of the recorded `adjusted_price`.
    """
    # The scenario filter and its bound value are added only when requested.
    if scenario is None:
        scenario_filter = ""
        bindings = (burn_samples,)
    else:
        scenario_filter = "AND scenario = ?2"
        bindings = (burn_samples, scenario)

    forecast_sql = f"""
    SELECT
        path_id,
        booking_class,
        days_prior,
        AVG(forecast_mean) as forecast_mean,
        AVG(forecast_stdev) as forecast_stdev,
        AVG(forecast_closed_in_tf) as forecast_closed_in_tf,
        AVG(forecast_closed_in_future) as forecast_closed_in_future,
        AVG(adjusted_price) as adjusted_price
    FROM
        path_class_detail
    WHERE
        sample >= ?1
        {scenario_filter}
    GROUP BY
        path_id, booking_class, days_prior
    """
    averages = cnx.dataframe(forecast_sql, bindings)
    return averages.set_index(["path_id", "booking_class", "days_prior"])
def demand_to_come(cnx: Database, *, scenario: str = None, burn_samples: int = 100) -> pd.DataFrame:
    """
    Remaining demand by market and timeframe for every sample.

    Delivers sample-by-sample timeframe demand results for the various
    markets (origin, destination, passenger type) in the simulation.  The
    simulation must have been run while recording demand details (i.e.
    with the `demand` flag set on `Config.db.write_items`).

    Parameters
    ----------
    cnx : Database
    scenario : str, optional
        If omitted, no scenario filter is applied.
    burn_samples : int, default 100
        Samples numbered below this value are excluded.

    Returns
    -------
    pandas.DataFrame
        Indexed by `iteration`, `trial`, `sample`, `segment`, `orig`, and
        `dest`, with one column per `days_prior` value (DCP), ordered from
        the largest to the smallest.  The values are the total remaining
        demand to come at each DCP.
    """
    # Provides content similar to PODS *.DHS output file, but with market level detail
    if scenario is None:
        scenario_filter = ""
        bindings = (burn_samples,)
    else:
        scenario_filter = "AND scenario = ?2"
        bindings = (burn_samples, scenario)

    demand_sql = f"""
    SELECT
        iteration, trial, sample, segment, orig, dest, days_prior,
        sold, no_go,
        (round(sample_demand) - sold - no_go) AS future_demand
    FROM
        demand_detail
    WHERE
        sample >= ?1
        {scenario_filter}
    """
    raw = cnx.dataframe(demand_sql, bindings, dtype={"future_demand": np.int32})

    long_keys = ["iteration", "trial", "sample", "segment", "orig", "dest", "days_prior"]
    future = raw.set_index(long_keys)["future_demand"]
    # Pivot the timeframes into columns, then order them furthest-out first.
    wide = future.unstack("days_prior")
    return wide.sort_values(by="days_prior", axis=1, ascending=False)
def demand_to_come_summary(cnx: Database, scenario: str, burn_samples: int = 100) -> pd.DataFrame:
    """
    Mean and standard deviation of remaining demand by segment and timeframe.

    Remaining demand (`round(sample_demand) - sold - no_go`) is totalled
    per scenario, iteration, trial, sample, segment and timeframe, and
    then summarized across samples.  The simulation must have been run
    while recording demand details (i.e. with the `demand` flag set on
    `Config.db.write_items`).

    Parameters
    ----------
    cnx : Database
    scenario : str
    burn_samples : int, default 100
        Samples numbered below this value are excluded when the summary
        table is built.

    Returns
    -------
    pandas.DataFrame
        Indexed by `segment` and `days_prior`, with these columns:

        - `mean_future_demand`: Average remaining demand to come.
        - `stdev_future_demand`: Standard deviation of remaining demand.

    Notes
    -----
    The summary is cached in a `demand_to_come_summary` table created with
    ``CREATE TABLE IF NOT EXISTS``.  Once that table exists it is reused
    as-is, so a later call with a different `burn_samples` still reads the
    summary built by the first call.

    The SQL relies on a `STDEV` aggregate; presumably it is registered on
    the connection by `Database` -- TODO confirm.
    """
    # Provides content similar to PODS *.DHS output file, summarized by segment.
    #
    # `scenario` must be part of the outer GROUP BY: without it, rows from
    # every scenario are blended into one mean/stdev and tagged with an
    # arbitrary scenario label, so the per-scenario lookup below would
    # return blended or missing results.
    build_sql = """
    CREATE TABLE IF NOT EXISTS demand_to_come_summary AS
    WITH tmp_demand_summary AS (
        SELECT
            scenario, iteration, trial, sample, segment, days_prior,
            SUM(round(sample_demand) - sold - no_go) AS future_demand
        FROM
            demand_detail
        GROUP BY
            scenario, iteration, trial, sample, segment, days_prior
    )
    SELECT
        scenario, segment, days_prior,
        AVG(future_demand) as mean_future_demand,
        STDEV(future_demand) as stdev_future_demand
    FROM
        tmp_demand_summary
    WHERE
        sample >= ?1
    GROUP BY
        scenario, segment, days_prior
    ORDER BY
        segment, days_prior DESC
    """
    cnx.execute(build_sql, (burn_samples,))
    cnx._commit_raw()

    read_sql = """
    SELECT
        segment, days_prior, mean_future_demand, stdev_future_demand
    FROM
        demand_to_come_summary
    WHERE
        scenario = ?1
    """
    summary = cnx.dataframe(read_sql, (scenario,))
    return summary.set_index(["segment", "days_prior"])
def carrier_history(cnx: Database, *, scenario: str = None, burn_samples: int = 100) -> pd.DataFrame:
    """
    Sample-level history of carrier-level measures.

    Delivers sample-by-sample aggregated summary results for the various
    carriers in the simulation.  The simulation must have been run while
    recording leg bucket details (i.e. with the `bucket` flag set on
    `Config.db.write_items`).

    Parameters
    ----------
    cnx : Database
    scenario : str, optional
        If omitted, no scenario filter is applied.
    burn_samples : int, default 100
        Samples numbered below this value are excluded.

    Returns
    -------
    pandas.DataFrame
        Indexed by `iteration`, `trial` and `sample`, with a two-level
        column MultiIndex.  The second level holds the carriers; the top
        level holds these measures:

        - `forecast_mean`: Forecast mean (mu) at the beginning of the
          booking curve, summed over all this carrier's legs in this
          sample.
        - `forecast_stdev`: Forecast standard deviation (sigma) at the
          beginning of the booking curve, combined over all this
          carrier's legs in this sample as the root of summed squares.
        - `sold`: Total bookings accepted by this carrier in this sample.
        - `revenue`: Total revenue for this carrier in this sample.
    """
    # Provides content similar to PODS *.HST output file.
    qry_params = {"burn_samples": burn_samples}

    # The start of the booking curve is the largest recorded days_prior.
    if scenario is None:
        earliest = cnx.dataframe(
            """
            SELECT max(days_prior) FROM leg_bucket_detail
            """,
        )
    else:
        qry_params["scenario"] = scenario
        earliest = cnx.dataframe(
            """
            SELECT max(days_prior) FROM leg_bucket_detail WHERE scenario == ?1
            """,
            (scenario,),
        )
    qry_params["max_days_prior"] = int(earliest.iloc[0, 0])

    scenario_filter = "" if scenario is None else "AND scenario == @scenario"
    keys = ["iteration", "trial", "sample", "carrier"]

    def per_carrier(measures: str, timeframe: str) -> pd.DataFrame:
        # Aggregate `measures` per sample and carrier at a single timeframe.
        sql = f"""
        SELECT
            iteration, trial, sample, carrier,
            {measures}
        FROM
            leg_bucket_detail
            LEFT JOIN leg_defs USING (leg_id)
        WHERE
            days_prior == {timeframe}
            {scenario_filter}
            AND sample >= @burn_samples
        GROUP BY
            iteration, trial, sample, carrier
        """
        return cnx.dataframe(sql, qry_params).set_index(keys)

    forecasts = per_carrier(
        "sum(forecast_mean) as forecast_mean, "
        "sqrt(sum(forecast_stdev*forecast_stdev)) as forecast_stdev",
        "@max_days_prior",
    )
    outcomes = per_carrier("sum(sold) as sold, sum(revenue) as revenue", "0")
    return pd.concat([forecasts, outcomes], axis=1).unstack("carrier")
def _create_bid_price_cache(cnx: Database, create_sql: str, params: tuple) -> None:
    """Build a cached summary table, retrying as a TEMP table if the commit fails."""
    cnx.execute(create_sql, params)
    try:
        cnx._commit_raw()
    except sqlite3.OperationalError:
        # The commit was rejected, so fall back to a connection-local
        # temporary table instead of a persistent one.
        temp_sql = create_sql.replace("CREATE TABLE IF NOT EXISTS", "CREATE TEMP TABLE IF NOT EXISTS")
        cnx.execute(temp_sql, params)


def bid_price_history(
    cnx: Database,
    scenario: str,
    burn_samples: int = 100,
    weighting: Literal["equal", "capacity"] = "equal",
) -> pd.DataFrame:
    """
    Compute average bid price history over all legs for each carrier.

    The simulation must have been run while recording leg details (i.e.
    with the `leg` flag set on `Config.db.write_items`), including bid
    prices.

    Parameters
    ----------
    cnx : Database
    scenario : str
    burn_samples : int, default 100
        Samples numbered below this value are excluded when the summary
        tables are built.
    weighting : {'equal', 'capacity'}, default 'equal'
        How to weight the bid prices in `some_cap_bid_price_mean`.  If
        'equal', each leg is weighted equally.  If 'capacity', each leg is
        weighted by its total capacity.

    Returns
    -------
    pandas.DataFrame
        Indexed by `carrier` and `days_prior` (carrier ascending,
        days_prior descending), with these columns:

        - `bid_price_mean`: Average bid price across all samples and all
          legs.
        - `bid_price_stdev`: Sample standard deviation of bid prices
          across all samples and all legs.
        - `fraction_some_cap`: Fraction of all legs across all samples
          that have non-zero capacity available for sale.
        - `fraction_zero_cap`: Fraction of all legs across all samples
          that have zero capacity available for sale.  Bid prices are
          computed for these legs but are not really meaningful.
        - `some_cap_bid_price_mean_unweighted`,
          `some_cap_bid_price_mean_capweighted`: Both variants of the
          conditional mean, always included.
        - `some_cap_bid_price_mean`: Average bid price conditional on the
          leg having non-zero capacity, using the selected `weighting`.
        - `some_cap_bid_price_stdev`: Sample standard deviation of bid
          prices conditional on the leg having non-zero capacity.

    Raises
    ------
    ValueError
        If `weighting` is not 'equal' or 'capacity'.

    Notes
    -----
    The summaries are cached in the `bid_price_general_summary` and
    `bid_price_somecap_summary` tables (``CREATE TABLE IF NOT EXISTS``).
    Once those tables exist they are reused as-is, so a later call with a
    different `burn_samples` still reads the first call's summary.
    """
    mean_source = {
        "equal": "some_cap_bid_price_mean_unweighted",
        "capacity": "some_cap_bid_price_mean_capweighted",
    }
    if weighting not in mean_source:
        raise ValueError(f"unknown weighting {weighting}")

    # `scenario` must be a grouping key in both cache tables: without it
    # all scenarios are blended into one summary row per (carrier,
    # days_prior) carrying an arbitrary scenario label, which the
    # per-scenario lookups below would then misreport or miss entirely.
    general_sql = """
    CREATE TABLE IF NOT EXISTS bid_price_general_summary AS
    SELECT
        scenario, carrier, days_prior,
        avg(bid_price) as bid_price_mean,
        stdev(bid_price) as bid_price_stdev,
        avg(CASE WHEN leg_detail.sold < leg_defs.capacity THEN 1.0 ELSE 0.0 END) as fraction_some_cap,
        avg(CASE WHEN leg_detail.sold < leg_defs.capacity THEN 0.0 ELSE 1.0 END) as fraction_zero_cap
    FROM
        leg_detail
        LEFT JOIN leg_defs ON leg_detail.leg_id = leg_defs.leg_id
    WHERE
        sample >= ?1
    GROUP BY
        scenario, carrier, days_prior
    """
    _create_bid_price_cache(cnx, general_sql, (burn_samples,))
    general = cnx.dataframe(
        """
        SELECT
            carrier, days_prior, bid_price_mean, bid_price_stdev,
            fraction_some_cap, fraction_zero_cap
        FROM bid_price_general_summary
        WHERE scenario == ?1
        """,
        (scenario,),
    )

    some_cap_sql = """
    CREATE TABLE IF NOT EXISTS bid_price_somecap_summary AS
    SELECT
        scenario, carrier, days_prior,
        avg(bid_price) as some_cap_bid_price_mean_unweighted,
        stdev(bid_price) as some_cap_bid_price_stdev,
        (SUM(bid_price * leg_defs.capacity) / SUM(leg_defs.capacity)) as some_cap_bid_price_mean_capweighted
    FROM
        leg_detail
        LEFT JOIN leg_defs ON leg_detail.leg_id = leg_defs.leg_id
    WHERE
        sample >= ?1
        AND leg_detail.sold < leg_defs.capacity
    GROUP BY
        scenario, carrier, days_prior
    """
    _create_bid_price_cache(cnx, some_cap_sql, (burn_samples,))
    some_cap = cnx.dataframe(
        """
        SELECT
            carrier, days_prior, some_cap_bid_price_mean_unweighted,
            some_cap_bid_price_stdev, some_cap_bid_price_mean_capweighted
        FROM bid_price_somecap_summary
        WHERE scenario == ?1
        """,
        (scenario,),
    ).set_index(["carrier", "days_prior"])

    bph = general.set_index(["carrier", "days_prior"]).join(some_cap)
    # Carriers ascending; within a carrier, furthest-from-departure first.
    bph = bph.sort_index(ascending=[True, False])
    bph["some_cap_bid_price_mean"] = bph[mean_source[weighting]]
    return bph
def displacement_history(
    cnx: Database,
    scenario: str,
    burn_samples: int = 100,
) -> pd.DataFrame:
    """
    Compute average displacement cost history over all legs for each carrier.

    The simulation must have been run while recording leg details (i.e.
    with the `leg` flag set on `Config.db.write_items`), including
    displacement costs.

    Parameters
    ----------
    cnx : Database
    scenario : str
    burn_samples : int, default 100
        Samples numbered below this value are excluded when the summary
        table is built.

    Returns
    -------
    pandas.DataFrame
        Indexed by `carrier` and `days_prior`, with these columns:

        - `displacement_mean`: Average displacement cost across all
          samples and all legs.
        - `displacement_stdev`: Sample standard deviation of displacement
          cost across all samples and all legs.

    Notes
    -----
    The summary is cached in a `displacement_summary` table created with
    ``CREATE TABLE IF NOT EXISTS`` and reused by later calls.
    """
    persistent_prefix = "CREATE TABLE IF NOT EXISTS"
    temporary_prefix = "CREATE TEMP TABLE IF NOT EXISTS"
    bindings = (burn_samples,)

    build_sql = """
    CREATE TABLE IF NOT EXISTS displacement_summary AS
    SELECT
        scenario, carrier, days_prior,
        avg(displacement) as displacement_mean,
        stdev(displacement) as displacement_stdev
    FROM
        leg_detail
        LEFT JOIN leg_defs ON leg_detail.leg_id = leg_defs.leg_id
    WHERE
        sample >= ?1
    GROUP BY
        scenario, carrier, days_prior
    ORDER BY
        carrier, days_prior DESC
    """
    cnx.execute(build_sql, bindings)
    try:
        cnx._commit_raw()
    except sqlite3.OperationalError:
        # The commit was rejected; retry as a connection-local temp table.
        build_sql = build_sql.replace(persistent_prefix, temporary_prefix)
        cnx.execute(build_sql, bindings)

    read_sql = """
    SELECT
        carrier, days_prior, displacement_mean, displacement_stdev
    FROM displacement_summary
    WHERE scenario == ?1
    """
    summary = cnx.dataframe(read_sql, (scenario,))
    return summary.set_index(["carrier", "days_prior"])
def leg_detail(
    cnx: Database,
    scenario: str,
    burn_samples: int = 100,
) -> pd.DataFrame:
    """
    Dump `leg_detail`, joined to `leg_defs`, into a dataframe.

    Parameters
    ----------
    cnx : Database
    scenario : str
        Currently unused: the query applies no scenario filter.
    burn_samples : int, default 100
        Currently unused: the query applies no sample filter.

    Returns
    -------
    pandas.DataFrame
        One row per `leg_detail` record (not indexed), with the columns
        `trial`, `sample`, `leg_id`, `carrier`, `flt_no`, `orig`, `dest`,
        `days_prior`, `sold`, `revenue`, `local_sold`, `local_revenue`,
        and `bid_price`.
    """
    # NOTE(review): `scenario` and `burn_samples` are accepted but never
    # bound to the query, so rows from every scenario and sample come back.
    dump_sql = """
    SELECT
        l.trial, l.sample, l.leg_id,
        d.carrier, d.flt_no, d.orig, d.dest,
        l.days_prior, l.sold, l.revenue,
        l.local_sold, l.local_revenue, l.bid_price
    FROM
        leg_detail l
        JOIN leg_defs d USING (leg_id)
    """
    return cnx.dataframe(dump_sql)
def local_and_flow_yields(cnx: Database, *, scenario: str = None, burn_samples: int = 100) -> pd.DataFrame:
    """
    Compute yields for local (nonstop) and flow (connecting) passengers.

    The simulation must have been run while recording path class details
    (i.e. with the `pathclass` or `pathclass_final` flags set on
    `Config.db.write_items`).

    Parameters
    ----------
    cnx : Database
    scenario : str, optional
        If omitted, no scenario filter is applied.
    burn_samples : int, default 100
        Samples numbered below this value are excluded.

    Returns
    -------
    pandas.DataFrame
        One row per leg in `leg_defs` (not indexed), with the columns
        `leg_id`, `carrier`, `orig`, `dest`, `capacity`, `distance`,
        `local_yield`, `local_fraction`, and `flow_yield`.
    """
    # The scenario filter and its bound value are added only when requested.
    if scenario is None:
        scenario_filter = ""
        bindings = (burn_samples,)
    else:
        scenario_filter = "AND scenario == ?2"
        bindings = (burn_samples, scenario)

    # path_yields totals final sales and revenue per path; the outer query
    # then attaches, to each leg, its local path plus the flow paths that
    # use it as a first (f1) or second (f2) leg.
    yields_sql = f"""
    WITH path_yields AS (
        SELECT
            iteration, trial, sample, path_id, leg1, leg2,
            SUM(sold) as total_sold,
            SUM(revenue) as total_revenue,
            distance,
            SUM(revenue) / (SUM(sold) * distance) AS yield,
            leg2 IS NULL AS local
        FROM
            path_class_detail
            LEFT JOIN path_defs USING (path_id)
        WHERE
            days_prior == 0
            AND sample >= ?1
            {scenario_filter}
        GROUP BY
            path_id
    )
    SELECT
        leg_id,
        carrier,
        orig,
        dest,
        capacity,
        leg_defs.distance,
        yield AS local_yield,
        CAST(total_sold AS REAL) / (total_sold + IFNULL(f1.flow_sold, 0) + IFNULL(f2.flow_sold, 0)) AS local_fraction,
        (IFNULL(f1.flow_revenue, 0) + IFNULL(f2.flow_revenue, 0))
            / (IFNULL(f1.flow_rpm, 0) + IFNULL(f2.flow_rpm, 0)) AS flow_yield
    FROM
        leg_defs
        LEFT JOIN path_yields locals
            ON locals.leg1 == leg_id AND locals.leg2 IS NULL
        LEFT JOIN (
            SELECT
                leg1,
                SUM(total_sold) AS flow_sold,
                SUM(total_revenue) AS flow_revenue,
                SUM(total_sold * distance) AS flow_rpm
            FROM
                path_yields
            WHERE
                leg2 IS NOT NULL
            GROUP BY
                leg1
        ) f1 ON f1.leg1 == leg_defs.leg_id
        LEFT JOIN (
            SELECT
                leg2,
                SUM(total_sold) AS flow_sold,
                SUM(total_revenue) AS flow_revenue,
                SUM(total_sold * distance) AS flow_rpm
            FROM
                path_yields
            GROUP BY
                leg2
        ) f2 ON f2.leg2 == leg_defs.leg_id
    """
    return cnx.dataframe(yields_sql, bindings)
def leg_local_and_flow_by_class(cnx: Database, scenario: str, burn_samples: int = 100) -> pd.DataFrame:
    """
    Average total and local passengers carried, by leg and booking class.

    Final (`days_prior == 0`) leg bucket sales are averaged per leg and
    booking class, next to the average sales on matching local paths
    (paths with no second leg) for the same iteration, trial and sample.

    Parameters
    ----------
    cnx : Database
    scenario : str
    burn_samples : int, default 100
        Samples numbered below this value are excluded.

    Returns
    -------
    pandas.DataFrame
        One row per leg and booking class (not indexed), with the columns
        `leg_id`, `carrier`, `orig`, `dest`, `booking_class`,
        `carried_all` (average leg bucket sales) and `carried_loc`
        (average local path sales, 0 when there are none).
    """
    logger.info("creating pthcls temp table")
    # The temp table bakes in `scenario` and `burn_samples`, and it lives as
    # long as the connection does.  Rebuild it on every call: with
    # CREATE ... IF NOT EXISTS alone, a second call with different
    # arguments would silently reuse the first call's rows.
    cnx.execute("DROP TABLE IF EXISTS temp.pthcls")
    cnx.execute(
        """
        CREATE TEMP TABLE pthcls AS
        SELECT
            sold, leg1, booking_class, iteration, trial, sample
        FROM
            path_class_detail
            LEFT JOIN path_defs USING(path_id)
        WHERE
            days_prior == 0
            AND leg2 IS NULL
            AND scenario == ?1
            AND sample >= ?2
        """,
        (
            scenario,
            burn_samples,
        ),
    )

    logger.info("indexing pthcls temp table")
    # Dropping the table above also dropped any index from an earlier call.
    cnx.execute(
        """
        CREATE INDEX IF NOT EXISTS idx_pthcls_1
        ON pthcls (
            iteration, trial, sample, leg1, booking_class
        );
        """
    )

    logger.info("running leg_local_and_flow_by_class query")
    qry = """
    SELECT
        leg_id,
        leg_defs.carrier,
        leg_defs.orig,
        leg_defs.dest,
        leg_bucket_detail.name as booking_class,
        AVG(leg_bucket_detail.sold) AS carried_all,
        IFNULL(AVG(pthcls.sold), 0) AS carried_loc
    FROM
        leg_bucket_detail
        LEFT JOIN leg_defs USING (leg_id)
        LEFT JOIN pthcls ON
            leg_id == pthcls.leg1
            AND leg_bucket_detail.name == pthcls.booking_class
            AND leg_bucket_detail.iteration == pthcls.iteration
            AND leg_bucket_detail.trial == pthcls.trial
            AND leg_bucket_detail.sample == pthcls.sample
    WHERE
        leg_bucket_detail.scenario == ?1
        AND leg_bucket_detail.sample >= ?2
        AND days_prior == 0
    GROUP BY
        leg_id,
        leg_defs.carrier,
        leg_defs.orig,
        leg_defs.dest,
        leg_bucket_detail.name
    """
    return cnx.dataframe(
        qry,
        (
            scenario,
            burn_samples,
        ),
    )
def edgar(cnx: Database, *, scenario: str = None, burn_samples: int = 100) -> pd.DataFrame:
    """
    Forecast accuracy information.

    Returns the rows of the `edgar` table, optionally restricted to one
    scenario, without further aggregation.

    Parameters
    ----------
    cnx : Database
    scenario : str, optional
        If omitted, no scenario filter is applied.
    burn_samples : int, default 100
        Samples numbered below this value are excluded.

    Returns
    -------
    pandas.DataFrame
        Not indexed, with the columns `iteration`, `trial`, `sample`,
        `timeframe`, `path_id`, `booking_class`, `sold`, `sold_priceable`,
        `forecast_mean`, `forecast_stdev`, and `closed`.
    """
    # The scenario filter and its bound value are added only when requested.
    if scenario is None:
        scenario_filter = ""
        bindings = (burn_samples,)
    else:
        scenario_filter = "AND scenario = ?2"
        bindings = (burn_samples, scenario)

    accuracy_sql = f"""
    SELECT
        iteration, trial, sample, timeframe, path_id, booking_class,
        sold, sold_priceable, forecast_mean, forecast_stdev, closed
    FROM
        edgar
    WHERE
        sample >= ?1
        {scenario_filter}
    """
    return cnx.dataframe(accuracy_sql, bindings)