Source code for trails.temporal_distributions

# temporal_distributions.py

from dataclasses import dataclass
from typing import Optional, Iterable, Tuple, Sequence
import numpy as np

import numpy as np
from math import erf, sqrt, exp

import logging

logger = logging.getLogger(__name__)


[docs] @dataclass class TemporalExchange: """Metadata for a single temporally-distributed exchange. - :param distribution: Integer code for the distribution shape. - 1: discrete (all mass at loc) - 2: lognormal - 3: normal - 4: uniform - 5: triangular - 6: discrete empirical (explicit offsets + weights) :param loc: Location parameter (mean, median, or mode depending on distribution). :param scale: Scale parameter (stddev for normal, sigma for lognormal). :param offset_min: Minimum integer offset (inclusive). :param offset_max: Maximum integer offset (inclusive). :param amount_source: Source of the amount for each pulse year.""" distribution: int loc: Optional[float] scale: Optional[float] offset_min: int offset_max: int # "port" (default): use CSV row value scaled by weights # "matrix": ignore CSV row value; at each pulse year use interpolated A/B[t, i, j] or B[t, i, f] amount_source: str = "port" offsets: Optional[Sequence[int]] = None weights: Optional[Sequence[float]] = None
[docs] class TemporalDistribution: """Turn a TemporalExchange into discrete (offset, weight) pairs.""" def __init__(self, tex: TemporalExchange) -> None: """init . :param tex: Value for `tex`. :type tex: TemporalExchange""" self.tex = tex # ------------------------------------------------------------------ # Individual distribution helpers # ------------------------------------------------------------------ @staticmethod def _default_sigma(offsets: np.ndarray, scale: Optional[float]) -> float: """default sigma. :param offsets: Value for `offsets`. :type offsets: np.ndarray :param scale: Value for `scale`. :type scale: Optional[float] :returns: Return value. :rtype: float""" if scale is not None and scale > 0: return float(scale) # Heuristic: spread over about half the range rng = float(offsets.max() - offsets.min()) if rng <= 0: return 1.0 return max(rng / 4.0, 1e-6) @staticmethod def _triangular_weights(offsets: np.ndarray, loc: Optional[float]) -> np.ndarray: """triangular weights. :param offsets: Value for `offsets`. :type offsets: np.ndarray :param loc: Value for `loc`. :type loc: Optional[float] :returns: Return value. :rtype: np.ndarray""" if loc is None: # If no loc given, fall back to symmetric around the midpoint loc = 0.0 offsets = offsets.astype(float) distances = np.abs(offsets - float(loc)) max_d = distances.max() if max_d == 0.0: # all offsets equal to loc -> put all mass there w = np.zeros_like(offsets, dtype=float) w[:] = 1.0 return w # Linear "tent" function: max_d - distance w = max_d - distances w[w < 0.0] = 0.0 return w @staticmethod def _normal_weights( offsets: np.ndarray, loc: float, scale: float, offset_min: int, offset_max: int, ) -> np.ndarray: """normal weights. :param offsets: Value for `offsets`. :type offsets: np.ndarray :param loc: Value for `loc`. :type loc: float :param scale: Value for `scale`. :type scale: float :param offset_min: Value for `offset_min`. :type offset_min: int :param offset_max: Value for `offset_max`. :type offset_max: int :returns: Return value. :rtype: np.ndarray""" if scale is None or scale <= 0: scale = 1.0 offsets = np.asarray(offsets, dtype=float) # Normal PDF evaluated at integer offsets pdf_vals = np.exp(-0.5 * ((offsets - loc) / scale) ** 2) # --- Normalization for TRUNCATED NORMAL BETWEEN [offset_min, offset_max] --- # CDF helper def normal_cdf(x: float) -> float: """Normal cdf. :param x: Value for `x`. :type x: float :returns: Return value. :rtype: float""" return 0.5 * (1 + erf((x - loc) / (scale * sqrt(2)))) # Continuous probability mass inside the allowed range Z = normal_cdf(offset_max + 0.5) - normal_cdf(offset_min - 0.5) if Z <= 0: # fallback to uniform if something is wrong return np.ones_like(offsets, dtype=float) # Discrete normalization to preserve total mass = 1 pdf_vals /= pdf_vals.sum() # sum to 1 over sampled offsets pdf_vals /= Z # rescale to match truncated continuous total mass return pdf_vals def _lognormal_weights( self, offsets: np.ndarray, loc: Optional[float], scale: Optional[float] ) -> np.ndarray: """lognormal weights. :param offsets: Value for `offsets`. :type offsets: np.ndarray :param loc: Value for `loc`. :type loc: Optional[float] :param scale: Value for `scale`. :type scale: Optional[float] :returns: Return value. :rtype: np.ndarray""" x = offsets.astype(float) mask = x > 0 if not mask.any(): # No positive offsets to assign lognormal mass -> uniform fallback return np.ones_like(offsets, dtype=float) if loc is None or loc <= 0: # Fallback: use median at geometric mid of positive range pos_vals = x[mask] loc = float(np.sqrt(pos_vals.min() * pos_vals.max())) sigma = self._default_sigma(x[mask], scale) mu = float(np.log(loc)) w = np.zeros_like(x, dtype=float) # lognormal pdf, up to a constant factor: z = (np.log(x[mask]) - mu) / sigma # divide by x to get the right qualitative shape; constant factor ignored w[mask] = np.exp(-0.5 * z * z) / x[mask] if not np.isfinite(w).any() or w.sum() <= 0: return np.ones_like(offsets, dtype=float) return w @staticmethod def _discrete_weights(offsets: np.ndarray, loc: Optional[float]) -> np.ndarray: """discrete weights. :param offsets: Value for `offsets`. :type offsets: np.ndarray :param loc: Value for `loc`. :type loc: Optional[float] :returns: Return value. :rtype: np.ndarray""" if loc is None: # try to anchor at 0 if possible if offsets.min() <= 0 <= offsets.max(): target = 0 else: # nearest boundary target = offsets[np.argmin(np.abs(offsets))] else: target = int(round(loc)) # clip to available range target = int(max(offsets.min(), min(offsets.max(), target))) w = np.zeros_like(offsets, dtype=float) idx = int(np.argmin(np.abs(offsets - target))) w[idx] = 1.0 return w @staticmethod def _discrete_empirical_weights( offsets: np.ndarray, pulse_offsets: Optional[Sequence[int]], pulse_weights: Optional[Sequence[float]], *, fallback_loc: Optional[float], ) -> np.ndarray: """Build explicit discrete pulse weights over integer offsets.""" if pulse_offsets is None or pulse_weights is None: return TemporalDistribution._discrete_weights(offsets, fallback_loc) if len(pulse_offsets) == 0 or len(pulse_weights) == 0: return TemporalDistribution._discrete_weights(offsets, fallback_loc) if len(pulse_offsets) != len(pulse_weights): return TemporalDistribution._discrete_weights(offsets, fallback_loc) w = np.zeros_like(offsets, dtype=float) offset_to_idx = {int(off): idx for idx, off in enumerate(offsets.tolist())} for off, wei in zip(pulse_offsets, pulse_weights): try: off_i = int(off) wei_f = float(wei) except Exception: continue if not np.isfinite(wei_f) or wei_f < 0.0: continue idx = offset_to_idx.get(off_i) if idx is None: continue w[idx] += wei_f if not np.isfinite(w).any() or w.sum() <= 0: return TemporalDistribution._discrete_weights(offsets, fallback_loc) return w
[docs] def iter_offsets_and_weights( self, debug: bool = False ) -> Iterable[Tuple[int, float]]: """Iter offsets and weights. :param debug: Value for `debug`. :type debug: bool :yields: Yielded values. :returns: Return value. :rtype: Iterable[Tuple[int, float]]""" t = self.tex dist = int(t.distribution) # For discrete empirical distributions, explicit pulse offsets define support. if dist == 6 and getattr(t, "offsets", None): try: explicit_offsets = [int(round(float(o))) for o in t.offsets] # type: ignore[arg-type] except Exception: explicit_offsets = [] if explicit_offsets: offsets = np.array(sorted(set(explicit_offsets)), dtype=int) else: offsets = np.arange(int(t.offset_min), int(t.offset_max) + 1, dtype=int) else: offsets = np.arange(int(t.offset_min), int(t.offset_max) + 1, dtype=int) if offsets.size == 0: return iter(()) # 1) Base (unnormalized) weights if dist == 5: weights = self._triangular_weights(offsets, t.loc) elif dist == 2: weights = self._lognormal_weights(offsets, t.loc, t.scale) elif dist == 3: weights = self._normal_weights( offsets=offsets, loc=float(t.loc) if t.loc is not None else 0.0, scale=float(t.scale) if (t.scale is not None and t.scale > 0) else 1.0, offset_min=int(t.offset_min), offset_max=int(t.offset_max), ) elif dist == 4: weights = np.ones_like(offsets, dtype=float) elif dist == 1: weights = self._discrete_weights(offsets, t.loc) elif dist == 6: weights = self._discrete_empirical_weights( offsets, getattr(t, "offsets", None), getattr(t, "weights", None), fallback_loc=t.loc, ) else: # Unknown distribution -> uniform fallback weights = np.ones_like(offsets, dtype=float) weights = np.asarray(weights, dtype=float) total = float(weights.sum()) if not np.isfinite(total) or total <= 0.0: if debug: logger.warning( "TemporalDistribution: invalid total weight (dist=%s, offsets=[%d..%d]) -> empty", dist, int(t.offset_min), int(t.offset_max), ) return iter(()) # 2) Normalize weights /= total # 3) Yield for k, w in zip(offsets, weights): w = float(w) if w != 0.0: yield int(k), w
def resolve_temporal_offset_bounds( *, distribution: int, loc: Optional[float], offset_min: Optional[int], offset_max: Optional[int], ) -> tuple[int, int]: """Resolve optional temporal offset bounds to integer support. Discrete pulse distributions use ``loc`` as the pulse offset. If no explicit ``temporal_min``/``temporal_max`` support is provided, the support must therefore be the pulse offset itself, not the generic 0-year fallback. """ if int(distribution) == 1 and offset_min is None and offset_max is None: if loc is not None and np.isfinite(float(loc)): target = int(round(float(loc))) return target, target off_min = 0 if offset_min is None else int(offset_min) off_max = 0 if offset_max is None else int(offset_max) return off_min, off_max