"""
Normalization utilities.
This module centralises the logic shared by every feature class and the feature
extraction step when applying normalization. It supports four normalization
"types", each selected per-feature via ``stats_set.type`` in the
``feature_param_sets`` section of a configuration file:
- ``raw``: no normalization (the default).
- ``min_max``: min-max scaling using values supplied **by hand** in the
``feature_stats_sets`` section of the config. This is the historical
behaviour and is kept unchanged.
- ``auto_min_max``: min-max scaling using min/max values **derived
automatically** from the dataset's summary statistics.
- ``standard``: standard scaling ``(x - mean) / sd`` using mean/sd values
derived automatically from the dataset's summary statistics.
For ``auto_min_max`` and ``standard`` the derived values are written to a YAML
normalization file during dataset preparation and re-loaded during
classification, so the same fitted normalization is applied at classification
time without re-entering any values (and without access to the original
training data).
The helpers here are deliberately small and pure so they can be unit-tested
with synthetic Polars frames, independently of the wider pipeline.
"""
import os
from typing import Dict, List, Optional, Sequence

import polars as pl
import yaml
#: Normalization types that actually transform feature values. ``raw`` is
#: intentionally excluded because it is a no-op.
SCALING_TYPES = ("min_max", "auto_min_max", "standard")
#: Normalization types whose values are derived from data (and therefore must
#: be persisted to a normalization file for reuse at classification time),
#: as opposed to ``min_max`` whose values are supplied directly in the config.
AUTO_SCALING_TYPES = ("auto_min_max", "standard")
[docs]
def is_scaling_type(stats_type: Optional[str]) -> bool:
    """
    Tell whether a ``stats_set`` type actually rescales feature values.

    :param stats_type: The normalization type to test (e.g. ``"min_max"``
        or ``"raw"``).
    :type stats_type: Optional[str]
    :returns: ``True`` when ``stats_type`` is one of :data:`SCALING_TYPES`
        (``min_max``, ``auto_min_max``, ``standard``); ``False`` for ``raw``,
        ``None`` and anything else.
    :rtype: bool
    """
    transforms_values = stats_type in SCALING_TYPES
    return transforms_values
[docs]
def build_scaling_expr(col_name: str, params: Dict, stats_type: str) -> pl.Expr:
    """
    Create the Polars expression that normalizes one column.

    Both supported formulas have the shape ``(x - centre) / spread``:

    - ``min_max`` / ``auto_min_max``: centre is ``min``, spread is
      ``max - min``.
    - ``standard``: centre is ``mean``, spread is ``sd``.

    When the spread equals zero (a constant column) the division is skipped
    and only the centre is subtracted, so the constant value maps to ``0``
    instead of ``inf``/``nan``.

    :param col_name: Name of the column to scale; the result keeps this name.
    :type col_name: str
    :param params: Statistics for the column: ``{"min": ..., "max": ...}``
        for the min-max types, ``{"mean": ..., "sd": ...}`` for ``standard``.
    :type params: Dict
    :param stats_type: One of :data:`SCALING_TYPES`. Any other value (for
        example ``"raw"``) yields the untouched column.
    :type stats_type: str
    :returns: A Polars expression aliased back to ``col_name``, or the plain
        column reference for a non-scaling type.
    :rtype: pl.Expr
    :raises KeyError: If ``params`` lacks a key required by ``stats_type``.
    """
    if stats_type == "standard":
        centre = params["mean"]
        spread = params["sd"]
    elif stats_type in ("min_max", "auto_min_max"):
        centre = params["min"]
        spread = params["max"] - centre
    else:
        # ``raw`` or an unrecognised type: pass the column through unchanged.
        return pl.col(col_name)

    column = pl.col(col_name)
    if spread == 0:
        # Constant column: dividing would produce inf/nan, so only re-centre.
        return (column - centre).alias(col_name)
    return ((column - centre) / spread).alias(col_name)
[docs]
def scale_flat_columns(
    df: pl.DataFrame, stats: Dict[str, Dict], stats_type: str
) -> pl.DataFrame:
    """
    Apply scaling to a frame whose stats are keyed directly by column name.

    Used by features that operate on raw observed variables (e.g.
    ``basic_values``, ``flank_up``, ``flank_down``, ``location``), where
    ``stats`` looks like ``{"temp": {"min": ..., "max": ...}, ...}``.

    Columns present in ``stats`` but absent from ``df`` are skipped, so a
    single shared stats set can be reused across features that expose
    different subsets of columns.

    :param df: The frame to transform.
    :type df: pl.DataFrame
    :param stats: Mapping of column name to its statistics.
    :type stats: Dict[str, Dict]
    :param stats_type: One of :data:`SCALING_TYPES`.
    :type stats_type: str
    :returns: A new frame with the relevant columns scaled, or ``df`` itself
        when none of the columns named in ``stats`` exist in it.
    :rtype: pl.DataFrame
    """
    # Snapshot the column names once: evaluating ``df.columns`` inside the
    # loop would re-fetch and linearly scan the name list for every entry.
    available = set(df.columns)
    exprs = [
        build_scaling_expr(col_name, params, stats_type)
        for col_name, params in stats.items()
        if col_name in available
    ]
    if not exprs:
        return df
    return df.with_columns(exprs)
[docs]
def scale_nested_columns(
    df: pl.DataFrame, stats: Dict[str, Dict], stats_type: str
) -> pl.DataFrame:
    """
    Apply scaling to a frame whose stats are keyed by ``variable`` then ``stat``.

    Used by ``profile_summary_stats``, whose feature columns are named
    ``{variable}_{stat}`` (e.g. ``temp_mean``) and whose ``stats`` looks like
    ``{"temp": {"mean": {"min": ..., "max": ...}, ...}, ...}``.

    Columns derived from the nested keys but absent from ``df`` are skipped.

    :param df: The frame to transform.
    :type df: pl.DataFrame
    :param stats: Nested mapping ``{variable: {stat: stats_dict}}``.
    :type stats: Dict[str, Dict]
    :param stats_type: One of :data:`SCALING_TYPES`.
    :type stats_type: str
    :returns: A new frame with the relevant columns scaled, or ``df`` itself
        when none of the derived column names exist in it.
    :rtype: pl.DataFrame
    """
    # Snapshot the column names once: evaluating ``df.columns`` inside the
    # nested loop would re-fetch and linearly scan the name list for every
    # (variable, stat) pair.
    available = set(df.columns)
    exprs = []
    for variable, per_stat in stats.items():
        for stat_name, params in per_stat.items():
            col_name = f"{variable}_{stat_name}"
            if col_name in available:
                exprs.append(build_scaling_expr(col_name, params, stats_type))
    if not exprs:
        return df
    return df.with_columns(exprs)
[docs]
def aggregate_profile_stats(
    summary_stats: pl.DataFrame,
    variables: Optional[List[str]] = None,
    exclude: Sequence[str] = ("longitude", "latitude"),
) -> pl.DataFrame:
    """
    Aggregate per-profile summary statistics across profiles.

    This reshapes the long per-profile rows of a ``summary_stats`` table (i.e.
    the rows whose ``platform_code`` is not ``"all"``) into one row per
    ``(variable, stats)`` pair, computing the distribution of each per-profile
    statistic **across profiles**: its ``min``, ``mean``, ``pct97.5``, ``max``
    and ``sd``.

    The across-profile ``sd`` is the only addition relative to the historical
    ``SummaryStatsBase.create_summary_stats_profile`` output; it is required to
    standard-scale ``profile_summary_stats`` features (whose columns are
    themselves per-profile statistics).

    :param summary_stats: The combined summary statistics table produced by
        :meth:`SummaryStatsBase.calculate_stats`.
    :type summary_stats: pl.DataFrame
    :param variables: Optional list of variables to keep. ``None`` keeps all.
    :type variables: Optional[List[str]]
    :param exclude: Variables to drop before aggregating (location variables
        have no meaningful per-profile spread). An empty value or ``None``
        disables the exclusion. The default is an immutable tuple so the
        shared default object can never be modified between calls.
    :type exclude: Sequence[str]
    :returns: A long-form frame with columns ``variable``, ``stats``, ``min``,
        ``mean``, ``pct97.5``, ``max`` and ``sd``, sorted by ``variable`` then
        ``stats``.
    :rtype: pl.DataFrame
    """
    df = summary_stats.filter(pl.col("platform_code") != "all")
    if exclude:
        # Hand Polars a list when the (tuple) default is in effect; values
        # supplied by callers are forwarded exactly as they were before.
        excluded = list(exclude) if isinstance(exclude, tuple) else exclude
        df = df.filter(~pl.col("variable").is_in(excluded))
    if variables is not None:
        df = df.filter(pl.col("variable").is_in(variables))
    return (
        # Every column other than the three index columns is treated as a
        # per-profile statistic and becomes a ("stats", "value") pair.
        df.unpivot(
            index=["platform_code", "profile_no", "variable"], variable_name="stats"
        )
        .group_by(["variable", "stats"])
        .agg(
            min=pl.col("value").min(),
            mean=pl.col("value").mean(),
            # A keyword argument cannot contain a dot, so the column is
            # created as ``pct97_5`` and renamed right after.
            pct97_5=pl.col("value").quantile(0.975),
            max=pl.col("value").max(),
            # NOTE(review): presumably the Polars default sample standard
            # deviation, which would be null for a group with a single
            # profile - confirm how downstream scaling handles a null sd.
            sd=pl.col("value").std(),
        )
        .rename({"pct97_5": "pct97.5"})
        .sort(["variable", "stats"])
    )
[docs]
def derive_observation_stats(
    summary_stats: pl.DataFrame, variables: List[str], stats_type: str
) -> Dict[str, Dict]:
    """
    Build flat per-variable normalization stats from the global summary rows.

    Only rows with ``platform_code == "all"`` are considered. For every
    requested variable the first such row is turned into ``{min, max}``
    (``auto_min_max``) or ``{mean, sd}`` (``standard``). Variables without a
    global row are silently left out of the result.

    :param summary_stats: The combined summary statistics table.
    :type summary_stats: pl.DataFrame
    :param variables: The variables (column names) to derive stats for.
    :type variables: List[str]
    :param stats_type: ``"auto_min_max"`` or ``"standard"``.
    :type stats_type: str
    :returns: ``{variable: {"min"/"max"} or {"mean"/"sd"}}``.
    :rtype: Dict[str, Dict]
    :raises ValueError: If a variable has a global row and ``stats_type`` is
        not a derivable (auto) type.
    """
    global_rows = summary_stats.filter(pl.col("platform_code") == "all")
    derived: Dict[str, Dict] = {}
    for variable in variables:
        candidates = global_rows.filter(pl.col("variable") == variable)
        if candidates.height > 0:
            # Only the first matching global row is used.
            first_row = candidates.row(0, named=True)
            derived[variable] = _stats_entry(first_row, stats_type)
    return derived
[docs]
def derive_profile_stats(
    profile_stats_long: pl.DataFrame,
    variables: List[str],
    summary_stats_names: List[str],
    stats_type: str,
) -> Dict[str, Dict]:
    """
    Build nested per-(variable, stat) normalization stats across profiles.

    Intended for ``profile_summary_stats`` features. ``profile_stats_long`` is
    the output of :func:`aggregate_profile_stats`. Each of its rows whose
    ``variable`` is requested and whose ``stats`` is one of the requested
    per-profile statistics contributes ``{min, max}`` (``auto_min_max``) or
    ``{mean, sd}`` (``standard``); all other rows are ignored.

    :param profile_stats_long: The across-profile aggregation.
    :type profile_stats_long: pl.DataFrame
    :param variables: The variables (e.g. ``["temp", "psal", "pres"]``).
    :type variables: List[str]
    :param summary_stats_names: The per-profile statistics that become feature
        columns (e.g. ``["mean", "median", "sd"]``).
    :type summary_stats_names: List[str]
    :param stats_type: ``"auto_min_max"`` or ``"standard"``.
    :type stats_type: str
    :returns: ``{variable: {stat: {"min"/"max"} or {"mean"/"sd"}}}``.
    :rtype: Dict[str, Dict]
    :raises ValueError: If a row is selected and ``stats_type`` is not a
        derivable (auto) type.
    """
    nested: Dict[str, Dict] = {}
    for record in profile_stats_long.iter_rows(named=True):
        if (
            record["variable"] in variables
            and record["stats"] in summary_stats_names
        ):
            entry = _stats_entry(record, stats_type)
            variable = record["variable"]
            if variable not in nested:
                nested[variable] = {}
            nested[variable][record["stats"]] = entry
    return nested
def _stats_entry(row: Dict, stats_type: str) -> Dict:
"""
Build a single stats entry from a summary row for the given type.
:param row: A mapping that contains ``min``/``max`` and ``mean``/``sd`` keys.
:type row: Dict
:param stats_type: ``"auto_min_max"`` or ``"standard"``.
:type stats_type: str
:returns: ``{"min": ..., "max": ...}`` or ``{"mean": ..., "sd": ...}``.
:rtype: Dict
:raises ValueError: If ``stats_type`` is not a derivable (auto) type.
"""
if stats_type == "standard":
return {"mean": row["mean"], "sd": row["sd"]}
if stats_type == "auto_min_max":
return {"min": row["min"], "max": row["max"]}
raise ValueError(
f"Cannot derive statistics for non-auto normalization type '{stats_type}'."
)
[docs]
def write_normalization_file(
    output_file: str, stats_set_name: str, resolved: Dict[str, Dict[str, Dict]]
) -> None:
    """
    Write derived normalization values to a YAML file.

    The file mirrors the structure of a single ``feature_stats_sets`` entry so
    it can be loaded straight back into a configuration's ``feature_stats_set``
    and consumed by the existing stats-injection machinery. For example::

        name: feature_set_1_stats_set_1
        auto_min_max:
          - name: basic_values3
            stats: {temp: {min: 0.0, max: 20.0}, ...}
        standard:
          - name: location
            stats: {longitude: {mean: 18.8, sd: 2.0}, ...}

    :param output_file: Destination path. Missing parent directories are
        created; a bare file name (no directory part) is written to the
        current working directory.
    :type output_file: str
    :param stats_set_name: The ``name`` recorded at the top of the file.
    :type stats_set_name: str
    :param resolved: ``{stats_type: {entry_name: stats_dict}}`` to serialise.
    :type resolved: Dict[str, Dict[str, Dict]]
    :returns: None
    :rtype: None
    """
    document: Dict = {"name": stats_set_name}
    for stats_type, entries in resolved.items():
        document[stats_type] = [
            {"name": name, "stats": stats} for name, stats in entries.items()
        ]
    # ``os.path.dirname`` returns "" for a bare file name and
    # ``os.makedirs("")`` raises FileNotFoundError, so only create the parent
    # directory when the path actually has one.
    parent_dir = os.path.dirname(output_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as handle:
        # ``sort_keys=False`` keeps ``name`` first and preserves entry order.
        yaml.safe_dump(document, handle, sort_keys=False, default_flow_style=None)
[docs]
def read_normalization_file(input_file: str) -> Dict:
    """
    Load a normalization YAML file produced by :func:`write_normalization_file`.

    :param input_file: Path to the YAML normalization file.
    :type input_file: str
    :raises FileNotFoundError: If ``input_file`` does not exist.
    :returns: The parsed YAML content, shaped like a ``feature_stats_set``
        entry (i.e. ``name`` plus ``auto_min_max`` / ``standard`` lists) when
        the file was written by :func:`write_normalization_file`.
    :rtype: Dict
    """
    if os.path.exists(input_file):
        with open(input_file, "r", encoding="utf-8") as handle:
            return yaml.safe_load(handle)
    # Explain where the file normally comes from instead of surfacing a bare
    # "no such file" error.
    raise FileNotFoundError(
        f"Normalization file '{input_file}' does not exist. It is produced "
        f"during dataset preparation when 'auto_min_max' or 'standard' "
        f"normalization is used."
    )