
transforms

Cast(target_type, *, features=None)

Bases: Transform

Cast features to a different datatype.

This transform changes the datatype of features in a DataFrame. To cast all features to the same datatype, provide a single type as the target_type argument, e.g.

transform = Cast(pl.Float64)

By specifying the features keyword argument, only the selected features will be cast e.g.

transform = Cast(pl.Float64, features=["feature_a"])

Lastly, to cast features to different datatypes, provide a dictionary with feature names as keys and target types as values e.g.

transform = Cast(
    {
        "feature_a": pl.Boolean,
        "feature_b": pl.Float64,
    },
)

Initializes the Cast transform.

Parameters:

Name Type Description Default
target_type PolarsDataType | dict[str, PolarsDataType]

Type to which the features will be cast. If a single type is provided, all features or those provided in the features keyword argument will be cast to that specific type. To cast features to different types, provide a dictionary with feature names as keys and target types as values.

required
features Iterable[str] | None

The features to cast. If None all features will be cast. This is the default behaviour.

None
Source code in src/flowcean/polars/transforms/cast.py
def __init__(
    self,
    target_type: PolarsDataType | dict[str, PolarsDataType],
    *,
    features: Iterable[str] | None = None,
) -> None:
    """Initializes the Cast transform.

    Args:
        target_type: Type to which the features will be cast.
            If a single type is provided, all features or those provided in
            the `features` keyword argument will be cast to that specific
            type. To cast features to different types, provide a dictionary
            with feature names as keys and target types as values.
        features: The features to cast. If `None` all
            features will be cast. This is the default behaviour.
    """
    self.target_type = target_type
    self.features = features

Drop(features)

Bases: Transform

Drop features from the data.

Initializes the Drop transform.

Source code in src/flowcean/polars/transforms/drop.py
def __init__(self, features: str | Iterable[str]) -> None:
    """Initializes the Drop transform."""
    super().__init__()
    self.features = features

Explode(features)

Bases: Transform

Explodes a DataFrame to long format by exploding the given features.

Parameters:

Name Type Description Default
features list[str]

List of features to explode.

required

The example below shows the usage of an Explode transform. Assuming the loaded data is represented by the table:

time feature_a feature_b constant
[0, 1] [2, 1] [9, 3] 1
[0, 2] [3, 4] [8, 4] 2

This transform can be used to explode the columns time, feature_a, and feature_b.

The resulting DataFrame after the transform is:

time feature_a feature_b constant
0 2 9 1
1 1 3 1
0 3 8 2
2 4 4 2
Source code in src/flowcean/polars/transforms/explode.py
def __init__(self, features: list[str]) -> None:
    self.features = features

Filter(expression)

Bases: Transform

Filter an environment based on one or multiple expressions.

This transform filters an environment based on one or multiple boolean expressions. Assuming the input environment is given by

t N x
1 10 0
2 12 1
3 5 2
4 15 1
5 17 0

The following transformation can be used to filter the environment so that the result contains only records where x=1:

    Filter("x == 1")

The result dataset after applying the transform will be

t N x
2 12 1
4 15 1

To get only records where x=1 and t > 3, the filter expression

Filter(And(["x == 1", "t > 3"]))

can be used.

To get all records where x=1 and either t > 3 or N < 15, use

Filter(And(["x == 1", Or(["t > 3", "N < 15"])]))

Initializes the Filter transform.

Parameters:

Name Type Description Default
expression str | FilterExpr

String or filter expression used to filter the environment. Records that do not match the expression are discarded. Standard comparison and mathematical operations are supported within the expressions. Features can be accessed by their name.

required
Source code in src/flowcean/polars/transforms/filter.py
def __init__(
    self,
    expression: str | FilterExpr,
) -> None:
    """Initializes the Filter transform.

    Args:
        expression: String or filter expression used to filter the
            environment. Records that do not match the expression are
            discarded. Standard comparison and mathematical operations are
            supported within the expressions. Features can be accessed by
            their name.
    """
    if isinstance(expression, str):
        self.predicate = _str_to_pl(expression)
    else:
        self.predicate = expression()

First(features, *, replace=False)

Bases: Transform

Selects the first time value of a time-series feature.

Initializes the First transform.

Parameters:

Name Type Description Default
features str | Iterable[str]

The features to apply this transform to.

required
replace bool

Whether to replace the original features with the transformed ones. If set to False, the default, the value will be added as a new feature named {feature}_first.

False
Source code in src/flowcean/polars/transforms/first.py
def __init__(
    self,
    features: str | Iterable[str],
    *,
    replace: bool = False,
) -> None:
    """Initializes the First transform.

    Args:
        features: The features to apply this transform to.
        replace: Whether to replace the original features with the
            transformed ones. If set to False, the default, the value will
            be added as a new feature named `{feature}_first`.
    """
    self.features = [features] if isinstance(features, str) else features
    self.replace = replace

FeatureLengthVaryError

Bases: Exception

Length of a feature varies over different rows.

Flatten(features=None)

Bases: Transform

Flatten all time series in a DataFrame to individual features.

The given DataFrame's time series are converted into individual features, with each time step creating a new feature. This transform will change the order of the columns in the resulting dataset.

For example the dataset

series_data A B
{[0, 0], [1, 1], [2, 2]} 42 43
{[0, 3], [1, 4], [2, 5]} 44 45

gets flattened into the dataset

series_data_0 series_data_1 series_data_2 A B
0 1 2 42 43
3 4 5 44 45

Initialize the flatten transform.

Parameters:

Name Type Description Default
features Iterable[str] | None

The features to flatten. If not provided or set to None, all possible features from the given dataframe will be flattened.

None
Source code in src/flowcean/polars/transforms/flatten.py
def __init__(self, features: Iterable[str] | None = None) -> None:
    """Initialize the flatten transform.

    Args:
        features: The features to flatten. If not provided or set to None,
            all possible features from the given dataframe will be
            flattened.
    """
    self.features = features

NoTimeSeriesFeatureError

Bases: Exception

Feature is no time series.

Lambda(fn)

Bases: Transform

Apply a custom function to the data of an environment.

Initializes the Lambda transform.

Parameters:

Name Type Description Default
fn Callable[[LazyFrame], LazyFrame]

Function handle to be applied to the data.

required
Source code in src/flowcean/polars/transforms/function.py
def __init__(self, fn: Callable[[pl.LazyFrame], pl.LazyFrame]) -> None:
    """Initializes the Lambda transform.

    Args:
        fn: Function handle to be applied to the data.
    """
    self.fn = fn

Last(features, *, replace=False)

Bases: Transform

Selects the last time value of a time-series feature.

Initializes the Last transform.

Parameters:

Name Type Description Default
features str | Iterable[str]

The features to apply this transform to.

required
replace bool

Whether to replace the original features with the transformed ones. If set to False, the default, the value will be added as a new feature named {feature}_last.

False
Source code in src/flowcean/polars/transforms/last.py
def __init__(
    self,
    features: str | Iterable[str],
    *,
    replace: bool = False,
) -> None:
    """Initializes the Last transform.

    Args:
        features: The features to apply this transform to.
        replace: Whether to replace the original features with the
            transformed ones. If set to False, the default, the value will
            be added as a new feature named `{feature}_last`.
    """
    self.features = [features] if isinstance(features, str) else features
    self.replace = replace

FeatureNotFoundError(feature)

Bases: Exception

Feature not found in the DataFrame.

This exception is raised when a feature is not found in the DataFrame.

Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def __init__(self, feature: str) -> None:
    super().__init__(f"{feature} not found")

MatchSamplingRate(reference_feature_name, feature_interpolation_map, fill_strategy='both_ways')

Bases: Transform

Matches the sampling rate of all time series in the DataFrame.

Interpolates the time series to match the sampling rate of the reference time series. The below example shows the usage of a MatchSamplingRate transform in a run.py file. Assuming the loaded data is represented by the table:

| feature_a                   | feature_b                   | const |
| ---                         | ---                         | ---   |
| list[struct[time,struct[]]] | list[struct[time,struct[]]] | int   |
| --------------------------- | --------------------------- | ----- |
| [{12:26:01.0, {1.2}},       | [{12:26:00.0, {1.0}},       | 1     |
|  {12:26:02.0, {2.4}},       |  {12:26:05.0, {2.0}}]       |       |
|  {12:26:03.0, {3.6}},       |                             |       |
|  {12:26:04.0, {4.8}}]       |                             |       |

The following transform can be used to match the sampling rate of the time series feature_b to the sampling rate of the time series feature_a.

    ...
    environment.load()
    data = environment.get_data()
    transform = MatchSamplingRate(
        reference_feature_name="feature_a",
        feature_interpolation_map={
            "feature_b": "linear",
        },
    )
    transformed_data = transform.transform(data)
    ...

The resulting DataFrame after the transform is:

| feature_a                   | feature_b                   | const |
| ---                         | ---                         | ---   |
| list[struct[time,struct[]]] | list[struct[time,struct[]]] | int   |
| --------------------------- | --------------------------- | ----- |
| [{12:26:01.0, {1.2}},       | [{12:26:01.0, {1.2}},       | 1     |
|  {12:26:02.0, {2.4}},       |  {12:26:02.0, {1.4}},       |       |
|  {12:26:03.0, {3.6}},       |  {12:26:03.0, {1.6}},       |       |
|  {12:26:04.0, {4.8}}]       |  {12:26:04.0, {1.8}}]       |       |

Initialize the transform.

Parameters:

Name Type Description Default
reference_feature_name str

Reference timeseries feature.

required
feature_interpolation_map dict[str, MatchSamplingRateMethod]

Key-value pairs mapping each time series feature to interpolate to the interpolation method used for it. The interpolation method can be 'linear' or 'nearest'.

required
fill_strategy FillStrategy

Strategy to fill missing values after interpolation.

'both_ways'
Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def __init__(
    self,
    reference_feature_name: str,
    feature_interpolation_map: dict[str, MatchSamplingRateMethod],
    fill_strategy: FillStrategy = "both_ways",
) -> None:
    """Initialize the transform.

    Args:
        reference_feature_name: Reference timeseries feature.
        feature_interpolation_map: Key-value pairs mapping each time
            series feature to interpolate to the interpolation method
            used for it. The method can be 'linear' or 'nearest'.
        fill_strategy: Strategy to fill missing values after interpolation.
    """
    self.reference_feature_name = reference_feature_name
    self.feature_interpolation_map = feature_interpolation_map
    self.fill_strategy = fill_strategy

apply(data)

Transform the input DataFrame.

Parameters:

Name Type Description Default
data LazyFrame

Input DataFrame.

required

Returns:

Type Description
LazyFrame

Transformed DataFrame.

Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def apply(self, data: pl.LazyFrame) -> pl.LazyFrame:
    """Transform the input DataFrame.

    Args:
        data: Input DataFrame.

    Returns:
        Transformed DataFrame.

    """
    # preserve all constant columns that are not timeseries data
    transformed_data = pl.DataFrame()
    collected_data = data.collect()
    for i in range(len(collected_data.rows())):
        transformed_data_slice = self._transform_row(
            collected_data.slice(i, 1),
        )
        transformed_data = transformed_data.vstack(transformed_data_slice)
    return transformed_data.lazy()

Mean(features, *, replace=False)

Bases: Transform

Replaces time-series features with their mean value.

Initializes the Mean transform.

Parameters:

Name Type Description Default
features str | Iterable[str]

The feature or features the mean should be calculated for.

required
replace bool

Whether to replace the original features with the transformed ones. If set to False, the default, the value will be added as a new feature named {feature}_mean.

False
Source code in src/flowcean/polars/transforms/mean.py
def __init__(
    self,
    features: str | Iterable[str],
    *,
    replace: bool = False,
) -> None:
    """Initializes the Mean transform.

    Args:
        features: The feature or features the mean should be calculated
            for.
        replace: Whether to replace the original features with the
            transformed ones. If set to False, the default, the value will
            be added as a new feature named `{feature}_mean`.
    """
    self.features = [features] if isinstance(features, str) else features
    self.replace = replace

Median(features, *, replace=False)

Bases: Transform

Replaces time-series features with their median value.

Initializes the Median transform.

Parameters:

Name Type Description Default
features str | Iterable[str]

The feature or features the median should be calculated for.

required
replace bool

Whether to replace the original features with the transformed ones. If set to False, the default, the value will be added as a new feature named {feature}_median.

False
Source code in src/flowcean/polars/transforms/median.py
def __init__(
    self,
    features: str | Iterable[str],
    *,
    replace: bool = False,
) -> None:
    """Initializes the Median transform.

    Args:
        features: The feature or features the median should be calculated
            for.
        replace: Whether to replace the original features with the
            transformed ones. If set to False, the default, the value will
            be added as a new feature named `{feature}_median`.
    """
    self.features = [features] if isinstance(features, str) else features
    self.replace = replace

Mode(features, *, replace=False)

Bases: Transform

Mode finds the value that appears most often in time-series features.

Initializes the Mode transform.

Parameters:

Name Type Description Default
features str | Iterable[str]

The features to apply this transform to.

required
replace bool

Whether to replace the original features with the transformed ones. If set to False, the default, the value will be added as a new feature named {feature}_mode.

False
Source code in src/flowcean/polars/transforms/mode.py
def __init__(
    self,
    features: str | Iterable[str],
    *,
    replace: bool = False,
) -> None:
    """Initializes the Mode transform.

    Args:
        features: The features to apply this transform to.
        replace: Whether to replace the original features with the
            transformed ones. If set to False, the default, the value will
            be added as a new feature named `{feature}_mode`.
    """
    self.features = [features] if isinstance(features, str) else features
    self.replace = replace

OneCold(feature_categories, *, check_for_missing_categories=False)

Bases: Transform

Transforms integer features into a set of binary one-cold features.

Transforms integer features into a set of binary one-cold features. The original integer features are dropped and are not part of the resulting data frame.

As an example consider the following data

feature
0
1
2
1
5

When the one-cold transformation is applied, the result is as follows

feature_0 feature_1 feature_2 feature_5
0 1 1 1
1 0 1 1
1 1 0 1
1 0 1 1
1 1 1 0

In the default configuration, missing categories are ignored; their respective entries will all be one. If you want to enforce that each data entry belongs to a known category, set the check_for_missing_categories flag to true when constructing a OneCold transform. In that case, if a value is found that does not belong to any category, a NoMatchingCategoryError is raised. This check has an impact on performance and will slow down the transform.

If you want to enable this check, create the transform as follows:

transform = OneCold(
    feature_categories={"feature": [0, 1, 2, 5]},
    check_for_missing_categories=True,
)

Initializes the One-Cold transform.

Parameters:

Name Type Description Default
feature_categories dict[str, list[Any]]

Dictionary of features and a list of categorical values to encode for each.

required
check_for_missing_categories bool

If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.

False
Source code in src/flowcean/polars/transforms/one_cold.py
def __init__(
    self,
    feature_categories: dict[str, list[Any]],
    *,
    check_for_missing_categories: bool = False,
) -> None:
    """Initializes the One-Hot transform.

    Args:
        feature_categories: Dictionary of features and a list of
            categorical values to encode for each.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    self.feature_category_mapping = {
        feature: {f"{feature}_{value}": value for value in values}
        for feature, values in feature_categories.items()
    }
    self.check_for_missing_categories = check_for_missing_categories

apply(data)

Transform data with this one-cold transformation.

Transform data with this one-cold transformation and return the resulting dataframe.

Parameters:

Name Type Description Default
data LazyFrame

The data to transform.

required

Returns:

Type Description
LazyFrame

The transformed data.

Source code in src/flowcean/polars/transforms/one_cold.py
@override
def apply(
    self,
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Transform data with this one hot transformation.

    Transform data with this one hot transformation and return the
    resulting dataframe.

    Args:
        data: The data to transform.

    Returns:
        The transformed data.
    """
    if len(self.feature_category_mapping) == 0:
        raise NoCategoriesError
    for (
        feature,
        category_mappings,
    ) in self.feature_category_mapping.items():
        data = data.with_columns(
            [
                pl.col(feature).ne(value).cast(pl.Int64).alias(name)
                for name, value in category_mappings.items()
            ],
        ).drop(feature)

        # Check only for missing categories if the user has requested it
        if self.check_for_missing_categories and (
            not data.select(
                [
                    pl.col(name).cast(pl.Boolean)
                    for name in category_mappings
                ],
            )  # Get the newly created one-cold feature columns
            .select(
                # Check if all one-cold features are true
                # That's only the case if the category is missing
                pl.all_horizontal(
                    pl.all(),
                ).all(),  # Combine the results for all data entries ...
            )
            .collect(engine="streaming")
            # ... and get the final result.
            # If it is false, there is a missing category
            .item(0, 0)
        ):
            raise NoMatchingCategoryError
    return data

from_dataframe(data, features, *, check_for_missing_categories=False) classmethod

Creates a new one-cold transformation based on sample data.

Parameters:

Name Type Description Default
data DataFrame

A dataframe containing sample data for determining the categories of the transform.

required
features Iterable[str]

Name of the features for which the one-cold transformation will determine the categories.

required
check_for_missing_categories bool

If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.

False
Source code in src/flowcean/polars/transforms/one_cold.py
@classmethod
def from_dataframe(
    cls,
    data: pl.DataFrame,
    features: Iterable[str],
    *,
    check_for_missing_categories: bool = False,
) -> Self:
    """Creates a new one-hot transformation based on sample data.

    Args:
        data: A dataframe containing sample data for determining the
            categories of the transform.
        features: Name of the features for which the one-cold
            transformation will determine the categories.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    # Derive categories from the data frame
    feature_categories: dict[str, list[Any]] = {}
    for feature in features:
        if data.schema[feature].is_float():
            logger.warning(
                "Feature %s is of type float. Applying a one-cold "
                "transform to it may produce undesired results. "
                "Check your datatypes and transforms.",
                feature,
            )
        feature_categories[feature] = (
            data.select(pl.col(feature).unique()).to_series().to_list()
        )
    return cls(
        feature_categories,
        check_for_missing_categories=check_for_missing_categories,
    )

OneHot(feature_categories, *, check_for_missing_categories=False)

Bases: Transform

Transforms integer features into a set of binary one-hot features.

Transforms integer features into a set of binary one-hot features. The original integer features are dropped and are not part of the resulting data frame.

As an example consider the following data

feature
0
1
2
1
5

When the one-hot transformation is applied, the result is as follows

feature_0 feature_1 feature_2 feature_5
1 0 0 0
0 1 0 0
0 0 1 0
0 1 0 0
0 0 0 1

Initializes the One-Hot transform.

Parameters:

Name Type Description Default
feature_categories dict[str, list[Any]]

Dictionary of features and a list of categorical values to encode for each.

required
check_for_missing_categories bool

If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.

False
Source code in src/flowcean/polars/transforms/one_hot.py
def __init__(
    self,
    feature_categories: dict[str, list[Any]],
    *,
    check_for_missing_categories: bool = False,
) -> None:
    """Initializes the One-Hot transform.

    Args:
        feature_categories: Dictionary of features and a list of
            categorical values to encode for each.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    self.feature_category_mapping = {
        feature: {f"{feature}_{value}": value for value in values}
        for feature, values in feature_categories.items()
    }
    self.check_for_missing_categories = check_for_missing_categories

apply(data)

Transform data with this one hot transformation.

Transform data with this one hot transformation and return the resulting dataframe.

Parameters:

Name Type Description Default
data LazyFrame

The data to transform.

required

Returns:

Type Description
LazyFrame

The transformed data.

Source code in src/flowcean/polars/transforms/one_hot.py
@override
def apply(
    self,
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Transform data with this one hot transformation.

    Transform data with this one hot transformation and return the
    resulting dataframe.

    Args:
        data: The data to transform.

    Returns:
        The transformed data.
    """
    if len(self.feature_category_mapping) == 0:
        raise NoCategoriesError
    for (
        feature,
        category_mappings,
    ) in self.feature_category_mapping.items():
        data = data.with_columns(
            [
                pl.col(feature).eq(value).cast(pl.Int64).alias(name)
                for name, value in category_mappings.items()
            ],
        ).drop(feature)

        if self.check_for_missing_categories and (
            not data.select(
                [
                    pl.col(name).cast(pl.Boolean)
                    for name in category_mappings
                ],
            )
            .select(pl.any_horizontal(pl.all()).all())
            .collect(engine="streaming")
            .item(0, 0)
        ):
            raise NoMatchingCategoryError
    return data

from_dataframe(data, features, *, check_for_missing_categories=False) classmethod

Creates a new one-hot transformation based on sample data.

Parameters:

Name Type Description Default
data LazyFrame

A dataframe containing sample data for determining the categories of the transform.

required
features Iterable[str]

Name of the features for which the one hot transformation will determine the categories.

required
check_for_missing_categories bool

If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.

False
Source code in src/flowcean/polars/transforms/one_hot.py
@classmethod
def from_dataframe(
    cls,
    data: pl.LazyFrame,
    features: Iterable[str],
    *,
    check_for_missing_categories: bool = False,
) -> Self:
    """Creates a new one-hot transformation based on sample data.

    Args:
        data: A dataframe containing sample data for determining the
            categories of the transform.
        features: Name of the features for which the one hot transformation
            will determine the categories.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    # Derive categories from the data frame
    feature_categories: dict[str, list[Any]] = {}
    for feature in features:
        if data.schema[feature].is_float():
            logger.warning(
                "Feature %s is of type float. Applying a one-hot "
                "transform to it may produce undesired results. "
                "Check your datatypes and transforms.",
                feature,
            )
        feature_categories[feature] = (
            data.select(pl.col(feature).unique())
            .collect(engine="streaming")
            .to_series()
            .to_list()
        )
    return cls(
        feature_categories,
        check_for_missing_categories=check_for_missing_categories,
    )

Rename(mapping)

Bases: Transform

Rename features in an environment.

Initializes the Rename transform.

Parameters:

Name Type Description Default
mapping dict[str, str]

Key value pairs that map from the old feature name to the new one.

required
Source code in src/flowcean/polars/transforms/rename.py
def __init__(self, mapping: dict[str, str]) -> None:
    """Initializes the Rename transform.

    Args:
        mapping: Key value pairs that map from the old feature name to the
            new one.
    """
    self.mapping = mapping

Resample(sampling_rate, *, interpolation_method='linear')

Bases: Transform

Resample time series features to a given sampling rate.

Initializes the Resample transform.

Parameters:

Name Type Description Default
sampling_rate float | dict[str, float]

Target sampling rate for time series features. If a float is provided, all possible time series features will be resampled. Alternatively, a dictionary can be provided where the key is the feature and the value is the target sample rate.

required
interpolation_method InterpolationMethod

The interpolation method to use. Supported are "linear" and "cubic", with the default being "linear".

'linear'
Source code in src/flowcean/polars/transforms/resample.py
def __init__(
    self,
    sampling_rate: float | dict[str, float],
    *,
    interpolation_method: InterpolationMethod = "linear",
) -> None:
    """Initializes the Resample transform.

    Args:
        sampling_rate: Target sampling rate for time series features. If a
            float is provided, all possible time series features will be
            resampled. Alternatively, a dictionary can be provided where
            the key is the feature and the value is the target sample rate.
        interpolation_method: The interpolation method to use. Supported
            are "linear" and "cubic", with the default being
            "linear".
    """
    self.sampling_rate = sampling_rate
    self.interpolation_method = interpolation_method
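
The linear case can be sketched in pure Python: evaluate each target timestamp on a uniform grid by interpolating between the two surrounding samples. This is a minimal illustration of the idea, not the library's implementation, which operates on Polars time series columns.

```python
def resample_linear(times, values, sampling_rate):
    """Resample (times, values) onto a uniform grid via linear interpolation."""
    dt = 1.0 / sampling_rate
    new_times, new_values = [], []
    t = times[0]
    i = 0
    while t <= times[-1]:
        # Advance to the segment [times[i], times[i + 1]] containing t.
        while i < len(times) - 2 and times[i + 1] < t:
            i += 1
        t0, t1 = times[i], times[i + 1]
        v0, v1 = values[i], values[i + 1]
        frac = (t - t0) / (t1 - t0)
        new_times.append(t)
        new_values.append(v0 + frac * (v1 - v0))
        t += dt
    return new_times, new_values

# Resample a 1 Hz series to 2 Hz.
t, v = resample_linear([0.0, 1.0, 2.0], [0.0, 10.0, 20.0], 2.0)
```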

Select(features)

Bases: Transform

Selects a subset of features from the data.

Initializes the Select transform.

Parameters:

Name Type Description Default
features IntoExpr | Iterable[IntoExpr]

The features to select. Treats the selection as a parameter to Polars' `select` method. You can use regular expressions by wrapping the argument in `^` and `$`.

required
Source code in src/flowcean/polars/transforms/select.py
def __init__(self, features: IntoExpr | Iterable[IntoExpr]) -> None:
    """Initializes the Select transform.

    Args:
        features: The features to select. Treats the selection as a
            parameter to Polars' `select` method. You can use regular
            expressions by wrapping the argument in `^` and `$`.
    """
    self.features = features
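
The `^...$` convention can be illustrated with Python's `re` module on a plain list of column names (a stand-in for the frame's schema; Polars applies the same anchored-regex rule internally):

```python
import re

columns = ["feature_a", "feature_b", "time"]

# A selector wrapped in ^ and $ is treated as a regular expression.
selector = "^feature_.*$"
selected = [name for name in columns if re.match(selector, name)]
```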

SignalFilter(features, filter_type, filter_frequency, *, order=5)

Bases: Transform

Applies a Butterworth filter to time series features.

Applies a Butterworth lowpass or highpass filter to time series features. For this transform to work, the time series must already have a uniform sampling rate. Use a `Resample` transform to uniformly sample the points of a time series.

Initializes the Filter transform.

Parameters:

Name Type Description Default
features Iterable[str]

Features that shall be filtered.

required
filter_type SignalFilterType

Type of the filter to apply. Valid options are "lowpass" and "highpass".

required
filter_frequency float

Characteristic frequency of the filter in Hz. For high- and lowpass this is the cutoff frequency.

required
order int

Order of the Butterworth filter to use. Defaults to 5.

5
Source code in src/flowcean/polars/transforms/signal_filter.py
def __init__(
    self,
    features: Iterable[str],
    filter_type: SignalFilterType,
    filter_frequency: float,
    *,
    order: int = 5,
) -> None:
    """Initializes the Filter transform.

    Args:
        features: Features that shall be filtered.
        filter_type: Type of the filter to apply. Valid options are
            "lowpass" and "highpass".
        filter_frequency: Characteristic frequency of the filter in Hz. For
            high- and lowpass this is the cutoff frequency.
        order: Order of the Butterworth filter to use. Defaults to 5.
    """
    self.features = features
    self.filter_type = filter_type
    self.frequency = filter_frequency
    self.order = order

SlidingWindow(window_size)

Bases: Transform

Transforms the data with a sliding window.

The sliding window transform slides a window of fixed size over the row dimension of the data. For each window position, a new row is created by concatenating the rows inside the window, producing one new column per original column and window offset. The new columns are named by appending the offset of the row within the window to the original column name. As an example, consider the following data:

| x | y  | z   |
|---|----|-----|
| 1 | 10 | 100 |
| 2 | 20 | 200 |
| 3 | 30 | 300 |
| 4 | 40 | 400 |
| 5 | 50 | 500 |

If we apply a sliding window with a window size of 3, we get the following:

| x_0 | y_0 | z_0 | x_1 | y_1 | z_1 | x_2 | y_2 | z_2 |
|-----|-----|-----|-----|-----|-----|-----|-----|-----|
| 1   | 10  | 100 | 2   | 20  | 200 | 3   | 30  | 300 |
| 2   | 20  | 200 | 3   | 30  | 300 | 4   | 40  | 400 |
| 3   | 30  | 300 | 4   | 40  | 400 | 5   | 50  | 500 |

Parameters:

Name Type Description Default
window_size int

Size of the sliding window.

required
Source code in src/flowcean/polars/transforms/sliding_window.py
def __init__(self, window_size: int) -> None:
    self.window_size = window_size
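
The windowing described above can be sketched in pure Python on a list of row dicts (a hypothetical stand-in for the data frame); with a window size of 3 it reproduces the table shown earlier:

```python
def sliding_window(rows, window_size):
    """Flatten consecutive rows into windowed rows with suffixed column names."""
    windows = []
    for start in range(len(rows) - window_size + 1):
        flat = {}
        for offset in range(window_size):
            for name, value in rows[start + offset].items():
                # Column "x" at window offset 1 becomes "x_1".
                flat[f"{name}_{offset}"] = value
        windows.append(flat)
    return windows

rows = [
    {"x": 1, "y": 10, "z": 100},
    {"x": 2, "y": 20, "z": 200},
    {"x": 3, "y": 30, "z": 300},
    {"x": 4, "y": 40, "z": 400},
    {"x": 5, "y": 50, "z": 500},
]
windowed = sliding_window(rows, 3)
```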

TimeSeriesSlidingWindow(window_size, *, features=None, stride=1, rechunk=True)

Bases: Transform

Convert single large time series into a set of smaller sub-series.

Applies a sliding window to each individual time series sample of all or selected time series features, while leaving other features unchanged. The resulting data frame contains multiple samples for each original sample, where each new sample is a sub-series of the original time series. The number of features (columns) remains the same. For this transform to work, all selected time series features of a sample must share the same time vector. Use a MatchSamplingRate or Resample transform to ensure this is the case.

Initializes the TimeSeriesSlidingWindow transform.

Parameters:

Name Type Description Default
window_size int

The size of the sliding window.

required
features str | Iterable[str] | None

The features to apply the sliding window to. If None, all time series features are selected.

None
stride int

The stride of the sliding window.

1
rechunk bool

Whether to rechunk the data after applying the transform. Rechunking improves performance of subsequent operations, but increases memory usage and may slow down the initial operation.

True
Source code in src/flowcean/polars/transforms/sliding_window_ts.py
def __init__(
    self,
    window_size: int,
    *,
    features: str | Iterable[str] | None = None,
    stride: int = 1,
    rechunk: bool = True,
) -> None:
    """Initializes the TimeSeriesSlidingWindow transform.

    Args:
        window_size: The size of the sliding window.
        features: The features to apply the sliding window to. If None, all
            time series features are selected.
        stride: The stride of the sliding window.
        rechunk: Whether to rechunk the data after applying the transform.
            Rechunking improves performance of subsequent operations, but
            increases memory usage and may slow down the initial operation.
    """
    self.window_size = window_size
    self.features = features
    self.stride = stride
    self.rechunk = rechunk
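
The window/stride interaction can be sketched on a plain list standing in for one time series value column; each returned sub-list corresponds to one new sample:

```python
def series_windows(series, window_size, stride=1):
    """Split one long time series into overlapping sub-series."""
    return [
        series[start : start + window_size]
        for start in range(0, len(series) - window_size + 1, stride)
    ]

# Window size 4 with stride 2 over six points yields two sub-series.
subs = series_windows([1, 2, 3, 4, 5, 6], window_size=4, stride=2)
```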

Standardize(mean=None, std=None) dataclass

Bases: Transform, FitOnce

Standardize features by removing the mean and scaling to unit variance.

A sample \(x\) is standardized as:

\[ z = \frac{(x - \mu)}{\sigma} \]

where

  • \(\mu\) is the mean of the samples
  • \(\sigma\) is the standard deviation of the samples.

Attributes:

Name Type Description
mean dict[str, float] | None

The mean \(\mu\) of each feature.

std dict[str, float] | None

The standard deviation \(\sigma\) of each feature.
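
The formula above can be sketched for a single feature; this is a minimal illustration assuming the population standard deviation (divide by \(n\)), which may differ from the library's exact convention:

```python
def standardize(samples):
    """Compute z = (x - mu) / sigma with the population standard deviation."""
    mu = sum(samples) / len(samples)
    sigma = (sum((x - mu) ** 2 for x in samples) / len(samples)) ** 0.5
    return [(x - mu) / sigma for x in samples]

z = standardize([2.0, 4.0, 6.0])
```

After standardization the values have zero mean and unit variance.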

TimeWindow(*, features=None, time_start=0.0, time_end=math.inf)

Bases: Transform

Limit time series to a certain time window.

Initializes the TimeWindow transform.

Parameters:

Name Type Description Default
features Iterable[str] | None

The features to apply this transformation to. If None, all applicable features will be affected.

None
time_start float

Window start time. Defaults to zero. All data before this time will be removed from the time series when applying the transform.

0.0
time_end float

Window end time. Defaults to infinity. All data after this time will be removed from the time series when applying the transform.

inf
Source code in src/flowcean/polars/transforms/time_window.py
def __init__(
    self,
    *,
    features: Iterable[str] | None = None,
    time_start: float = 0.0,
    time_end: float = math.inf,
) -> None:
    """Initializes the TimeWindow transform.

    Args:
        features: The features to apply this transformation to. If `None`,
            all applicable features will be affected.
        time_start: Window start time. Defaults to zero. All data before
            this time will be removed from the time series when applying
            the transform.
        time_end: Window end time. Defaults to infinity. All data after
            this time will be removed from the time series when applying
            the transform.
    """
    self.features = features
    self.t_start = time_start
    self.t_end = time_end
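
The windowing can be sketched on paired time and value lists standing in for one time series feature. Treating both bounds as inclusive is an assumption of this sketch, not something the documentation above specifies:

```python
import math

def time_window(times, values, time_start=0.0, time_end=math.inf):
    """Keep only samples whose timestamp lies in [time_start, time_end]."""
    kept = [
        (t, v) for t, v in zip(times, values) if time_start <= t <= time_end
    ]
    return [t for t, _ in kept], [v for _, v in kept]

t, v = time_window(
    [0.0, 1.0, 2.0, 3.0], [10, 11, 12, 13], time_start=1.0, time_end=2.0
)
```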

Unnest(features)

Bases: Transform

Decompose struct columns into separate columns for each field.

Example:

data_frame = pl.Series(
    "c",
    [
        {"a": 1, "t": 1},
        {"a": 4, "t": 2},
        {"a": 7, "t": 3},
        {"a": 10, "t": 4},
        {"a": 15, "t": 5},
    ],
).to_frame()
The transformed data will be:
pl.DataFrame(
    {
        "a": [1, 4, 7, 10, 15],
        "t": [1, 2, 3, 4, 5],
    },
)

Initializes the Unnest transform.

Parameters:

Name Type Description Default
features ColumnNameOrSelector | Collection[ColumnNameOrSelector]

The features to unnest. Treats the selection as a parameter to Polars' `unnest` method. You can use regular expressions by wrapping the argument in `^` and `$`.

required
Source code in src/flowcean/polars/transforms/unnest.py
def __init__(
    self,
    features: ColumnNameOrSelector | Collection[ColumnNameOrSelector],
) -> None:
    """Initializes the Unnest transform.

    Args:
        features: The features to unnest. Treats the selection as a
            parameter to Polars' `unnest` method. You can use regular
            expressions by wrapping the argument in `^` and `$`.
    """
    self.features = features
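
The struct decomposition can be sketched in pure Python; the list of dicts below stands in for the struct column "c" from the example above:

```python
# Stand-in for the struct column "c": one dict per row.
rows = [
    {"a": 1, "t": 1},
    {"a": 4, "t": 2},
    {"a": 7, "t": 3},
    {"a": 10, "t": 4},
    {"a": 15, "t": 5},
]

# Decompose each struct field into its own column.
unnested = {field: [row[field] for row in rows] for field in rows[0]}
```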