Source code for passengersim.tracers.welford

from __future__ import annotations

from typing import Any

import numpy as np


[docs] class Welford: def __init__(self): self._n = 0 self._mean = 0 self._mean2 = 0
[docs] def update(self, x: np.typing.ArrayLike) -> None: self._n += 1 if isinstance(x, tuple | list): x = np.array(x) delta = x - self._mean self._mean += delta / self._n self._mean2 += delta * (x - self._mean)
@property def mean(self) -> np.typing.ArrayLike: return self._mean @property def variance(self) -> np.typing.ArrayLike: if self._n < 2: return np.nan return self._mean2 / self._n @property def std_dev(self) -> np.typing.ArrayLike: return np.sqrt(self.variance) @property def sample_variance(self) -> np.typing.ArrayLike: if self._n < 2: return np.nan return self._mean2 / (self._n - 1) @property def sample_std_dev(self) -> np.typing.ArrayLike: return np.sqrt(self.sample_variance) @property def n(self) -> int: return self._n
[docs] class MultiWelford:
[docs] def __init__(self, keys: list[str], aux: dict[str, Any] | set[str] = None): """ Initialize a MultiWelford object. Parameters ---------- keys : list[str] The keys of the input dict. aux : dict[str, Any], optional Auxiliary data to store with the statistics. This can be labels or other constant data, which is not tracked by the online statistics algorithm, but is useful for plotting or other analysis and is stored and returned with the statistics. """ self._n = 0 self._mean = {k: 0 for k in keys} self._mean2 = {k: 0 for k in keys} self._aux = aux or {}
[docs] def update(self, x: dict[str, np.typing.ArrayLike]) -> None: self._n += 1 if not isinstance(x, dict) and hasattr(x, "__dict__"): x = x.__dict__ if isinstance(self._aux, set): # convert set of key to dict on first update self._aux = {k: x[k] for k in self._aux} for k in self._mean.keys(): # missing keys are ignored, equivalent to treating them as zero if k in x: delta = x[k] - self._mean[k] self._mean[k] += delta / self._n self._mean2[k] += delta * (x[k] - self._mean[k])
@property def mean(self) -> dict[str, np.typing.ArrayLike]: return self._aux | {k: v for (k, v) in self._mean.items()} @property def variance(self) -> dict[str, np.typing.ArrayLike]: if self._n < 2: return {k: np.nan for k in self._mean.keys()} return self._aux | {k: (self._mean2[k] / self._n) for k in self._mean.keys()} @property def std_dev(self) -> dict[str, np.typing.ArrayLike]: return self._aux | {k: np.sqrt(v) for k, v in self.variance.items()} @property def sample_variance(self) -> dict[str, np.typing.ArrayLike]: if self._n < 2: return {k: np.nan for k in self._mean.keys()} return self._aux | {k: (self._mean2[k] / (self._n - 1)) for k in self._mean.keys()} @property def sample_std_dev(self) -> dict[str, np.typing.ArrayLike]: return self._aux | {k: np.sqrt(v) for k, v in self.sample_variance.items()} @property def n(self) -> int: return self._n
class WeightedWelford:
    """
    Online weighted mean and variance (weighted incremental algorithm).

    Each call to `update` supplies one value together with its weight.
    """

    def __init__(self):
        self._w_sum = 0  # running sum of weights
        self._w_sum2 = 0  # running sum of squared weights
        self._mean = 0  # running weighted mean
        self._S = 0  # running weighted sum of squared deviations

    def update(self, x: np.typing.ArrayLike, w: np.typing.ArrayLike) -> None:
        """
        Fold one weighted sample into the running statistics.

        Parameters
        ----------
        x : array-like
            The new value.
        w : array-like
            The weight attached to `x`.
        """
        self._w_sum += w
        self._w_sum2 += w**2
        mean_old = self._mean
        self._mean = mean_old + (w / self._w_sum) * (x - mean_old)
        # Accumulate into `_S`, the attribute the variance properties read.
        # Writing to any other name would leave every variance stuck at zero.
        self._S = self._S + w * (x - mean_old) * (x - self._mean)

    @property
    def mean(self) -> np.typing.ArrayLike:
        """Running weighted mean (0 before any update)."""
        return self._mean

    @property
    def variance(self) -> np.typing.ArrayLike:
        """Weighted population variance: S divided by the total weight.

        Returns nan when that division is by a Python-scalar zero, e.g. before
        any update.
        """
        try:
            return self._S / self._w_sum
        except ZeroDivisionError:
            return np.nan

    @property
    def std_dev(self) -> np.typing.ArrayLike:
        """Square root of `variance`."""
        return np.sqrt(self.variance)

    @property
    def sample_variance(self) -> np.typing.ArrayLike:
        """Weighted sample variance: S divided by (total weight - 1).

        Returns nan when that division is by a Python-scalar zero, i.e. when
        the total weight is exactly 1.
        """
        try:
            return self._S / (self._w_sum - 1)
        except ZeroDivisionError:
            return np.nan

    @property
    def sample_std_dev(self) -> np.typing.ArrayLike:
        """Square root of `sample_variance`."""
        return np.sqrt(self.sample_variance)

    @property
    def n(self) -> int:
        """Total weight accumulated so far (the weighted analogue of a count)."""
        return self._w_sum
class SingleWelford:
    """
    Online mean and variance over a single pooled stream of values.

    In this tool, any array passed in is flattened, and each element is
    treated as another sample.
    """

    def __init__(self):
        self._n = 0  # total number of elements seen
        self._mean = 0  # running mean over all elements
        self._mean2 = 0  # running sum of squared deviations from the mean

    def update(self, x: np.typing.ArrayLike) -> None:
        """
        Merge a batch of values into the running statistics.

        Parameters
        ----------
        x : array-like
            Values to add.  The input is flattened, so every element counts
            as one sample; an empty input leaves the state unchanged.
        """
        batch = np.asarray(x).ravel()
        batch_n = batch.size
        if not batch_n:
            return

        # Summary statistics of the incoming batch alone.
        batch_mean = np.mean(batch)
        batch_m2 = np.sum((batch - batch_mean) ** 2)

        # Pool the batch with the accumulated state using the parallel
        # Welford / Chan et al. combination formula:
        #   M2 = M2_a + M2_b + delta^2 * n_a * n_b / (n_a + n_b)
        prior_n = self._n
        total_n = prior_n + batch_n
        shift = batch_mean - self._mean
        self._mean = self._mean + shift * batch_n / total_n
        self._mean2 = self._mean2 + batch_m2 + shift**2 * prior_n * batch_n / total_n
        self._n = total_n

    @property
    def mean(self) -> np.typing.ArrayLike:
        """Running mean of every element seen so far (0 before any data)."""
        return self._mean

    @property
    def variance(self) -> np.typing.ArrayLike:
        """Population variance (divisor n); nan with fewer than two elements."""
        return self._mean2 / self._n if self._n >= 2 else np.nan

    @property
    def std_dev(self) -> np.typing.ArrayLike:
        """Square root of `variance`."""
        return np.sqrt(self.variance)

    @property
    def sample_variance(self) -> np.typing.ArrayLike:
        """Sample variance (divisor n - 1); nan with fewer than two elements."""
        return self._mean2 / (self._n - 1) if self._n >= 2 else np.nan

    @property
    def sample_std_dev(self) -> np.typing.ArrayLike:
        """Square root of `sample_variance`."""
        return np.sqrt(self.sample_variance)

    @property
    def n(self) -> int:
        """Total number of elements folded in so far."""
        return self._n