transforms

Cast(target_type, *, features=None)

Bases: Transform

Cast features to a different datatype.

Initializes the Cast transform.

Parameters:

Name Type Description Default
target_type PolarsDataType

Type to which the features will be cast.

required
features Iterable[str] | None

The features to cast. If None all features will be cast. This is the default behaviour.

None
Source code in src/flowcean/polars/transforms/cast.py
def __init__(
    self,
    target_type: PolarsDataType,
    *,
    features: Iterable[str] | None = None,
) -> None:
    """Initializes the Cast transform.

    Args:
        target_type: Type to which the features will be cast.
        features: The features to cast. If `None` all
            features will be cast. This is the default behaviour.
    """
    self.target_type = target_type
    self.features = features

Drop(features)

Bases: Transform

Drop features from the data.

Initializes the Drop transform.

Source code in src/flowcean/polars/transforms/drop.py
def __init__(self, features: str | Iterable[str]) -> None:
    """Initializes the Drop transform."""
    super().__init__()
    self.features = features

Explode(features)

Bases: Transform

Explodes a Dataframe to long format by exploding the given features.

Parameters:

Name Type Description Default
features list[str]

List of features to explode.

required

The example below shows the usage of an Explode transform in an experiment.yaml file. Assuming the loaded data is represented by the table:

time feature_a feature_b constant
[0, 1] [2, 1] [9, 3] 1
[0, 2] [3, 4] [8, 4] 2

This transform can be used to explode the columns time, feature_a, and feature_b.

The resulting Dataframe after the transform is:

time feature_a feature_b constant
0 2 9 1
1 1 3 1
0 3 8 2
2 4 4 2
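
The effect on the example table can be reproduced with a plain-Python sketch. It uses neither flowcean nor Polars; the helper `explode_rows` and the representation of rows as dictionaries are assumptions made purely for illustration.

```python
def explode_rows(rows, features):
    """Expand list-valued features so that each list element gets its own row."""
    exploded = []
    for row in rows:
        # All exploded features of one row must have the same length.
        lengths = {len(row[name]) for name in features}
        if len(lengths) != 1:
            raise ValueError("exploded features must have equal lengths")
        for i in range(lengths.pop()):
            new_row = dict(row)  # features that are not exploded are repeated
            for name in features:
                new_row[name] = row[name][i]
            exploded.append(new_row)
    return exploded


rows = [
    {"time": [0, 1], "feature_a": [2, 1], "feature_b": [9, 3], "constant": 1},
    {"time": [0, 2], "feature_a": [3, 4], "feature_b": [8, 4], "constant": 2},
]
result = explode_rows(rows, ["time", "feature_a", "feature_b"])
```

Each input row yields one output row per list element, and the scalar `constant` is copied into every one of them.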
Source code in src/flowcean/polars/transforms/explode.py
def __init__(self, features: list[str]) -> None:
    self.features = features

Filter(expression)

Bases: Transform

Filter an environment based on one or multiple expressions.

This transform filters an environment based on one or multiple boolean expressions. Assuming the input environment is given by

t N x
1 10 0
2 12 1
3 5 2
4 15 1
5 17 0

The following transformation can be used to filter the environment so that the result contains only records where x=1:

    Filter("x == 1")

The result dataset after applying the transform will be

t N x
2 12 1
4 15 1

To only get records where x=1 and t > 3 the filter expression

    Filter(And(["x == 1", "t > 3"]))

can be used.

To keep all records where x=1 and either t > 3 or N < 15, use

    Filter(And(["x == 1", Or(["t > 3", "N < 15"])]))
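
How the nested expressions combine can be checked with a small plain-Python sketch on the example table. The helpers `and_` and `or_` and the predicate functions are hypothetical stand-ins for illustration, not the flowcean `And` and `Or` classes.

```python
def and_(*predicates):
    """Return a predicate that holds only if every given predicate holds."""
    return lambda record: all(p(record) for p in predicates)


def or_(*predicates):
    """Return a predicate that holds if at least one given predicate holds."""
    return lambda record: any(p(record) for p in predicates)


def x_is_one(record):
    return record["x"] == 1


def t_after_3(record):
    return record["t"] > 3


def n_below_15(record):
    return record["N"] < 15


records = [
    {"t": 1, "N": 10, "x": 0},
    {"t": 2, "N": 12, "x": 1},
    {"t": 3, "N": 5, "x": 2},
    {"t": 4, "N": 15, "x": 1},
    {"t": 5, "N": 17, "x": 0},
]

# x == 1 and t > 3
strict = [r for r in records if and_(x_is_one, t_after_3)(r)]
# x == 1 and (t > 3 or N < 15)
relaxed = [r for r in records if and_(x_is_one, or_(t_after_3, n_below_15))(r)]
```

With the example data, `strict` keeps only the record with t=4, while `relaxed` also keeps the record with t=2 because its N is below 15.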

Initializes the Filter transform.

Parameters:

Name Type Description Default
expression str | FilterExpr

String or filter expression used to filter the environment. Records that do not match the expression are discarded. Standard comparison and mathematical operations are supported within the expressions. Features can be accessed by their name.

required
Source code in src/flowcean/polars/transforms/filter.py
def __init__(
    self,
    expression: str | FilterExpr,
) -> None:
    """Initializes the Filter transform.

    Args:
        expression: String or filter expression used to filter the
            environment. Records that do not match the expression are
            discarded. Standard comparison and mathematical operations are
            supported within the expressions. Features can be accessed by
            their name.
    """
    if isinstance(expression, str):
        self.predicate = _str_to_pl(expression)
    else:
        self.predicate = expression()

First(features, *, replace=False)

Bases: Transform

Selects the first time value of a time-series feature.

Initializes the First transform.

Parameters:

Name Type Description Default
features str | Iterable[str]

The features to apply this transform to.

required
replace bool

Whether to replace the original features with the transformed ones. If set to False, the default, the value will be added as a new feature named {feature}_first.

False
Source code in src/flowcean/polars/transforms/first.py
def __init__(
    self,
    features: str | Iterable[str],
    *,
    replace: bool = False,
) -> None:
    """Initializes the First transform.

    Args:
        features: The features to apply this transform to.
        replace: Whether to replace the original features with the
            transformed ones. If set to False, the default, the value will
            be added as a new feature named `{feature}_first`.
    """
    self.features = [features] if isinstance(features, str) else features
    self.replace = replace

FeatureLengthVaryError

Bases: Exception

Length of a feature varies over different rows.

Flatten(features=None)

Bases: Transform

Flatten all time series in a DataFrame to individual features.

The given DataFrame's time series are converted into individual features, with each time step creating a new feature. This transform will change the order of the columns in the resulting dataset.

For example the dataset

series_data A B
{[0, 0], [1, 1], [2, 2]} 42 43
{[0, 3], [1, 4], [2, 5]} 44 45

gets flattened into the dataset

series_data_0 series_data_1 series_data_2 A B
0 1 2 42 43
3 4 5 44 45
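
The flattening step can be sketched in plain Python. The sketch assumes that each time series is a list of (time, value) pairs and that the time stamps are discarded; `flatten_row` is a hypothetical helper, not part of flowcean.

```python
def flatten_row(row, features):
    """Replace each time-series feature by one scalar feature per time step."""
    flat = {}
    for name, value in row.items():
        if name in features:
            # value is assumed to be a list of (time, value) samples
            for step, (_time, sample) in enumerate(value):
                flat[f"{name}_{step}"] = sample
        else:
            flat[name] = value
    return flat


rows = [
    {"series_data": [(0, 0), (1, 1), (2, 2)], "A": 42, "B": 43},
    {"series_data": [(0, 3), (1, 4), (2, 5)], "A": 44, "B": 45},
]
flattened = [flatten_row(row, {"series_data"}) for row in rows]
```

This only works when every row has the same number of time steps, which is presumably what FeatureLengthVaryError guards against in the real transform.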

Initialize the flatten transform.

Parameters:

Name Type Description Default
features Iterable[str] | None

The features to flatten. If not provided or set to None, all possible features from the given dataframe will be flattened.

None
Source code in src/flowcean/polars/transforms/flatten.py
def __init__(self, features: Iterable[str] | None = None) -> None:
    """Initialize the flatten transform.

    Args:
        features: The features to flatten. If not provided or set to None,
            all possible features from the given dataframe will be
            flattened.
    """
    self.features = features

NoTimeSeriesFeatureError

Bases: Exception

Feature is not a time series.

Lambda(fn)

Bases: Transform

Apply a custom function to the data of an environment.

Initializes the Lambda transform.

Parameters:

Name Type Description Default
fn Callable[[LazyFrame], LazyFrame]

Function handle to be applied to the data.

required
Source code in src/flowcean/polars/transforms/function.py
def __init__(self, fn: Callable[[pl.LazyFrame], pl.LazyFrame]) -> None:
    """Initializes the Lambda transform.

    Args:
        fn: Function handle to be applied to the data.
    """
    self.fn = fn

Last(features, *, replace=False)

Bases: Transform

Selects the last time value of a time-series feature.

Initializes the Last transform.

Parameters:

Name Type Description Default
features str | Iterable[str]

The features to apply this transform to.

required
replace bool

Whether to replace the original features with the transformed ones. If set to False, the default, the value will be added as a new feature named {feature}_last.

False
Source code in src/flowcean/polars/transforms/last.py
def __init__(
    self,
    features: str | Iterable[str],
    *,
    replace: bool = False,
) -> None:
    """Initializes the Last transform.

    Args:
        features: The features to apply this transform to.
        replace: Whether to replace the original features with the
            transformed ones. If set to False, the default, the value will
            be added as a new feature named `{feature}_last`.
    """
    self.features = [features] if isinstance(features, str) else features
    self.replace = replace

FeatureNotFoundError(feature)

Bases: Exception

Feature not found in the DataFrame.

This exception is raised when a feature is not found in the DataFrame.

Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def __init__(self, feature: str) -> None:
    super().__init__(f"{feature} not found")

MatchSamplingRate(reference_feature_name, feature_interpolation_map)

Bases: Transform

Matches the sampling rate of all time series in the DataFrame.

Interpolates the time series to match the sampling rate of the reference time series. The below example shows the usage of a MatchSamplingRate transform in a run.py file. Assuming the loaded data is represented by the table:

    feature_a                    feature_b                    const
    list[struct[time,struct[]]]  list[struct[time,struct[]]]  int
    ---------------------------  ---------------------------  -----
    [{12:26:01.0, {1.2}},        [{12:26:00.0, {1.0}},        1
     {12:26:02.0, {2.4}},         {12:26:05.0, {2.0}}]
     {12:26:03.0, {3.6}},
     {12:26:04.0, {4.8}}]

The following transform can be used to match the sampling rate of the time series feature_b to the sampling rate of the time series feature_a.

    ...
    environment.load()
    data = environment.get_data()
    transform = MatchSamplingRate(
        reference_feature_name="feature_a",
        feature_interpolation_map={
            "feature_b": "linear",
        },
    )
    transformed_data = transform.transform(data)
    ...

The resulting Dataframe after the transform is:

    feature_a                    feature_b                    const
    list[struct[time,struct[]]]  list[struct[time,struct[]]]  int
    ---------------------------  ---------------------------  -----
    [{12:26:01.0, {1.2}},        [{12:26:01.0, {1.2}},        1
     {12:26:02.0, {2.4}},         {12:26:02.0, {1.4}},
     {12:26:03.0, {3.6}},         {12:26:03.0, {1.6}},
     {12:26:04.0, {4.8}}]         {12:26:04.0, {1.8}}]
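
The interpolated values of feature_b follow from linear interpolation between its two samples, evaluated at the time stamps of feature_a. A minimal plain-Python sketch, with time expressed as seconds after 12:26:00; the helper `interpolate_linear` and its error handling outside the sampled range are assumptions, not the library's implementation.

```python
def interpolate_linear(samples, reference_times):
    """Linearly interpolate (time, value) samples at the reference times."""
    result = []
    for t in reference_times:
        # Find the pair of neighbouring samples that encloses t.
        for (t0, v0), (t1, v1) in zip(samples, samples[1:]):
            if t0 <= t <= t1:
                weight = (t - t0) / (t1 - t0)
                result.append((t, v0 + weight * (v1 - v0)))
                break
        else:
            raise ValueError(f"time {t} lies outside the sampled range")
    return result


# Seconds after 12:26:00, taken from the input table of the example.
feature_a_times = [1.0, 2.0, 3.0, 4.0]
feature_b = [(0.0, 1.0), (5.0, 2.0)]
matched = interpolate_linear(feature_b, feature_a_times)
```

Up to floating-point rounding, `matched` holds the values 1.2, 1.4, 1.6 and 1.8 at the four reference time stamps.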

Initialize the transform.

Parameters:

Name Type Description Default
reference_feature_name str

Reference timeseries feature.

required
feature_interpolation_map dict[str, MatchSamplingRateMethod]

Mapping from each time series feature that should be interpolated to the interpolation method to use. At the moment, the interpolation method can only be 'linear'.

required
Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def __init__(
    self,
    reference_feature_name: str,
    feature_interpolation_map: dict[str, MatchSamplingRateMethod],
) -> None:
    """Initialize the transform.

    Args:
        reference_feature_name: Reference timeseries feature.
        feature_interpolation_map: Mapping from each time series feature
            that should be interpolated to the interpolation method to
            use. At the moment, the interpolation method can only be
            'linear'.
    """
    self.reference_feature_name = reference_feature_name
    self.feature_interpolation_map = feature_interpolation_map

apply(data)

Transform the input DataFrame.

Parameters:

Name Type Description Default
data LazyFrame

Input DataFrame.

required

Returns:

Type Description
LazyFrame

Transformed DataFrame.

Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def apply(self, data: pl.LazyFrame) -> pl.LazyFrame:
    """Transform the input DataFrame.

    Args:
        data: Input DataFrame.

    Returns:
        Transformed DataFrame.

    """
    # preserve all constant columns that are not timeseries data
    transformed_data = pl.DataFrame()
    collected_data = data.collect()
    for i in range(collected_data.height):
        transformed_data_slice = self._transform_row(
            collected_data.slice(i, 1),
        )
        transformed_data = transformed_data.vstack(transformed_data_slice)
    return transformed_data.lazy()

UnknownInterpolationError(interpolation_method)

Bases: Exception

Interpolation method is not implemented yet.

This exception is raised when the requested interpolation method is not implemented.

Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def __init__(self, interpolation_method: str) -> None:
    super().__init__(f"{interpolation_method} not found")

Median(features)

Bases: Transform

Replaces time-series features with their median value.

Initializes the Median transform.

Parameters:

Name Type Description Default
features str | Iterable[str]

The feature or features the median should be calculated for.

required
Source code in src/flowcean/polars/transforms/median.py
def __init__(self, features: str | Iterable[str]) -> None:
    """Initializes the Median transform.

    Args:
        features: The feature or features the median should be calculated
            for.
    """
    self.features = [features] if isinstance(features, str) else features

OneCold(feature_categories, *, check_for_missing_categories=False)

Bases: Transform

Transforms integer features into a set of binary one-cold features.

Transforms integer features into a set of binary one-cold features. The original integer features are dropped and are not part of the resulting data frame.

As an example consider the following data

feature
0
1
2
1
5

When the one-cold transformation is applied, the result is as follows

feature_0 feature_1 feature_2 feature_5
0 1 1 1
1 0 1 1
1 1 0 1
1 0 1 1
1 1 1 0

In the default configuration missing categories are ignored. Their respective entries will all be one. If you however want to enforce that each data entry belongs to a certain category, you can set the check_for_missing_categories flag to true when constructing a One-Cold transform. In that case if an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. This however has an impact on the performance and will slow down the transform.

If you want to enable this check, create the transform as follows:

    transform = OneCold(
        feature_categories={
            "feature": [0, 1, 2, 5]
        },
        check_for_missing_categories=True
    )
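
The encoding itself and the effect of the flag can be sketched in plain Python. The function `one_cold` is a hypothetical illustration that uses `ValueError` as a stand-in for NoMatchingCategoryError; it is not the library code.

```python
def one_cold(values, categories, *, check_for_missing_categories=False):
    """Encode each value as a row that is 0 for its category and 1 elsewhere."""
    encoded = []
    for value in values:
        if check_for_missing_categories and value not in categories:
            raise ValueError(f"{value!r} does not belong to any category")
        encoded.append([0 if value == category else 1 for category in categories])
    return encoded


encoded = one_cold([0, 1, 2, 1, 5], [0, 1, 2, 5])
# Without the check, an unknown value yields a row consisting only of ones.
unknown = one_cold([7], [0, 1, 2, 5])
```

Calling `one_cold([7], [0, 1, 2, 5], check_for_missing_categories=True)` raises instead of silently producing the all-ones row.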

Initializes the One-Cold transform.

Parameters:

Name Type Description Default
feature_categories dict[str, list[Any]]

Dictionary of features and a list of categorical values to encode for each.

required
check_for_missing_categories bool

If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.

False
Source code in src/flowcean/polars/transforms/one_cold.py
def __init__(
    self,
    feature_categories: dict[str, list[Any]],
    *,
    check_for_missing_categories: bool = False,
) -> None:
    """Initializes the One-Cold transform.

    Args:
        feature_categories: Dictionary of features and a list of
            categorical values to encode for each.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    self.feature_category_mapping = {
        feature: {f"{feature}_{value}": value for value in values}
        for feature, values in feature_categories.items()
    }
    self.check_for_missing_categories = check_for_missing_categories

apply(data)

Transform data with this one-cold transformation.

Transform data with this one-cold transformation and return the resulting dataframe.

Parameters:

Name Type Description Default
data LazyFrame

The data to transform.

required

Returns:

Type Description
LazyFrame

The transformed data.

Source code in src/flowcean/polars/transforms/one_cold.py
@override
def apply(
    self,
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Transform data with this one-cold transformation.

    Transform data with this one-cold transformation and return the
    resulting dataframe.

    Args:
        data: The data to transform.

    Returns:
        The transformed data.
    """
    if len(self.feature_category_mapping) == 0:
        raise NoCategoriesError
    for (
        feature,
        category_mappings,
    ) in self.feature_category_mapping.items():
        data = data.with_columns(
            [
                pl.col(feature).ne(value).cast(pl.Int64).alias(name)
                for name, value in category_mappings.items()
            ],
        ).drop(feature)

        # Check only for missing categories if the user has requested it
        if self.check_for_missing_categories and (
            not data.select(
                [
                    pl.col(name).cast(pl.Boolean)
                    for name in category_mappings
                ],
            )  # Get the newly created one-cold feature columns
            .select(
                # All one-cold features of a row are true only if its
                # category is missing, so negate the row-wise check
                pl.all_horizontal(
                    pl.all(),
                )
                .not_()
                .all(),  # Combine the results for all data entries ...
            )
            .collect(streaming=True)
            # ... and get the final result.
            # If it is false, there is a missing category
            .item(0, 0)
        ):
            raise NoMatchingCategoryError
    return data

from_dataframe(data, features, *, check_for_missing_categories=False) classmethod

Creates a new one-cold transformation based on sample data.

Parameters:

Name Type Description Default
data DataFrame

A dataframe containing sample data for determining the categories of the transform.

required
features Iterable[str]

Names of the features for which the one-cold transformation will determine the categories.

required
check_for_missing_categories bool

If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.

False
Source code in src/flowcean/polars/transforms/one_cold.py
@classmethod
def from_dataframe(
    cls,
    data: pl.DataFrame,
    features: Iterable[str],
    *,
    check_for_missing_categories: bool = False,
) -> Self:
    """Creates a new one-cold transformation based on sample data.

    Args:
        data: A dataframe containing sample data for determining the
            categories of the transform.
        features: Names of the features for which the one-cold
            transformation will determine the categories.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    # Derive categories from the data frame
    feature_categories: dict[str, list[Any]] = {}
    for feature in features:
        if data.schema[feature].is_float():
            logger.warning(
                "Feature %s is of type float. Applying a one-cold "
                "transform to it may produce undesired results. "
                "Check your datatypes and transforms.",
                feature,
            )
        feature_categories[feature] = (
            data.select(pl.col(feature).unique()).to_series().to_list()
        )
    return cls(
        feature_categories,
        check_for_missing_categories=check_for_missing_categories,
    )

OneHot(feature_categories, *, check_for_missing_categories=False)

Bases: Transform

Transforms integer features into a set of binary one-hot features.

Transforms integer features into a set of binary one-hot features. The original integer features are dropped and are not part of the resulting data frame.

As an example consider the following data

feature
0
1
2
1
5

When the one-hot transformation is applied, the result is as follows

feature_0 feature_1 feature_2 feature_5
1 0 0 0
0 1 0 0
0 0 1 0
0 1 0 0
0 0 0 1
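
The table above can be reproduced with a plain-Python sketch that mirrors the `{feature}_{value}` column naming. The helper `one_hot_columns` is hypothetical, and sorting the derived categories is an assumption made for readability.

```python
def one_hot_columns(feature, values, categories):
    """Build one binary column per category, named '<feature>_<category>'."""
    return {
        f"{feature}_{category}": [int(value == category) for value in values]
        for category in categories
    }


values = [0, 1, 2, 1, 5]
# Derive the categories from the sample data, similar in spirit to
# from_dataframe, which collects the unique values of each feature.
categories = sorted(set(values))
columns = one_hot_columns("feature", values, categories)
```

Every row contains exactly one 1, in the column that belongs to its category.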

Initializes the One-Hot transform.

Parameters:

Name Type Description Default
feature_categories dict[str, list[Any]]

Dictionary of features and a list of categorical values to encode for each.

required
check_for_missing_categories bool

If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.

False
Source code in src/flowcean/polars/transforms/one_hot.py
def __init__(
    self,
    feature_categories: dict[str, list[Any]],
    *,
    check_for_missing_categories: bool = False,
) -> None:
    """Initializes the One-Hot transform.

    Args:
        feature_categories: Dictionary of features and a list of
            categorical values to encode for each.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    self.feature_category_mapping = {
        feature: {f"{feature}_{value}": value for value in values}
        for feature, values in feature_categories.items()
    }
    self.check_for_missing_categories = check_for_missing_categories

apply(data)

Transform data with this one hot transformation.

Transform data with this one hot transformation and return the resulting dataframe.

Parameters:

Name Type Description Default
data LazyFrame

The data to transform.

required

Returns:

Type Description
LazyFrame

The transformed data.

Source code in src/flowcean/polars/transforms/one_hot.py
@override
def apply(
    self,
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Transform data with this one hot transformation.

    Transform data with this one hot transformation and return the
    resulting dataframe.

    Args:
        data: The data to transform.

    Returns:
        The transformed data.
    """
    if len(self.feature_category_mapping) == 0:
        raise NoCategoriesError
    for (
        feature,
        category_mappings,
    ) in self.feature_category_mapping.items():
        data = data.with_columns(
            [
                pl.col(feature).eq(value).cast(pl.Int64).alias(name)
                for name, value in category_mappings.items()
            ],
        ).drop(feature)

        if self.check_for_missing_categories and (
            not data.select(
                [
                    pl.col(name).cast(pl.Boolean)
                    for name in category_mappings
                ],
            )
            .select(pl.any_horizontal(pl.all()).all())
            .collect(streaming=True)
            .item(0, 0)
        ):
            raise NoMatchingCategoryError
    return data

from_dataframe(data, features, *, check_for_missing_categories=False) classmethod

Creates a new one-hot transformation based on sample data.

Parameters:

Name Type Description Default
data LazyFrame

A dataframe containing sample data for determining the categories of the transform.

required
features Iterable[str]

Name of the features for which the one hot transformation will determine the categories.

required
check_for_missing_categories bool

If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.

False
Source code in src/flowcean/polars/transforms/one_hot.py
@classmethod
def from_dataframe(
    cls,
    data: pl.LazyFrame,
    features: Iterable[str],
    *,
    check_for_missing_categories: bool = False,
) -> Self:
    """Creates a new one-hot transformation based on sample data.

    Args:
        data: A dataframe containing sample data for determining the
            categories of the transform.
        features: Name of the features for which the one hot transformation
            will determine the categories.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    # Derive categories from the data frame
    feature_categories: dict[str, list[Any]] = {}
    for feature in features:
        if data.schema[feature].is_float():
            logger.warning(
                "Feature %s is of type float. Applying a one-hot "
                "transform to it may produce undesired results. "
                "Check your datatypes and transforms.",
                feature,
            )
        feature_categories[feature] = (
            data.select(pl.col(feature).unique())
            .collect(streaming=True)
            .to_series()
            .to_list()
        )
    return cls(
        feature_categories,
        check_for_missing_categories=check_for_missing_categories,
    )

Rename(mapping)

Bases: Transform

Rename features in an environment.

Initializes the Rename transform.

Parameters:

Name Type Description Default
mapping dict[str, str]

Key value pairs that map from the old feature name to the new one.

required
Source code in src/flowcean/polars/transforms/rename.py
def __init__(self, mapping: dict[str, str]) -> None:
    """Initializes the Rename transform.

    Args:
        mapping: Key value pairs that map from the old feature name to the
            new one.
    """
    self.mapping = mapping

Resample(sampling_rate, *, interpolation_method='linear')

Bases: Transform

Resample time series features to a given sampling rate.

Initializes the Resample transform.

Parameters:

Name Type Description Default
sampling_rate float | dict[str, float]

Target sampling rate for time series features. If a float is provided, all possible time series features will be resampled. Alternatively, a dictionary can be provided where the key is the feature and the value is the target sample rate.

required
interpolation_method InterpolationMethod

The interpolation method to use. Supported are "linear" and "cubic", with the default being "linear".

'linear'
Source code in src/flowcean/polars/transforms/resample.py
def __init__(
    self,
    sampling_rate: float | dict[str, float],
    *,
    interpolation_method: InterpolationMethod = "linear",
) -> None:
    """Initializes the Resample transform.

    Args:
        sampling_rate: Target sampling rate for time series features. If a
            float is provided, all possible time series features will be
            resampled. Alternatively, a dictionary can be provided where
            the key is the feature and the value is the target sample rate.
        interpolation_method: The interpolation method to use. Supported
            are "linear" and "cubic", with the default being
            "linear".
    """
    self.sampling_rate = sampling_rate
    self.interpolation_method = interpolation_method

Select(features)

Bases: Transform

Selects a subset of features from the data.

Initializes the Select transform.

Parameters:

Name Type Description Default
features IntoExpr | Iterable[IntoExpr]

The features to select. The selection is passed on as a parameter to the Polars select method. You can use regular expressions by wrapping the argument in ^ and $.

required
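
The regular-expression convention can be illustrated with a plain-Python sketch that only mimics the name matching: a selector that starts with ^ and ends with $ is treated as a pattern, anything else as a literal name. The helper `select_names` is hypothetical and does not call Polars.

```python
import re


def select_names(columns, selector):
    """Return the column names matched by a plain name or a ^...$ pattern."""
    if selector.startswith("^") and selector.endswith("$"):
        pattern = re.compile(selector)
        return [name for name in columns if pattern.search(name)]
    return [name for name in columns if name == selector]


columns = ["feature_a", "feature_b", "constant"]
by_pattern = select_names(columns, "^feature_.*$")
by_name = select_names(columns, "constant")
```

Here `by_pattern` contains both feature columns, while `by_name` contains only the literally named column.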
Source code in src/flowcean/polars/transforms/select.py
def __init__(self, features: IntoExpr | Iterable[IntoExpr]) -> None:
    """Initializes the Select transform.

    Args:
        features: The features to select. The selection is passed on as a
            parameter to the Polars `select` method. You can use regular
            expressions by wrapping the argument in ^ and $.
    """
    self.features = features

SignalFilter(features, filter_type, filter_frequency, *, order=5)

Bases: Transform

Applies a Butterworth filter to time series features.

Applies a Butterworth lowpass or highpass filter to time series features. For this transform to work, the time series must already have a uniform sampling rate. Use a Resample transform to uniformly sample the points of a time series.

Initializes the SignalFilter transform.

Parameters:

Name Type Description Default
features Iterable[str]

Features that shall be filtered.

required
filter_type SignalFilterType

Type of the filter to apply. Valid options are "lowpass" and "highpass".

required
filter_frequency float

Characteristic frequency of the filter in Hz. For high- and lowpass this is the cutoff frequency.

required
order int

Order of the Butterworth filter to use. Defaults to 5.

5
Source code in src/flowcean/polars/transforms/signal_filter.py
def __init__(
    self,
    features: Iterable[str],
    filter_type: SignalFilterType,
    filter_frequency: float,
    *,
    order: int = 5,
) -> None:
    """Initializes the SignalFilter transform.

    Args:
        features: Features that shall be filtered.
        filter_type: Type of the filter to apply. Valid options are
            "lowpass" and "highpass".
        filter_frequency: Characteristic frequency of the filter in Hz. For
            high- and lowpass this is the cutoff frequency.
        order: Order of the Butterworth filter to use. Defaults to 5.
    """
    self.features = features
    self.filter_type = filter_type
    self.frequency = filter_frequency
    self.order = order

SlidingWindow(window_size)

Bases: Transform

Transforms the data with a sliding window.

The sliding window transform transforms the data by creating a sliding window over the row dimension. The data is then transformed by creating a new column for each column in the original data. The new columns are named by appending the index of the row in the sliding window to the original column name. As an example, consider the following data:

x y z
1 10 100
2 20 200
3 30 300
4 40 400
5 50 500

If we apply a sliding window with a window size of 3, we get the following result:

x_0 y_0 z_0 x_1 y_1 z_1 x_2 y_2 z_2
1 10 100 2 20 200 3 30 300
2 20 200 3 30 300 4 40 400
3 30 300 4 40 400 5 50 500
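
The windowing can be reproduced with a plain-Python sketch on the example data. The helper `sliding_window` and the representation of rows as dictionaries are assumptions for illustration only.

```python
def sliding_window(rows, window_size):
    """Combine each run of window_size consecutive rows into one wide row."""
    windows = []
    for start in range(len(rows) - window_size + 1):
        wide = {}
        for offset in range(window_size):
            # Suffix every feature with its position inside the window.
            for name, value in rows[start + offset].items():
                wide[f"{name}_{offset}"] = value
        windows.append(wide)
    return windows


rows = [{"x": i, "y": 10 * i, "z": 100 * i} for i in range(1, 6)]
windows = sliding_window(rows, 3)
```

Five input rows and a window size of 3 give 5 - 3 + 1 = 3 output rows.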

Parameters:

Name Type Description Default
window_size int

Size of the sliding window.

required
Source code in src/flowcean/polars/transforms/sliding_window.py
def __init__(self, window_size: int) -> None:
    self.window_size = window_size

Standardize(mean=None, std=None) dataclass

Bases: Transform, FitOnce

Standardize features by removing the mean and scaling to unit variance.

A sample \(x\) is standardized as:

\[ z = \frac{(x - \mu)}{\sigma} \]

where

  • \(\mu\) is the mean of the samples
  • \(\sigma\) is the standard deviation of the samples.
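
The formula can be checked numerically with a short plain-Python sketch. The helper `standardize` is hypothetical, and the use of the population standard deviation is an assumption, since the text above does not say which estimator the library uses.

```python
import statistics


def standardize(samples):
    """Return the z-scores of the samples with the fitted mean and std."""
    mean = statistics.fmean(samples)
    # Population standard deviation (assumption, see the note above).
    std = statistics.pstdev(samples)
    return [(x - mean) / std for x in samples], mean, std


z, mean, std = standardize([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
```

For these samples the mean is 5 and the population standard deviation is 2, so the sample 9.0 is mapped to a z-score of 2.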

Attributes:

Name Type Description
mean dict[str, float] | None

The mean \(\mu\) of each feature.

std dict[str, float] | None

The standard deviation \(\sigma\) of each feature.

TimeWindow(*, features=None, time_start=0.0, time_end=math.inf)

Bases: Transform

Limit time series to a certain time window.

Initializes the TimeWindow transform.

Parameters:

Name Type Description Default
features Iterable[str] | None

The features to apply this transformation to. If None, all applicable features will be affected.

None
time_start float

Window start time. Defaults to zero. All data before this time will be removed from the time series when applying the transform.

0.0
time_end float

Window end time. Defaults to infinity. All data after this time will be removed from the time series when applying the transform.

inf
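
The effect of the two bounds can be sketched in plain Python. The sketch assumes that a time series is a list of (time, value) pairs and that both bounds are inclusive; neither detail is stated above, and `time_window` is a hypothetical helper.

```python
import math


def time_window(series, time_start=0.0, time_end=math.inf):
    """Keep only the (time, value) samples inside [time_start, time_end]."""
    # Treating both bounds as inclusive is an assumption of this sketch.
    return [(t, v) for t, v in series if time_start <= t <= time_end]


series = [(0.0, 1.0), (0.5, 1.5), (1.0, 2.0), (1.5, 2.5), (2.0, 3.0)]
clipped = time_window(series, time_start=0.5, time_end=1.5)
unbounded = time_window(series)
```

With the default bounds the series is returned unchanged, as long as no sample has a negative time stamp.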
Source code in src/flowcean/polars/transforms/time_window.py
def __init__(
    self,
    *,
    features: Iterable[str] | None = None,
    time_start: float = 0.0,
    time_end: float = math.inf,
) -> None:
    """Initializes the TimeWindow transform.

    Args:
        features: The features to apply this transformation to. If `None`,
            all applicable features will be affected.
        time_start: Window start time. Defaults to zero. All data before
            this time will be removed from the time series when applying
            the transform.
        time_end: Window end time. Defaults to infinity. All data after
            this time will be removed from the time series when applying
            the transform.
    """
    self.features = features
    self.t_start = time_start
    self.t_end = time_end