
transforms

Cast(target_type, *, features=None)

Bases: Transform

Cast features to a different datatype.

Initializes the Cast transform.

Parameters:

- `target_type` (`PolarsDataType`, required): Type to which the features will be cast.
- `features` (`Iterable[str] | None`, default `None`): The features to cast. If `None`, all features will be cast. This is the default behaviour.
Source code in src/flowcean/transforms/cast.py
def __init__(
    self,
    target_type: PolarsDataType,
    *,
    features: Iterable[str] | None = None,
) -> None:
    """Initializes the Cast transform.

    Args:
        target_type: Type to which the features will be cast.
        features: The features to cast. If `None` all
            features will be cast. This is the default behaviour.
    """
    self.target_type = target_type
    self.features = features

Explode(features)

Bases: Transform

Explodes a Dataframe to long format by exploding the given features.

Parameters:

- `features` (`list[str]`, required): List of features to explode.

The example below shows the usage of an Explode transform in an experiment.yaml file. Assuming the loaded data is represented by the table:

time    feature_a  feature_b  constant
[0, 1]  [2, 1]     [9, 3]     1
[0, 2]  [3, 4]     [8, 4]     2

This transform can be used to explode the columns time, feature_a, and feature_b.

The resulting DataFrame after the transform is:

time  feature_a  feature_b  constant
0     2          9          1
1     1          3          1
0     3          8          2
2     4          4          2
Source code in src/flowcean/transforms/explode.py
def __init__(self, features: list[str]) -> None:
    self.features = features
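A minimal pure-Python sketch of this behaviour (illustrative only; a dict of lists stands in for the DataFrame, and every exploded feature is assumed to have the same length per row, which is what the real transform's FeatureLengthVaryError guards against):

```python
def explode(data, features):
    """Explode list-valued columns into one row per element;
    scalar columns are repeated for each produced row."""
    out = {name: [] for name in data}
    n_rows = len(next(iter(data.values())))
    for row in range(n_rows):
        for element in range(len(data[features[0]][row])):
            for name, values in data.items():
                out[name].append(
                    values[row][element] if name in features else values[row]
                )
    return out

data = {
    "time": [[0, 1], [0, 2]],
    "feature_a": [[2, 1], [3, 4]],
    "feature_b": [[9, 3], [8, 4]],
    "constant": [1, 2],
}
print(explode(data, ["time", "feature_a", "feature_b"]))
# {'time': [0, 1, 0, 2], 'feature_a': [2, 1, 3, 4],
#  'feature_b': [9, 3, 8, 4], 'constant': [1, 1, 2, 2]}
```

This reproduces the long-format table from the example above.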

FeatureLengthVaryError

Bases: Exception

Length of a feature varies over different rows.

Flatten(features=None)

Bases: Transform

Flatten all time series in a DataFrame to individual features.

The given DataFrame's time series are converted into individual features, with each time step creating a new feature. This transform will change the order of the columns in the resulting dataset.

For example the dataset

series_data               A   B
{[0, 0], [1, 1], [2, 2]}  42  43
{[0, 3], [1, 4], [2, 5]}  44  45

gets flattened into the dataset

series_data_0  series_data_1  series_data_2  A   B
0              1              2              42  43
3              4              5              44  45

Initialize the flatten transform.

Parameters:

- `features` (`Iterable[str] | None`, default `None`): The features to flatten. If not provided or set to `None`, all possible features from the given dataframe will be flattened.
Source code in src/flowcean/transforms/flatten.py
def __init__(self, features: Iterable[str] | None = None) -> None:
    """Initialize the flatten transform.

    Args:
        features: The features to flatten. If not provided or set to None,
            all possible features from the given dataframe will be
            flattened.
    """
    self.features = features
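The flattening can be sketched in plain Python (an illustration, not the library code; a time series is represented as a list of `[time, value]` pairs, and only the value at each step is kept):

```python
def flatten(data, features=None):
    """Split each time series column into one column per time step."""
    if features is None:
        # treat every list-valued column as a time series
        features = [k for k, v in data.items() if isinstance(v[0], list)]
    out = {}
    for name, values in data.items():
        if name in features:
            for step in range(len(values[0])):
                out[f"{name}_{step}"] = [row[step][1] for row in values]
        else:
            out[name] = values
    return out

data = {
    "series_data": [[[0, 0], [1, 1], [2, 2]], [[0, 3], [1, 4], [2, 5]]],
    "A": [42, 44],
    "B": [43, 45],
}
print(flatten(data))
# {'series_data_0': [0, 3], 'series_data_1': [1, 4],
#  'series_data_2': [2, 5], 'A': [42, 44], 'B': [43, 45]}
```

As noted above, the column order of the result differs from the input.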

NoTimeSeriesFeatureError

Bases: Exception

Feature is no time series.

Lambda(fn)

Bases: Transform

Apply a custom function to the data of an environment.

Initializes the Lambda transform.

Parameters:

- `fn` (`Callable[[DataFrame], DataFrame]`, required): Function handle to be applied to the data.
Source code in src/flowcean/transforms/function.py
def __init__(self, fn: Callable[[pl.DataFrame], pl.DataFrame]) -> None:
    """Initializes the Lambda transform.

    Args:
        fn: Function handle to be applied to the data.
    """
    self.fn = fn
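The pattern is simply "store a function handle, apply it later". A stand-in sketch (not the flowcean class; plain dicts instead of DataFrames):

```python
class Lambda:
    """Stores a function handle and applies it to data on request."""

    def __init__(self, fn):
        self.fn = fn

    def apply(self, data):
        return self.fn(data)

# double the values of column "a", leave everything else untouched
double_a = Lambda(lambda d: {**d, "a": [v * 2 for v in d["a"]]})
print(double_a.apply({"a": [1, 2, 3]}))  # {'a': [2, 4, 6]}
```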

MatchSamplingRate(reference_timestamps, feature_columns_with_timestamps)

Bases: Transform

Matches the sampling rate of all time series in the DataFrame.

Interpolates the time series to match the sampling rate of the reference time series. The below example shows the usage of a MatchSamplingRate transform in a run.py file. Assuming the loaded data is represented by the table:

feature_a                           feature_b                           const
list[struct[datetime[us],struct[]]] list[struct[datetime[us],struct[]]] int
----------------------------------- ----------------------------------- -----
[{2024-06-25 12:26:01.0, {1.2}},    [{2024-06-25 12:26:00.0, {1.0}},    1
 {2024-06-25 12:26:02.0, {2.4}},     {2024-06-25 12:26:05.0, {2}}]
 {2024-06-25 12:26:03.0, {3.6}},
 {2024-06-25 12:26:04.0, {4.8}}]

The following transform can be used to match the sampling rate of the time series feature_b to the sampling rate of the time series feature_a.

    ...
    environment.load()
    data = environment.get_data()
    transform = MatchSamplingRate(
        reference_timestamps="time_feature_a",
        feature_columns_with_timestamps={
            "feature_b": "time_feature_b"
        },
    )
    transformed_data = transform.transform(data)
    ...

The resulting DataFrame after the transform is:

time_feature_a  feature_a  time_feature_b  feature_b     const
[0, 1, 2]       [2, 1, 7]  [0, 1, 2]       [10, 15, 20]  1
[0, 1, 2]       [4, 1, 0]  [0, 1, 2]       [20, 30, 40]  2

Note that the timestamp feature time_feature_b is still present in the DataFrame. To remove it, use the Select transform.

Initialize the MatchSamplingRate transform.

Parameters:

- `reference_timestamps` (`str`, required): Timestamps of the reference feature.
- `feature_columns_with_timestamps` (`dict[str, str]`, required): Names of the features to interpolate, mapped to their respective original timestamp feature names.
Source code in src/flowcean/transforms/match_sampling_rate.py
def __init__(
    self,
    reference_timestamps: str,
    feature_columns_with_timestamps: dict[str, str],
) -> None:
    """Initialize the MatchSamplingRate transform.

    Args:
        reference_timestamps: Timestamps of the reference feature.
        feature_columns_with_timestamps: Names of the features that are
            getting interpolated with their respective original timestamp
            feature names.
    """
    self.reference_timestamps = reference_timestamps
    self.feature_columns_with_timestamps = feature_columns_with_timestamps
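The core of this transform is linear interpolation of one series onto the reference timestamps. A stdlib-only sketch of that step (illustrative; the real transform operates on nested polars struct columns and clamping at the edges is an assumption here):

```python
def interpolate(ref_ts, ts, values):
    """Linearly interpolate the series (ts, values) onto ref_ts,
    clamping to the first/last sample outside the observed range."""
    out = []
    for t in ref_ts:
        if t <= ts[0]:
            out.append(values[0])
        elif t >= ts[-1]:
            out.append(values[-1])
        else:
            # find the segment [ts[i], ts[i + 1]] containing t
            i = next(k for k in range(len(ts) - 1) if ts[k] <= t <= ts[k + 1])
            w = (t - ts[i]) / (ts[i + 1] - ts[i])
            out.append(values[i] + w * (values[i + 1] - values[i]))
    return out

# feature_b sampled at t = 0 and t = 5, matched to reference times 1..4
print(interpolate([1, 2, 3, 4], [0, 5], [1.0, 2.0]))
# approximately [1.2, 1.4, 1.6, 1.8]
```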

OneHot(feature_categories, *, check_for_missing_categories=False)

Bases: Transform

Transforms integer features into a set of binary one-hot features.

Transforms integer features into a set of binary one-hot features. The original integer features are dropped and are not part of the resulting data frame.

As an example consider the following data

feature
0
1
2
1
5

When the one-hot transformation is applied, the result is as follows

feature_0  feature_1  feature_2  feature_5
1          0          0          0
0          1          0          0
0          0          1          0
0          1          0          0
0          0          0          1

Initializes the One-Hot transform.

Parameters:

- `feature_categories` (`dict[str, list[Any]]`, required): Dictionary of features and a list of categorical values to encode for each.
- `check_for_missing_categories` (`bool`, default `False`): If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.
Source code in src/flowcean/transforms/one_hot.py
def __init__(
    self,
    feature_categories: dict[str, list[Any]],
    *,
    check_for_missing_categories: bool = False,
) -> None:
    """Initializes the One-Hot transform.

    Args:
        feature_categories: Dictionary of features and a list of
            categorical values to encode for each.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    self.feature_category_mapping = {
        feature: {f"{feature}_{value}": value for value in values}
        for feature, values in feature_categories.items()
    }
    self.check_for_missing_categories = check_for_missing_categories
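The encoding itself can be sketched without polars (a pure-Python illustration of the semantics, not the library code):

```python
def one_hot(data, feature_categories):
    """Replace each listed feature with one binary indicator
    column per category; the original column is dropped."""
    out = dict(data)
    for feature, categories in feature_categories.items():
        values = out.pop(feature)
        for category in categories:
            out[f"{feature}_{category}"] = [int(v == category) for v in values]
    return out

data = {"feature": [0, 1, 2, 1, 5]}
print(one_hot(data, {"feature": [0, 1, 2, 5]}))
# {'feature_0': [1, 0, 0, 0, 0], 'feature_1': [0, 1, 0, 1, 0],
#  'feature_2': [0, 0, 1, 0, 0], 'feature_5': [0, 0, 0, 0, 1]}
```

This reproduces the example table above; a value outside the category list would simply encode to all zeros here, which is the situation `check_for_missing_categories` detects.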

apply(data)

Transform data with this one hot transformation.

Transform data with this one hot transformation and return the resulting dataframe.

Parameters:

- `data` (`DataFrame`, required): The data to transform.

Returns:

- `DataFrame`: The transformed data.
Source code in src/flowcean/transforms/one_hot.py
@override
def apply(
    self,
    data: pl.DataFrame,
) -> pl.DataFrame:
    """Transform data with this one hot transformation.

    Transform data with this one hot transformation and return the
    resulting dataframe.

    Args:
        data: The data to transform.

    Returns:
        The transformed data.
    """
    if len(self.feature_category_mapping) == 0:
        raise NoCategoriesError
    for (
        feature,
        category_mappings,
    ) in self.feature_category_mapping.items():
        data = data.with_columns(
            [
                pl.col(feature).eq(value).cast(pl.Int64).alias(name)
                for name, value in category_mappings.items()
            ]
        ).drop(feature)

        if self.check_for_missing_categories and (
            not data.select(
                [
                    pl.col(name).cast(pl.Boolean)
                    for name in category_mappings
                ]
            )
            .select(pl.any_horizontal(pl.all()).all())
            .item(0, 0)
        ):
            raise NoMatchingCategoryError
    return data

from_dataframe(data, features, *, check_for_missing_categories=False) classmethod

Creates a new one-hot transformation based on sample data.

Parameters:

- `data` (`DataFrame`, required): A dataframe containing sample data for determining the categories of the transform.
- `features` (`Iterable[str]`, required): Names of the features for which the one-hot transformation will determine the categories.
- `check_for_missing_categories` (`bool`, default `False`): If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.
Source code in src/flowcean/transforms/one_hot.py
@classmethod
def from_dataframe(
    cls,
    data: pl.DataFrame,
    features: Iterable[str],
    *,
    check_for_missing_categories: bool = False,
) -> Self:
    """Creates a new one-hot transformation based on sample data.

    Args:
        data: A dataframe containing sample data for determining the
            categories of the transform.
        features: Name of the features for which the one hot transformation
            will determine the categories.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    # Derive categories from the data frame
    feature_categories: dict[str, list[Any]] = {}
    for feature in features:
        if data.schema[feature].is_float():
            logger.warning(
                "Feature %s is of type float. Applying a one-hot "
                "transform to it may produce undesired results. "
                "Check your datatypes and transforms.",
                feature,
            )
        feature_categories[feature] = (
            data.select(pl.col(feature).unique()).to_series().to_list()
        )
    return cls(
        feature_categories,
        check_for_missing_categories=check_for_missing_categories,
    )
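The category-derivation step amounts to collecting the unique values per feature. A sketch (illustrative; `sorted` is an assumption here, the real transform takes the unique values in whatever order polars returns them):

```python
def categories_from_data(data, features):
    """Derive the category list for each feature from sample data."""
    return {feature: sorted(set(data[feature])) for feature in features}

sample = {"feature": [0, 1, 2, 1, 5]}
print(categories_from_data(sample, ["feature"]))  # {'feature': [0, 1, 2, 5]}
```

The resulting dictionary is exactly the `feature_categories` argument expected by the constructor above.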

Rechunk()

Bases: Transform

Rechunks a dataframe.

Rearranges a dataframe so that it resides in a contiguous block of memory. This improves the performance of any subsequent transform performed on the rechunked dataframe. However, this operation can be costly depending on the size of the dataframe, so it should be used with care and only when deemed necessary.

Initializes the Rechunk transform.

Source code in src/flowcean/transforms/rechunk.py
def __init__(self) -> None:
    """Initializes the Rechunk transform."""
    super().__init__()

Rename(mapping)

Bases: Transform

Rename features in an environment.

Initializes the Rename transform.

Parameters:

- `mapping` (`dict[str, str]`, required): Key-value pairs that map from the old feature name to the new one.
Source code in src/flowcean/transforms/rename.py
def __init__(self, mapping: dict[str, str]) -> None:
    """Initializes the Rename transform.

    Args:
        mapping: Key value pairs that map from the old feature name to the
            new one.
    """
    self.mapping = mapping
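The semantics are a plain key-remapping; as a one-line sketch (not the library code), with names absent from the mapping left unchanged:

```python
def rename(data, mapping):
    """Rename columns according to mapping, keeping unmapped names."""
    return {mapping.get(name, name): values for name, values in data.items()}

print(rename({"old": [1], "keep": [2]}, {"old": "new"}))
# {'new': [1], 'keep': [2]}
```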

Resample(sampling_rate, *, interpolation_method='linear')

Bases: Transform

Resample time series features to a given sampling rate.

Initializes the Resample transform.

Parameters:

- `sampling_rate` (`float | dict[str, float]`, required): Target sampling rate for time series features. If a float is provided, all possible time series features will be resampled. Alternatively, a dictionary can be provided where the key is the feature and the value is the target sample rate.
- `interpolation_method` (`InterpolationMethod`, default `"linear"`): The interpolation method to use. Supported are "linear" and "cubic".
Source code in src/flowcean/transforms/resample.py
def __init__(
    self,
    sampling_rate: float | dict[str, float],
    *,
    interpolation_method: InterpolationMethod = "linear",
) -> None:
    """Initializes the Resample transform.

    Args:
        sampling_rate: Target sampling rate for time series features. If a
            float is provided, all possible time series features will be
            resampled. Alternatively, a dictionary can be provided where
            the key is the feature and the value is the target sample rate.
        interpolation_method: The interpolation method to use. Supported
            are "linear" and "cubic", with the default being
            "linear".
    """
    self.sampling_rate = sampling_rate
    self.interpolation_method = interpolation_method
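A stdlib sketch of the linear case (illustrative assumptions: a single series given as timestamp and value lists, a float `sampling_rate` in Hz, and a grid starting at the first timestamp; the real transform also supports per-feature rates and cubic interpolation):

```python
def resample(ts, values, sampling_rate):
    """Resample (ts, values) onto a uniform grid at sampling_rate Hz
    using linear interpolation between neighbouring samples."""
    step = 1.0 / sampling_rate
    t, out_ts, out_vals = ts[0], [], []
    while t <= ts[-1]:
        i = max(k for k in range(len(ts)) if ts[k] <= t)
        if i == len(ts) - 1:
            v = values[-1]
        else:
            w = (t - ts[i]) / (ts[i + 1] - ts[i])
            v = values[i] + w * (values[i + 1] - values[i])
        out_ts.append(round(t, 9))  # guard against float accumulation drift
        out_vals.append(v)
        t += step
    return out_ts, out_vals

print(resample([0, 1, 2], [0.0, 10.0, 20.0], sampling_rate=2.0))
# ([0, 0.5, 1.0, 1.5, 2.0], [0.0, 5.0, 10.0, 15.0, 20.0])
```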

Select(features)

Bases: Transform

Selects a subset of features from the data.

Initializes the Select transform.

Parameters:

- `features` (`IntoExpr | Iterable[IntoExpr]`, required): The features to select. The selection is passed as a parameter to polars' `select` method. You can use regular expressions by wrapping the argument in ^ and $.
Source code in src/flowcean/transforms/select.py
def __init__(self, features: IntoExpr | Iterable[IntoExpr]) -> None:
    """Initializes the Select transform.

    Args:
        features: The features to select. Treats the selection as a
            parameter to polars `select` method. You can use regular
            expressions by wrapping the argument by ^ and $.
    """
    self.features = features
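The name-matching behaviour, including the ^...$ regex convention, can be sketched with the stdlib `re` module (an illustration of the semantics; the real transform delegates to polars' `select` and accepts arbitrary expressions):

```python
import re

def select(data, features):
    """Keep the named columns; a pattern wrapped in ^ and $ is
    treated as a regex matched against every column name."""
    out = {}
    for pattern in features:
        if pattern.startswith("^") and pattern.endswith("$"):
            out.update(
                {k: v for k, v in data.items() if re.fullmatch(pattern, k)}
            )
        else:
            out[pattern] = data[pattern]
    return out

data = {"a_1": [1], "a_2": [2], "b": [3]}
print(select(data, ["^a_.*$", "b"]))  # {'a_1': [1], 'a_2': [2], 'b': [3]}
```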

SignalFilter(features, filter_type, filter_frequency, *, order=5)

Bases: Transform

Applies a Butterworth filter to time series features.

Applies a Butterworth lowpass or highpass filter to time series features. For this transform to work, the time series must already have a uniform sampling rate. Use a `Resample` transform to uniformly sample the points of a time series.

Initializes the Filter transform.

Parameters:

- `features` (`Iterable[str]`, required): Features that shall be filtered.
- `filter_type` (`SignalFilterType`, required): Type of the filter to apply. Valid options are "lowpass" and "highpass".
- `filter_frequency` (`float`, required): Characteristic frequency of the filter in Hz. For high- and lowpass filters this is the cutoff frequency.
- `order` (`int`, default `5`): Order of the Butterworth filter to use.
Source code in src/flowcean/transforms/signal_filter.py
def __init__(
    self,
    features: Iterable[str],
    filter_type: SignalFilterType,
    filter_frequency: float,
    *,
    order: int = 5,
) -> None:
    """Initializes the Filter transform.

    Args:
        features: Features that shall be filtered.
        filter_type: Type of the filter to apply. Valid options are
            "lowpass" and "highpass".
        filter_frequency: Characteristic frequency of the filter in Hz. For
            high- and lowpass this is the cutoff frequency.
        order: Order of the Butterworth filter to use. Defaults to 5.
    """
    self.features = features
    self.filter_type = filter_type
    self.frequency = filter_frequency
    self.order = order

SlidingWindow(window_size)

Bases: Transform

Transforms the data with a sliding window.

The sliding window transform transforms the data by creating a sliding window over the row dimension. The data is then transformed by creating a new column for each column in the original data. The new columns are named by appending the index of the row in the sliding window to the original column name. As an example, consider the following data:

x  y   z
1  10  100
2  20  200
3  30  300
4  40  400
5  50  500

If we apply a sliding window with a window size of 3, we get the following:

x_0  y_0  z_0  x_1  y_1  z_1  x_2  y_2  z_2
1    10   100  2    20   200  3    30   300
2    20   200  3    30   300  4    40   400
3    30   300  4    40   400  5    50   500

Parameters:

- `window_size` (`int`, required): Size of the sliding window.
Source code in src/flowcean/transforms/sliding_window.py
def __init__(self, window_size: int) -> None:
    self.window_size = window_size
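A pure-Python sketch of this windowing (illustrative only; a dict of lists stands in for the DataFrame):

```python
def sliding_window(data, window_size):
    """Produce one output row per window position, with a numbered
    copy of every column for each index inside the window."""
    n_rows = len(next(iter(data.values())))
    out = {f"{name}_{i}": [] for i in range(window_size) for name in data}
    for start in range(n_rows - window_size + 1):
        for i in range(window_size):
            for name, values in data.items():
                out[f"{name}_{i}"].append(values[start + i])
    return out

data = {
    "x": [1, 2, 3, 4, 5],
    "y": [10, 20, 30, 40, 50],
    "z": [100, 200, 300, 400, 500],
}
result = sliding_window(data, 3)
print(result["x_0"], result["x_2"])  # [1, 2, 3] [3, 4, 5]
```

This reproduces the example table: a window of size 3 over 5 rows yields 3 output rows.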

Standardize()

Bases: Transform, FitOnce

Standardize features by removing the mean and scaling to unit variance.

A sample \(x\) is standardized as:

\[ z = \frac{(x - \mu)}{\sigma} \]

where

  • \(\mu\) is the mean of the samples
  • \(\sigma\) is the standard deviation of the samples.

Attributes:

- `mean` (`dict[str, float] | None`): The mean \(\mu\) of each feature.
- `std` (`dict[str, float] | None`): The standard deviation \(\sigma\) of each feature.
- `counts` (`int | None`): Number of samples already learned.

Source code in src/flowcean/transforms/standardize.py
def __init__(self) -> None:
    super().__init__()

TimeWindow(*, features=None, time_start=0.0, time_end=math.inf)

Bases: Transform

Limit time series to a certain time window.

Initializes the TimeWindow transform.

Parameters:

- `features` (`Iterable[str] | None`, default `None`): The features to apply this transformation to. If `None`, all applicable features will be affected.
- `time_start` (`float`, default `0.0`): Window start time. All data before this time will be removed from the time series when applying the transform.
- `time_end` (`float`, default `math.inf`): Window end time. All data after this time will be removed from the time series when applying the transform.
Source code in src/flowcean/transforms/time_window.py
def __init__(
    self,
    *,
    features: Iterable[str] | None = None,
    time_start: float = 0.0,
    time_end: float = math.inf,
) -> None:
    """Initializes the TimeWindow transform.

    Args:
        features: The features to apply this transformation to. If `None`,
            all applicable features will be affected.
        time_start: Window start time. Defaults to zero. All data before
            this time will be removed from the time series when applying
            the transform.
        time_end: Window end time. Defaults to infinite. All data after
            this time will be removed from the time series when applying
            the transform.
    """
    self.features = features
    self.t_start = time_start
    self.t_end = time_end
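For a single series, the windowing reduces to filtering samples by their timestamps. A minimal sketch (illustrative; a time series is represented as a list of `(time, value)` tuples, and the window bounds are assumed inclusive):

```python
import math

def time_window(series, time_start=0.0, time_end=math.inf):
    """Keep only the (time, value) samples inside [time_start, time_end]."""
    return [(t, v) for t, v in series if time_start <= t <= time_end]

series = [(0.0, 1.0), (0.5, 2.0), (1.0, 3.0), (1.5, 4.0)]
print(time_window(series, time_start=0.5, time_end=1.0))
# [(0.5, 2.0), (1.0, 3.0)]
```

With the defaults (`time_start=0.0`, `time_end=math.inf`) every non-negative timestamp is kept.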