"""Tools for aggregating simulation summary tables (``passengersim.summaries.tools``)."""

from __future__ import annotations

from collections.abc import Callable
from typing import TYPE_CHECKING

import pandas as pd

if TYPE_CHECKING:
    from . import SimulationTables


def aggregate_by_concat_dataframe(
    name: str,
) -> Callable[[list[SimulationTables]], pd.DataFrame | None | Exception]:
    """Create function to aggregate from summaries by simple concatenation.

    Parameters
    ----------
    name : str
        Base name of the table. The raw data for each summary is read from
        its ``_raw_{name}`` attribute.

    Returns
    -------
    Callable
        A function that accepts a list of summaries and returns the
        concatenation of their raw tables, the first `Exception` found in
        place of a raw table, or None if no summary provides a table.
    """

    def concat_dataframe(
        summaries: list[SimulationTables],
    ) -> pd.DataFrame | None | Exception:
        """Concatenate the ``_raw_{name}`` tables of all given summaries."""
        frames = []
        for s in summaries:
            # A missing attribute is treated the same as an explicit None.
            frame = getattr(s, f"_raw_{name}", None)
            if isinstance(frame, Exception):
                # A stored exception is handed back to the caller, not raised.
                return frame
            if frame is not None:
                frames.append(frame)
        if frames:
            return pd.concat(frames)
        return None

    return concat_dataframe
def aggregate_by_summing_dataframe(
    name: str,
    extra_idxs: list[str] | None = None,
) -> Callable[[list[SimulationTables]], pd.DataFrame | None | Exception]:
    """Create function to aggregate from summaries by summing.

    Parameters
    ----------
    name : str
        Base name of the table. The raw data for each summary is read from
        its ``_raw_{name}`` attribute.
    extra_idxs : list of str, optional
        Columns that are appended to the index of every table before the
        tables are added together (so they take part in the alignment and
        are not summed), and are moved back to columns in the result.

    Returns
    -------
    Callable
        A function that accepts a list of summaries and returns the
        element-wise sum of their raw tables, the first `Exception` found in
        place of a raw table, or None if no summary provides a table.
    """

    def sum_dataframe(
        summaries: list[SimulationTables],
    ) -> pd.DataFrame | None | Exception:
        """Add up the ``_raw_{name}`` tables of all given summaries."""
        frames = []
        for s in summaries:
            # A missing attribute is treated the same as an explicit None.
            frame = getattr(s, f"_raw_{name}", None)
            if isinstance(frame, Exception):
                # A stored exception is handed back to the caller, not raised.
                return frame
            if frame is not None:
                if extra_idxs:
                    frame = frame.set_index(extra_idxs, append=True)
                frames.append(frame)
        if not frames:
            return None
        # Accumulate left to right; entries missing from one side count as 0.
        total = frames[0]
        for other in frames[1:]:
            total = total.add(other, fill_value=0)
        if extra_idxs:
            return total.reset_index(extra_idxs)
        return total

    return sum_dataframe
def aggregate_by_averaging_dataframe(
    name: str,
    extra_idxs: list[str] | None = None,
) -> Callable[[list[SimulationTables]], pd.DataFrame | None | Exception]:
    """Create function to aggregate from summaries by averaging.

    Parameters
    ----------
    name : str
        Base name of the table. The raw data for each summary is read from
        its ``_raw_{name}`` attribute.
    extra_idxs : list of str, optional
        Columns that are appended to the index of every table before the
        tables are added together (so they take part in the alignment and
        are not averaged), and are moved back to columns in the result.

    Returns
    -------
    Callable
        A function that accepts a list of summaries and returns the
        element-wise mean of their raw tables, the first `Exception` found
        in place of a raw table, or None if no summary provides a table.

    Notes
    -----
    The sum is divided by the number of tables found, and an entry missing
    from a table contributes zero to the sum, so it pulls the mean toward
    zero instead of being skipped.
    """

    def avg_dataframe(
        summaries: list[SimulationTables],
    ) -> pd.DataFrame | None | Exception:
        """Average the ``_raw_{name}`` tables of all given summaries."""
        frames = []
        for s in summaries:
            # A missing attribute is treated the same as an explicit None.
            frame = getattr(s, f"_raw_{name}", None)
            if isinstance(frame, Exception):
                # A stored exception is handed back to the caller, not raised.
                return frame
            if frame is not None:
                if extra_idxs:
                    frame = frame.set_index(extra_idxs, append=True)
                frames.append(frame)
        if not frames:
            return None
        # Accumulate left to right; entries missing from one side count as 0.
        total = frames[0]
        for other in frames[1:]:
            total = total.add(other, fill_value=0)
        result = total / len(frames)
        if extra_idxs:
            result = result.reset_index(extra_idxs)
        return result

    return avg_dataframe
def aggregate_by_first_dataframe(
    name: str,
) -> Callable[[list[SimulationTables]], pd.DataFrame | None]:
    """Create function to aggregate from summaries by taking the first.

    Parameters
    ----------
    name : str
        Base name of the table. The raw data for each summary is read from
        its ``_raw_{name}`` attribute.

    Returns
    -------
    Callable
        A function that accepts a list of summaries and returns the first
        ``_raw_{name}`` value that is not None, or None if there is none.
        The value is returned as found, so a stored `Exception` is passed
        through unchanged.
    """
    attr_name = f"_raw_{name}"

    def first_dataframe(
        summaries: list[SimulationTables],
    ) -> pd.DataFrame | None | Exception:
        """Return the first available ``_raw_{name}`` value, if any."""
        # Summaries lacking the attribute count as having no table.
        candidates = (getattr(item, attr_name, None) for item in summaries)
        return next((found for found in candidates if found is not None), None)

    return first_dataframe
def total_sum_of_squares(mu, sigma, n):
    """Reconstruct a sum of squared values from a mean, deviation and count.

    Computes ``n * (mu**2 + sigma**2 * (n - 1) / n)``. Algebraically this
    equals the sum of the squared observations when `sigma` is a standard
    deviation computed with ``ddof=1`` over `n` observations with mean `mu`.

    Parameters
    ----------
    mu
        Mean of the observations.
    sigma
        Standard deviation of the observations.
    n
        Number of observations; must be non-zero, as it is used as a divisor.
    """
    shrink = (n - 1) / n
    mean_of_squares = mu**2 + (sigma**2) * shrink
    return mean_of_squares * n
def total_sum(mu, n):
    """Return the total of `n` observations whose mean is `mu`.

    Parameters
    ----------
    mu
        Mean of the observations.
    n
        Number of observations.
    """
    total = mu * n
    return total
def combine_sigmas(sigma, sigma2, mu, mu2, n, n2, ddof=0):
    """Pool the dispersion of two groups from their summary statistics.

    The sums and sums of squares of both groups are rebuilt with
    `total_sum` and `total_sum_of_squares`, pooled, and turned into
    ``mean(x**2) - mean(x)**2`` over the combined ``n + n2`` observations,
    which is then scaled by ``(n + n2) / (n + n2 - ddof)``.

    Parameters
    ----------
    sigma, sigma2
        Standard deviations of the first and second group.
    mu, mu2
        Means of the first and second group.
    n, n2
        Number of observations in the first and second group.
    ddof : int, default 0
        Delta degrees of freedom applied to the combined result.

    Returns
    -------
    The combined second central moment after the `ddof` adjustment.

    Notes
    -----
    NOTE(review): no square root is taken, so despite the function name the
    result is on the variance scale -- confirm that callers expect this.
    """
    count = n + n2
    pooled_squares = total_sum_of_squares(mu, sigma, n) + total_sum_of_squares(mu2, sigma2, n2)
    second_moment = pooled_squares / count
    pooled_total = total_sum(mu, n) + total_sum(mu2, n2)
    squared_mean = (pooled_total / count) ** 2
    correction = count / (count - ddof)
    spread = second_moment - squared_mean
    return spread * correction
def break_on_integer(s: pd.Series, breakpoints: tuple[int, ...], minimum=0, maximum=100, result_name=None):
    """Break a series into categories based on integer breakpoints.

    Parameters
    ----------
    s : pd.Series
        The series to break into categories.
    breakpoints : tuple of int
        The breakpoints for the categories, in increasing order. Each
        breakpoint is the first value of a new category. If the first
        breakpoint is not greater than `minimum` it is replaced by the lower
        bin edge, and if the last breakpoint is greater than `maximum` it is
        replaced by the upper bin edge. An empty sequence gives a single
        category covering `minimum` through `maximum`.
    minimum : int, default 0
        The smallest value named in the labels. The lowest bin edge is
        ``minimum - 1``.
    maximum : int, default 100
        The largest value named in the labels. The highest bin edge is
        ``maximum + 1`` (exclusive).
    result_name : str, default None
        The name to assign to the resulting series. If not given, the
        series keeps the name assigned by `pandas.cut`.

    Returns
    -------
    pd.Series
        A series with categorical values. Categories are labelled
        ``"lo-hi"`` (inclusive integer bounds), or just ``"lo"`` when the
        category holds a single integer. Values below ``minimum - 1`` or at
        or above ``maximum + 1`` fall outside every bin and become NaN.
    """
    if not isinstance(breakpoints, tuple):
        breakpoints = tuple(breakpoints)

    # Build the full tuple of bin edges, from minimum - 1 up to maximum + 1.
    # The emptiness guard lets an empty input produce one all-covering bin.
    if breakpoints and breakpoints[0] <= minimum:
        breakpoints = (minimum - 1,) + breakpoints[1:]
    else:
        breakpoints = (minimum - 1,) + breakpoints
    if breakpoints[-1] >= maximum + 1:
        breakpoints = breakpoints[:-1] + (maximum + 1,)
    else:
        breakpoints = breakpoints + (maximum + 1,)

    def make_label(i, j):
        """Label the half-open integer range [i, j) by its inclusive bounds."""
        if i == j - 1:
            return f"{i}"
        return f"{i}-{j - 1}"

    # One label per bin. The first bin physically starts at minimum - 1, but
    # is labelled as starting at `minimum`.
    lower_bounds = (minimum,) + breakpoints[1:-1]
    upper_bounds = breakpoints[1:]
    labels = [make_label(lo, hi) for lo, hi in zip(lower_bounds, upper_bounds)]

    breaker = pd.cut(
        s,
        bins=breakpoints,
        right=False,  # bins are closed on the left: [edge, next_edge)
        labels=labels,
    )
    if result_name:
        breaker = breaker.rename(result_name)
    return breaker