"""
This module defines the FlankUp class, which is responsible for extracting
neighboring (flanking) observations for specific target rows in a dataset.
It is primarily used for feature engineering with Copernicus CTD data.
"""
from typing import Optional, Dict
import polars as pl
from aiqclib.common.base.feature_base import FeatureBase
from aiqclib.common.utils.normalization import is_scaling_type, scale_flat_columns
class FlankUp(FeatureBase):
    """
    Extract, for each target row, the values of the observations preceding it.

    For every row in ``selected_rows[target_name]`` the class looks up the
    ``feature_info["flank_up"]`` observations that come before it (lower
    ``observation_no``) in :attr:`filtered_input` and exposes each of them as a
    separate column named ``<col>_up_<k>``, where ``k`` is the number of steps
    back. It extends
    :class:`aiqclib.common.base.feature_base.FeatureBase`; the order in which
    the private steps below are invoked is decided by that base class, which is
    not visible in this module.
    """

    def __init__(
        self,
        target_name: Optional[str] = None,
        feature_info: Optional[Dict] = None,
        selected_profiles: Optional[pl.DataFrame] = None,
        filtered_input: Optional[pl.DataFrame] = None,
        selected_rows: Optional[Dict[str, pl.DataFrame]] = None,
        summary_stats: Optional[pl.DataFrame] = None,
    ) -> None:
        """
        Initialize an instance of FlankUp.

        All arguments are forwarded unchanged to the base-class constructor.

        :param target_name: The key identifying which target's rows to extract
                            features for from :attr:`selected_rows`, defaults to None.
        :type target_name: Optional[str]
        :param feature_info: A dictionary of feature-related parameters. This
                             class reads the keys ``"flank_up"`` (number of
                             preceding observations to retrieve), ``"stats_set"``
                             (a dict whose ``"type"`` selects the scaling mode)
                             and ``"stats"``, defaults to None.
        :type feature_info: Optional[Dict]
        :param selected_profiles: A Polars DataFrame with selected profiles
                                  (not referenced directly by this subclass),
                                  defaults to None.
        :type selected_profiles: Optional[pl.DataFrame]
        :param filtered_input: A Polars DataFrame holding the observed
                               variables; it must provide ``platform_code``,
                               ``profile_no`` and ``observation_no`` plus the
                               data columns to be pivoted, defaults to None.
        :type filtered_input: Optional[pl.DataFrame]
        :param selected_rows: A dictionary mapping target names to their
                              respective DataFrames of relevant rows,
                              defaults to None.
        :type selected_rows: Optional[Dict[str, pl.DataFrame]]
        :param summary_stats: A Polars DataFrame of summary statistics
                              (not referenced directly by this subclass),
                              defaults to None.
        :type summary_stats: Optional[pl.DataFrame]
        """
        super().__init__(
            target_name=target_name,
            feature_info=feature_info,
            selected_profiles=selected_profiles,
            filtered_input=filtered_input,
            selected_rows=selected_rows,
            summary_stats=summary_stats,
        )
        # One row per (target row, flank step); filled by _expand_observations().
        self._expanded_observations: Optional[pl.DataFrame] = None
        # Wide table with one column per flank step; filled by _pivot_features().
        self._feature_wide: Optional[pl.DataFrame] = None

    def _init_features(self) -> None:
        """
        Initialize :attr:`features` with the identifying columns
        (``row_id``, ``platform_code``, ``profile_no``) of
        ``selected_rows[target_name]``.
        """
        self.features = self.selected_rows[self.target_name].select(
            ["row_id", "platform_code", "profile_no"]
        )

    def _expand_observations(self) -> None:
        """
        Build one row per target row and flank step.

        Each row of ``selected_rows[target_name]`` is cross joined with the
        sequence ``1 .. feature_info["flank_up"]`` (column ``flank_seq``).
        ``observation_no`` is then replaced by ``observation_no - flank_seq``
        and any result below 1 is raised to 1, so steps that would reach
        before observation 1 all point at observation 1 instead.

        The result is stored in :attr:`_expanded_observations`.

        NOTE(review): ``feature_info.get("flank_up")`` yields ``None`` when the
        key is absent, which makes the ``+ 1`` below raise ``TypeError``;
        callers are expected to supply the key.
        """
        self._expanded_observations = (
            self.selected_rows[self.target_name]
            .select(["row_id", "platform_code", "profile_no", "observation_no"])
            .join(
                pl.DataFrame(
                    {"flank_seq": list(range(1, self.feature_info.get("flank_up") + 1))}
                ),
                how="cross",
            )
            # Shift backwards by the flank step.
            .with_columns(
                (pl.col("observation_no") - pl.col("flank_seq")).alias("observation_no")
            )
            # Clamp to 1 so the shifted index never drops below the first
            # observation number.
            .with_columns(
                pl.when(pl.col("observation_no") < 1)
                .then(1)
                .otherwise(pl.col("observation_no"))
                .alias("observation_no")
            )
        )

    def _pivot_features(self, col_name: str) -> None:
        """
        Look up ``col_name`` for every expanded observation and pivot the
        result into one column per flank step.

        The expanded observations are joined to :attr:`filtered_input` on
        ``platform_code``, ``profile_no`` and ``observation_no`` (no ``how`` is
        given, so the library's default join type applies). Each joined value
        is labelled ``f"{col_name}_up_{flank_seq}"`` and the table is pivoted
        on that label with ``row_id``, ``platform_code`` and ``profile_no`` as
        the index. The result is stored in :attr:`_feature_wide`.

        :param col_name: The original data column to be pivoted (e.g., "temp").
        :type col_name: str
        """
        self._feature_wide = (
            self._expanded_observations.join(
                self.filtered_input.select(
                    pl.col("platform_code"),
                    pl.col("profile_no"),
                    pl.col("observation_no"),
                    pl.col(col_name).alias("value"),
                ),
                on=["platform_code", "profile_no", "observation_no"],
                maintain_order="left",
            )
            # Build the output column name for this flank step.
            .with_columns(
                pl.concat_str(
                    [
                        pl.lit(f"{col_name}_up"),
                        pl.col("flank_seq").cast(pl.Utf8),
                    ],
                    separator="_",
                ).alias("col_name")
            )
            # These are encoded in "col_name" now and must not take part
            # in the pivot.
            .drop(["observation_no", "flank_seq"])
            .pivot(
                "col_name",
                index=["row_id", "platform_code", "profile_no"],
                values="value",
            )
        )

    def _add_features(self) -> None:
        """
        Join the pivoted columns from :attr:`_feature_wide` onto
        :attr:`features` using ``row_id``, ``platform_code`` and
        ``profile_no`` as keys, keeping the row order of :attr:`features`.
        """
        self.features = self.features.join(
            self._feature_wide,
            on=["row_id", "platform_code", "profile_no"],
            maintain_order="left",
        )

    def _clean_features(self) -> None:
        """
        Drop the ``platform_code`` and ``profile_no`` key columns, which are
        only needed for joining, from :attr:`features`.
        """
        self.features = self.features.drop(["platform_code", "profile_no"])

    def scale_first(self) -> None:
        """
        Apply a pre-feature-extraction scaling step to :attr:`filtered_input`.

        The scaling mode is read from ``feature_info["stats_set"]["type"]`` and
        defaults to ``"raw"`` when either key is missing. Scaling is performed
        only when :func:`is_scaling_type` accepts that mode *and*
        ``feature_info["stats"]`` is present and non-empty; otherwise the
        method does nothing. When scaling runs, :attr:`filtered_input` is
        rebound to the DataFrame returned by :func:`scale_flat_columns`.
        """
        stats_type = self.feature_info.get("stats_set", {}).get("type", "raw")
        if is_scaling_type(stats_type) and self.feature_info.get("stats"):
            self.filtered_input = scale_flat_columns(
                self.filtered_input, self.feature_info["stats"], stats_type
            )

    def scale_second(self) -> None:
        """
        Apply a post-feature-extraction scaling step if needed.

        Currently a no-op placeholder for scaling/normalization after the
        features have been expanded and pivoted.
        """
        pass  # pragma: no cover