
polars

DummyLearner

Bases: SupervisedLearner, SupervisedIncrementalLearner

Dummy learner that learns nothing.

This learner is useful for testing purposes.

DummyModel(output_names)

Bases: Model

Dummy model that predicts zeros.

This model is useful for testing purposes.

Initialize the model.

Parameters:

    output_names (list[str]): The names of the output features. Required.
Source code in src/flowcean/polars/dummy.py
def __init__(self, output_names: list[str]) -> None:
    """Initialize the model.

    Args:
        output_names: The names of the output features.
    """
    self.output_names = output_names

CsvDataLoader(path, separator=',')

Bases: Dataset

DataLoader for CSV files.

Initialize the CsvDataLoader.

Parameters:

    path (str | Path): Path to the CSV file. Required.
    separator (str): Value separator. Defaults to ",".
Source code in src/flowcean/polars/environments/csv.py
def __init__(self, path: str | Path, separator: str = ",") -> None:
    """Initialize the CsvDataLoader.

    Args:
        path: Path to the CSV file.
        separator: Value separator. Defaults to ",".
    """
    data = pl.scan_csv(path, separator=separator)
    data = data.rename(lambda column_name: column_name.strip())
    super().__init__(data)
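
A minimal usage sketch, assuming CsvDataLoader can be imported from flowcean.polars (as the source path suggests) and that measurements.csv is a hypothetical file:

    from flowcean.polars import CsvDataLoader  # assumed import path

    loader = CsvDataLoader("measurements.csv", separator=";")
    # Like any Dataset, the loader exposes its data as a LazyFrame; observe()
    # is the accessor used elsewhere on this page (e.g. by TrainTestSplit.split).
    print(loader.observe().collect().head())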

Dataset(data)

Bases: OfflineEnvironment

A dataset environment.

This environment represents static tabular datasets.

Attributes:

    data (LazyFrame): The data to represent.

Initialize the dataset environment.

Parameters:

    data (DataFrame | LazyFrame): The data to represent. Required.
Source code in src/flowcean/polars/environments/dataset.py
def __init__(self, data: pl.DataFrame | pl.LazyFrame) -> None:
    """Initialize the dataset environment.

    Args:
        data: The data to represent.
    """
    if isinstance(data, pl.DataFrame):
        self.data = data.lazy()
        self._length = len(data)
    else:
        self.data = data
    super().__init__()

__len__()

Return the number of samples in the dataset.

Source code in src/flowcean/polars/environments/dataset.py
def __len__(self) -> int:
    """Return the number of samples in the dataset."""
    if self._length is None:
        # This operation is potentially very slow / costly
        self._length = cast(
            int,
            self.data.select(pl.len()).collect().item(),
        )
    return self._length
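
A short sketch, assuming Dataset can be imported from flowcean.polars. With an eager DataFrame the length is known immediately; with a LazyFrame it is computed on the first call to len():

    import polars as pl
    from flowcean.polars import Dataset  # assumed import path

    eager = Dataset(pl.DataFrame({"x": [1, 2, 3]}))
    lazy = Dataset(pl.LazyFrame({"x": [1, 2, 3]}))
    print(len(eager))  # 3, taken from the DataFrame directly
    print(len(lazy))   # 3, computed by collecting pl.len() once and then cached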

DatasetPredictionEnvironment(environment, batch_size)

Bases: ActiveEnvironment

Dataset prediction environment.

Initialize the dataset prediction environment.

Parameters:

    environment (Dataset): The dataset to use for prediction. Required.
    batch_size (int): The batch size of the prediction. Required.
Source code in src/flowcean/polars/environments/datasetprediction.py
def __init__(
    self,
    environment: Dataset,
    batch_size: int,
) -> None:
    """Initialize the dataset prediction environment.

    Args:
        environment: The dataset to use for prediction.
        batch_size: The batch size of the prediction.
    """
    super().__init__()
    self.environment = environment
    self.batch_size = batch_size

JoinedOfflineEnvironment(environments)

Bases: OfflineEnvironment

Environment that joins multiple offline environments.

Attributes:

    environments (Iterable[OfflineEnvironment]): The offline environments to join.

Initialize the joined offline environment.

Parameters:

    environments (Iterable[OfflineEnvironment]): The offline environments to join. Required.
Source code in src/flowcean/polars/environments/join.py
def __init__(self, environments: Iterable[OfflineEnvironment]) -> None:
    """Initialize the joined offline environment.

    Args:
        environments: The offline environments to join.
    """
    self.environments = environments
    super().__init__()

JsonDataLoader(path)

Bases: Dataset

DataLoader for JSON files.

Initialize the JsonDataLoader.

Parameters:

    path (str | Path): Path to the JSON file. Required.
Source code in src/flowcean/polars/environments/json.py
def __init__(self, path: str | Path) -> None:
    """Initialize the JsonDataLoader.

    Args:
        path: Path to the JSON file.
    """
    data = pl.read_json(path)
    super().__init__(data.lazy())

ParquetDataLoader(path)

Bases: Dataset

DataLoader for Parquet files.

Initialize the ParquetDataLoader.

Parameters:

    path (str | Path): Path to the Parquet file. Required.
Source code in src/flowcean/polars/environments/parquet.py
def __init__(self, path: str | Path) -> None:
    """Initialize the ParquetDataLoader.

    Args:
        path: Path to the Parquet file.
    """
    data = pl.scan_parquet(path)
    super().__init__(data)

StreamingOfflineEnvironment(environment, batch_size)

Bases: IncrementalEnvironment

Streaming offline environment.

This environment streams data from an offline environment in batches.

Initialize the streaming offline environment.

Parameters:

    environment (OfflineEnvironment): The offline environment to stream. Required.
    batch_size (int): The batch size of the streaming environment. Required.
Source code in src/flowcean/polars/environments/streaming.py
def __init__(
    self,
    environment: OfflineEnvironment,
    batch_size: int,
) -> None:
    """Initialize the streaming offline environment.

    Args:
        environment: The offline environment to stream.
        batch_size: The batch size of the streaming environment.
    """
    super().__init__()
    self.environment = environment
    self.batch_size = batch_size
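
A usage sketch, assuming Dataset and StreamingOfflineEnvironment can be imported from flowcean.polars and that iterating the environment yields one LazyFrame per batch (as the collect helper at the end of this page suggests):

    import polars as pl
    from flowcean.polars import Dataset, StreamingOfflineEnvironment  # assumed import path

    offline = Dataset(pl.DataFrame({"x": list(range(10))}))
    streaming = StreamingOfflineEnvironment(offline, batch_size=4)
    for batch in streaming:  # assumed iteration protocol
        print(batch.collect().height)  # at most 4 rows per batch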

TrainTestSplit(ratio, *, shuffle=False)

Split data into train and test sets.

Initialize the train-test splitter.

Parameters:

    ratio (float): The ratio of the data to put in the training set. Required.
    shuffle (bool): Whether to shuffle the data before splitting. Defaults to False.
Source code in src/flowcean/polars/environments/train_test_split.py
def __init__(
    self,
    ratio: float,
    *,
    shuffle: bool = False,
) -> None:
    """Initialize the train-test splitter.

    Args:
        ratio: The ratio of the data to put in the training set.
        shuffle: Whether to shuffle the data before splitting.
    """
    if ratio < 0 or ratio > 1:
        message = "ratio must be between 0 and 1"
        raise ValueError(message)
    self.ratio = ratio
    self.shuffle = shuffle

split(environment)

Split the data into train and test sets.

Parameters:

    environment (OfflineEnvironment): The environment to split. Required.
Source code in src/flowcean/polars/environments/train_test_split.py
def split(
    self,
    environment: OfflineEnvironment,
) -> tuple[Dataset, Dataset]:
    """Split the data into train and test sets.

    Args:
        environment: The environment to split.
    """
    logger.info("Splitting data into train and test sets")
    data = environment.observe().collect(streaming=True)
    pivot = int(len(data) * self.ratio)
    splits = _split(
        data,
        lengths=[pivot, len(data) - pivot],
        shuffle=self.shuffle,
        seed=get_seed(),
    )
    return Dataset(splits[0].lazy()), Dataset(splits[1].lazy())
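
A usage sketch, assuming Dataset and TrainTestSplit can be imported from flowcean.polars:

    import polars as pl
    from flowcean.polars import Dataset, TrainTestSplit  # assumed import path

    data = Dataset(pl.DataFrame({"x": list(range(10)), "y": list(range(10))}))
    train, test = TrainTestSplit(ratio=0.8, shuffle=True).split(data)
    print(len(train), len(test))  # 8 2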

InvalidUriSchemeError(scheme)

Bases: Exception

Exception raised when a URI scheme is invalid.

Initialize the InvalidUriSchemeError.

Parameters:

    scheme (str): Invalid URI scheme. Required.
Source code in src/flowcean/polars/environments/uri.py
def __init__(self, scheme: str) -> None:
    """Initialize the InvalidUriSchemeError.

    Args:
        scheme: Invalid URI scheme.
    """
    super().__init__(
        f"only file URIs can be converted to a path, but got `{scheme}`",
    )

UnsupportedFileTypeError(suffix)

Bases: Exception

Exception raised when a file type is not supported.

Initialize the UnsupportedFileTypeError.

Parameters:

    suffix (str): File type suffix. Required.
Source code in src/flowcean/polars/environments/uri.py
def __init__(self, suffix: str) -> None:
    """Initialize the UnsupportedFileTypeError.

    Args:
        suffix: File type suffix.
    """
    super().__init__(f"file type `{suffix}` is not supported")

UriDataLoader(uri)

Bases: Dataset

DataLoader for files specified by a URI.

Initialize the UriDataLoader.

Parameters:

    uri (str): URI of the file to load. Required.
Source code in src/flowcean/polars/environments/uri.py
def __init__(self, uri: str) -> None:
    """Initialize the UriDataLoader.

    Args:
        uri: Path to the URI file.
    """
    path = _file_uri_to_path(uri)
    suffix = path.suffix
    if suffix == ".csv":
        data_loader = CsvDataLoader(path)
    elif suffix == ".parquet":
        data_loader = ParquetDataLoader(path)
    else:
        raise UnsupportedFileTypeError(suffix)
    super().__init__(data_loader.data)
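
A usage sketch, assuming the loader and its exceptions can be imported from flowcean.polars; the file URIs below are hypothetical:

    from flowcean.polars import UnsupportedFileTypeError, UriDataLoader  # assumed import path

    # The suffix selects the concrete loader: .csv -> CsvDataLoader, .parquet -> ParquetDataLoader.
    loader = UriDataLoader("file:///tmp/measurements.csv")

    try:
        UriDataLoader("file:///tmp/notes.txt")
    except UnsupportedFileTypeError as error:
        print(error)  # file type `.txt` is not supported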

YamlDataLoader(path)

Bases: Dataset

DataLoader for YAML files.

Initialize the YamlDataLoader.

Parameters:

    path (str | Path): Path to the YAML file. Required.
Source code in src/flowcean/polars/environments/yaml.py
def __init__(self, path: str | Path) -> None:
    """Initialize the YamlDataLoader.

    Args:
        path: Path to the YAML file.
    """
    data = pl.DataFrame(YAML(typ="safe").load(path))
    super().__init__(data)

Cast(target_type, *, features=None)

Bases: Transform

Cast features to a different datatype.

Initializes the Cast transform.

Parameters:

    target_type (PolarsDataType): Type to which the features will be cast. Required.
    features (Iterable[str] | None): The features to cast. If None (the default), all features will be cast.
Source code in src/flowcean/polars/transforms/cast.py
def __init__(
    self,
    target_type: PolarsDataType,
    *,
    features: Iterable[str] | None = None,
) -> None:
    """Initializes the Cast transform.

    Args:
        target_type: Type to which the features will be cast.
        features: The features to cast. If `None` all
            features will be cast. This is the default behaviour.
    """
    self.target_type = target_type
    self.features = features

Drop(features)

Bases: Transform

Drop features from the data.

Initializes the Drop transform.

Source code in src/flowcean/polars/transforms/drop.py
def __init__(self, features: str | Iterable[str]) -> None:
    """Initializes the Drop transform."""
    super().__init__()
    self.features = features

Explode(features)

Bases: Transform

Explodes a Dataframe to long format by exploding the given features.

Parameters:

    features (list[str]): List of features to explode. Required.

The example below shows the usage of an Explode transform in an experiment.yaml file. Assuming the loaded data is represented by the table:

time feature_a feature_b constant
[0, 1] [2, 1] [9, 3] 1
[0, 2] [3, 4] [8, 4] 2

This transform can be used to explode the columns time, feature_a, and feature_b.

The resulting Dataframe after the transform is:

time feature_a feature_b constant
0 2 9 1
1 1 3 1
0 3 8 2
2 4 4 2
Source code in src/flowcean/polars/transforms/explode.py
def __init__(self, features: list[str]) -> None:
    self.features = features
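
A programmatic sketch of the same example, assuming Explode can be imported from flowcean.polars and exposes the apply() method documented for the other transforms on this page:

    import polars as pl
    from flowcean.polars import Explode  # assumed import path

    data = pl.LazyFrame(
        {
            "time": [[0, 1], [0, 2]],
            "feature_a": [[2, 1], [3, 4]],
            "feature_b": [[9, 3], [8, 4]],
            "constant": [1, 2],
        },
    )
    exploded = Explode(features=["time", "feature_a", "feature_b"]).apply(data)
    print(exploded.collect())  # four rows, one per list element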

Filter(expression)

Bases: Transform

Filter an environment based on one or multiple expressions.

This transform allows filtering an environment based on one or multiple boolean expressions. Assuming the input environment is given by

t N x
1 10 0
2 12 1
3 5 2
4 15 1
5 17 0

The following transformation can be used to filter the environment so that the result contains only records where x=1:

    Filter("x == 1")

The result dataset after applying the transform will be

t N x
2 12 1
4 15 1

To get only records where x=1 and t > 3, the filter expression

Filter(And(["x == 1", "t > 3"]))

can be used.

To filter all records where x=1 and either t > 3 or N < 15, use

Filter(And(["x == 1", Or(["t > 3", "N < 15"])]))

Initializes the Filter transform.

Parameters:

    expression (str | FilterExpr): String or filter expression used to filter the environment. Records that do not match the expression are discarded. Standard comparison and mathematical operations are supported within the expressions. Features can be accessed by their name. Required.
Source code in src/flowcean/polars/transforms/filter.py
def __init__(
    self,
    expression: str | FilterExpr,
) -> None:
    """Initializes the Filter transform.

    Args:
        expression: String or filter expression used to filter the
            environment. Records that do not match the expression are
            discarded. Standard comparison and mathematical operations are
            supported within the expressions. Features can be accessed by
            there name.
    """
    if isinstance(expression, str):
        self.predicate = _str_to_pl(expression)
    else:
        self.predicate = expression()
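
A sketch of the examples above, assuming Filter and the And/Or expression helpers can be imported from flowcean.polars:

    import polars as pl
    from flowcean.polars import And, Filter, Or  # assumed import path

    data = pl.LazyFrame(
        {"t": [1, 2, 3, 4, 5], "N": [10, 12, 5, 15, 17], "x": [0, 1, 2, 1, 0]},
    )
    only_x1 = Filter("x == 1").apply(data)
    combined = Filter(And(["x == 1", Or(["t > 3", "N < 15"])])).apply(data)
    print(only_x1.collect())   # rows t=2 and t=4
    print(combined.collect())  # the same rows here, since each also satisfies t > 3 or N < 15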

FilterExpr

Bases: ABC

Expression to be used in a Filter transform.

get() abstractmethod

Get the polars expression for this filter.

Source code in src/flowcean/polars/transforms/filter.py
@abstractmethod
def get(self) -> pl.Expr:
    """Get the polars expression for this filter."""

FeatureLengthVaryError

Bases: Exception

Length of a feature varies over different rows.

Flatten(features=None)

Bases: Transform

Flatten all time series in a DataFrame to individual features.

The given DataFrame's time series are converted into individual features, with each time step creating a new feature. This transform will change the order of the columns in the resulting dataset.

For example, the dataset

series_data A B
{[0, 0], [1, 1], [2, 2]} 42 43
{[0, 3], [1, 4], [2, 5]} 44 45

gets flattened into the dataset

series_data_0 series_data_1 series_data_2 A B
0 1 2 42 43
3 4 5 44 45

Initialize the flatten transform.

Parameters:

    features (Iterable[str] | None): The features to flatten. If not provided or set to None, all possible features from the given dataframe will be flattened. Defaults to None.
Source code in src/flowcean/polars/transforms/flatten.py
def __init__(self, features: Iterable[str] | None = None) -> None:
    """Initialize the flatten transform.

    Args:
        features: The features to flatten. If not provided or set to None,
            all possible features from the given dataframe will be
            flattened.
    """
    self.features = features
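
A sketch, assuming Flatten can be imported from flowcean.polars; time series columns are lists of {time, value} structs, as described elsewhere on this page:

    import polars as pl
    from flowcean.polars import Flatten  # assumed import path

    data = pl.LazyFrame(
        {
            "series_data": [
                [{"time": 0, "value": 0}, {"time": 1, "value": 1}, {"time": 2, "value": 2}],
                [{"time": 0, "value": 3}, {"time": 1, "value": 4}, {"time": 2, "value": 5}],
            ],
            "A": [42, 44],
            "B": [43, 45],
        },
    )
    flattened = Flatten().apply(data)  # flatten all time series features
    print(flattened.collect())  # columns series_data_0 ... series_data_2, A, B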

NoTimeSeriesFeatureError

Bases: Exception

Feature is no time series.

Lambda(fn)

Bases: Transform

Apply a custom function to the data of an environment.

Initializes the Lambda transform.

Parameters:

    fn (Callable[[LazyFrame], LazyFrame]): Function handle to be applied to the data. Required.
Source code in src/flowcean/polars/transforms/function.py
def __init__(self, fn: Callable[[pl.LazyFrame], pl.LazyFrame]) -> None:
    """Initializes the Lambda transform.

    Args:
        fn: Function handle to be applied to the data.
    """
    self.fn = fn

FeatureNotFoundError(feature)

Bases: Exception

Feature not found in the DataFrame.

This exception is raised when a feature is not found in the DataFrame.

Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def __init__(self, feature: str) -> None:
    super().__init__(f"{feature} not found")

MatchSamplingRate(reference_feature_name, feature_interpolation_map)

Bases: Transform

Matches the sampling rate of all time series in the DataFrame.

Interpolates the time series to match the sampling rate of the reference time series. The below example shows the usage of a MatchSamplingRate transform in a run.py file. Assuming the loaded data is represented by the table:

feature_a                   feature_b                   const
list[struct[time,struct[]]] list[struct[time,struct[]]] int
--------------------------- --------------------------- -----
[{12:26:01.0, {1.2}},       [{12:26:00.0, {1.0}},       1
 {12:26:02.0, {2.4}},        {12:26:05.0, {2.0}}]
 {12:26:03.0, {3.6}},
 {12:26:04.0, {4.8}}]

The following transform can be used to match the sampling rate of the time series feature_b to the sampling rate of the time series feature_a.

    ...
    environment.load()
    data = environment.get_data()
    transform = MatchSamplingRate(
        reference_feature_name="feature_a",
        feature_interpolation_map={
            "feature_b": "linear",
        },
    )
    transformed_data = transform.transform(data)
    ...

The resulting Dataframe after the transform is:

feature_a                   feature_b                   const
list[struct[time,struct[]]] list[struct[time,struct[]]] int
--------------------------- --------------------------- -----
[{12:26:01.0, {1.2}},       [{12:26:01.0, {1.2}},       1
 {12:26:02.0, {2.4}},        {12:26:02.0, {1.4}},
 {12:26:03.0, {3.6}},        {12:26:03.0, {1.6}},
 {12:26:04.0, {4.8}}]        {12:26:04.0, {1.8}}]

Initialize the transform.

Parameters:

    reference_feature_name (str): Reference time series feature. Required.
    feature_interpolation_map (dict[str, MatchSamplingRateMethod]): Key-value pairs mapping each time series feature to interpolate to the interpolation method to use. At the moment, the only supported interpolation method is 'linear'. Required.
Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def __init__(
    self,
    reference_feature_name: str,
    feature_interpolation_map: dict[str, MatchSamplingRateMethod],
) -> None:
    """Initialize the transform.

    Args:
        reference_feature_name: Reference timeseries feature.
        feature_interpolation_map: Key-value pairs of the timeseries
            features that are targeted in interpolation columns and the
            interpolation method to use. At the moment, the interpolation
            method can only be 'linear'.
    """
    self.reference_feature_name = reference_feature_name
    self.feature_interpolation_map = feature_interpolation_map

apply(data)

Transform the input DataFrame.

Parameters:

    data (LazyFrame): Input DataFrame. Required.

Returns:

    LazyFrame: Transformed DataFrame.

Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def apply(self, data: pl.LazyFrame) -> pl.LazyFrame:
    """Transform the input DataFrame.

    Args:
        data: Input DataFrame.

    Returns:
        Transformed DataFrame.

    """
    # preserve all constant columns that are not timeseries data
    transformed_data = pl.DataFrame()
    collected_data = data.collect()
    for i in range(len(collected_data.rows())):
        transformed_data_slice = self._transform_row(
            collected_data.slice(i, 1),
        )
        transformed_data = transformed_data.vstack(transformed_data_slice)
    return transformed_data.lazy()

UnknownInterpolationError(interpolation_method)

Bases: Exception

Interpolation method is not implemented yet.

This exception is raised when the requested interpolation method is not implemented.

Source code in src/flowcean/polars/transforms/match_sampling_rate.py
def __init__(self, interpolation_method: str) -> None:
    super().__init__(f"{interpolation_method} not found")

OneCold(feature_categories, *, check_for_missing_categories=False)

Bases: Transform

Transforms integer features into a set of binary one-cold features.

Transforms integer features into a set of binary one-cold features. The original integer features are dropped and are not part of the resulting data frame.

As an example consider the following data

feature
0
1
2
1
5

When the one-cold transformation is applied, the result is as follows

feature_0 feature_1 feature_2 feature_5
0 1 1 1
1 0 1 1
1 1 0 1
1 0 1 1
1 1 1 0

In the default configuration, missing categories are ignored; their respective entries will all be one. If you want to enforce that each data entry belongs to a known category, set the check_for_missing_categories flag to true when constructing a One-Cold transform. In that case, if an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. This check has an impact on performance and will slow down the transform.

If you want to enable this check, create the transform as follows:

    transform = OneCold(
        feature_categories={"feature": [0, 1, 2, 5]},
        check_for_missing_categories=True,
    )

Initializes the One-Cold transform.

Parameters:

    feature_categories (dict[str, list[Any]]): Dictionary of features and a list of categorical values to encode for each. Required.
    check_for_missing_categories (bool): If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.
Source code in src/flowcean/polars/transforms/one_cold.py
def __init__(
    self,
    feature_categories: dict[str, list[Any]],
    *,
    check_for_missing_categories: bool = False,
) -> None:
    """Initializes the One-Hot transform.

    Args:
        feature_categories: Dictionary of features and a list of
            categorical values to encode for each.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    self.feature_category_mapping = {
        feature: {f"{feature}_{value}": value for value in values}
        for feature, values in feature_categories.items()
    }
    self.check_for_missing_categories = check_for_missing_categories

apply(data)

Transform data with this one-cold transformation.

Transform data with this one-cold transformation and return the resulting dataframe.

Parameters:

    data (LazyFrame): The data to transform. Required.

Returns:

    LazyFrame: The transformed data.

Source code in src/flowcean/polars/transforms/one_cold.py
@override
def apply(
    self,
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Transform data with this one hot transformation.

    Transform data with this one hot transformation and return the
    resulting dataframe.

    Args:
        data: The data to transform.

    Returns:
        The transformed data.
    """
    if len(self.feature_category_mapping) == 0:
        raise NoCategoriesError
    for (
        feature,
        category_mappings,
    ) in self.feature_category_mapping.items():
        data = data.with_columns(
            [
                pl.col(feature).ne(value).cast(pl.Int64).alias(name)
                for name, value in category_mappings.items()
            ],
        ).drop(feature)

        # Check only for missing categories if the user has requested it
        if self.check_for_missing_categories and (
            not data.select(
                [
                    pl.col(name).cast(pl.Boolean)
                    for name in category_mappings
                ],
            )  # Get the new crated on-cold feature columns
            .select(
                # Check if all on-cold features are true
                # That's only the case if the category is missing
                pl.all_horizontal(
                    pl.all(),
                ).all(),  # Combine the results for all data entries ...
            )
            .collect(streaming=True)
            # ... and get the final result.
            # If it is false, there is a missing category
            .item(0, 0)
        ):
            raise NoMatchingCategoryError
    return data

from_dataframe(data, features, *, check_for_missing_categories=False) classmethod

Creates a new one-cold transformation based on sample data.

Parameters:

    data (DataFrame): A dataframe containing sample data for determining the categories of the transform. Required.
    features (Iterable[str]): Names of the features for which the one-cold transformation will determine the categories. Required.
    check_for_missing_categories (bool): If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.
Source code in src/flowcean/polars/transforms/one_cold.py
@classmethod
def from_dataframe(
    cls,
    data: pl.DataFrame,
    features: Iterable[str],
    *,
    check_for_missing_categories: bool = False,
) -> Self:
    """Creates a new one-hot transformation based on sample data.

    Args:
        data: A dataframe containing sample data for determining the
            categories of the transform.
        features: Name of the features for which the one hot transformation
            will determine the categories.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    # Derive categories from the data frame
    feature_categories: dict[str, list[Any]] = {}
    for feature in features:
        if data.schema[feature].is_float():
            logger.warning(
                (
                    "Feature %s is of type float. Applying a one-cold",
                    "transform to it may produce undesired results.",
                    "Check your datatypes and transforms.",
                ),
                feature,
            )
        feature_categories[feature] = (
            data.select(pl.col(feature).unique()).to_series().to_list()
        )
    return cls(
        feature_categories,
        check_for_missing_categories=check_for_missing_categories,
    )

OneHot(feature_categories, *, check_for_missing_categories=False)

Bases: Transform

Transforms integer features into a set of binary one-hot features.

Transforms integer features into a set of binary one-hot features. The original integer features are dropped and are not part of the resulting data frame.

As an example consider the following data

feature
0
1
2
1
5

When the one-hot transformation is applied, the result is as follows

feature_0 feature_1 feature_2 feature_5
1 0 0 0
0 1 0 0
0 0 1 0
0 1 0 0
0 0 0 1

Initializes the One-Hot transform.

Parameters:

    feature_categories (dict[str, list[Any]]): Dictionary of features and a list of categorical values to encode for each. Required.
    check_for_missing_categories (bool): If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.
Source code in src/flowcean/polars/transforms/one_hot.py
def __init__(
    self,
    feature_categories: dict[str, list[Any]],
    *,
    check_for_missing_categories: bool = False,
) -> None:
    """Initializes the One-Hot transform.

    Args:
        feature_categories: Dictionary of features and a list of
            categorical values to encode for each.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    self.feature_category_mapping = {
        feature: {f"{feature}_{value}": value for value in values}
        for feature, values in feature_categories.items()
    }
    self.check_for_missing_categories = check_for_missing_categories

apply(data)

Transform data with this one hot transformation.

Transform data with this one hot transformation and return the resulting dataframe.

Parameters:

    data (LazyFrame): The data to transform. Required.

Returns:

    LazyFrame: The transformed data.

Source code in src/flowcean/polars/transforms/one_hot.py
@override
def apply(
    self,
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Transform data with this one hot transformation.

    Transform data with this one hot transformation and return the
    resulting dataframe.

    Args:
        data: The data to transform.

    Returns:
        The transformed data.
    """
    if len(self.feature_category_mapping) == 0:
        raise NoCategoriesError
    for (
        feature,
        category_mappings,
    ) in self.feature_category_mapping.items():
        data = data.with_columns(
            [
                pl.col(feature).eq(value).cast(pl.Int64).alias(name)
                for name, value in category_mappings.items()
            ],
        ).drop(feature)

        if self.check_for_missing_categories and (
            not data.select(
                [
                    pl.col(name).cast(pl.Boolean)
                    for name in category_mappings
                ],
            )
            .select(pl.any_horizontal(pl.all()).all())
            .collect(streaming=True)
            .item(0, 0)
        ):
            raise NoMatchingCategoryError
    return data

from_dataframe(data, features, *, check_for_missing_categories=False) classmethod

Creates a new one-hot transformation based on sample data.

Parameters:

    data (LazyFrame): A dataframe containing sample data for determining the categories of the transform. Required.
    features (Iterable[str]): Names of the features for which the one-hot transformation will determine the categories. Required.
    check_for_missing_categories (bool): If set to true, a check is performed to see if all values belong to a category. If an unknown value is found which does not belong to any category, a NoMatchingCategoryError is thrown. To perform this check, the dataframe must be materialised, resulting in a potential performance decrease. Therefore it defaults to false.
Source code in src/flowcean/polars/transforms/one_hot.py
@classmethod
def from_dataframe(
    cls,
    data: pl.LazyFrame,
    features: Iterable[str],
    *,
    check_for_missing_categories: bool = False,
) -> Self:
    """Creates a new one-hot transformation based on sample data.

    Args:
        data: A dataframe containing sample data for determining the
            categories of the transform.
        features: Name of the features for which the one hot transformation
            will determine the categories.
        check_for_missing_categories: If set to true, a check is performed
            to see if all values belong to a category. If an unknown value
            is found which does not belong to any category, a
            NoMatchingCategoryError is thrown. To perform this check, the
            dataframe must be materialised, resulting in a potential
            performance decrease. Therefore it defaults to false.
    """
    # Derive categories from the data frame
    feature_categories: dict[str, list[Any]] = {}
    for feature in features:
        if data.schema[feature].is_float():
            logger.warning(
                (
                    "Feature %s is of type float. Applying a one-hot",
                    "transform to it may produce undesired results.",
                    "Check your datatypes and transforms.",
                ),
                feature,
            )
        feature_categories[feature] = (
            data.select(pl.col(feature).unique())
            .collect(streaming=True)
            .to_series()
            .to_list()
        )
    return cls(
        feature_categories,
        check_for_missing_categories=check_for_missing_categories,
    )
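
A sketch combining from_dataframe with apply, assuming OneHot can be imported from flowcean.polars:

    import polars as pl
    from flowcean.polars import OneHot  # assumed import path

    data = pl.LazyFrame({"feature": [0, 1, 2, 1, 5]})
    transform = OneHot.from_dataframe(data, features=["feature"])
    print(transform.apply(data).collect())  # columns feature_0, feature_1, feature_2, feature_5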

Rename(mapping)

Bases: Transform

Rename features in an environment.

Initializes the Rename transform.

Parameters:

    mapping (dict[str, str]): Key-value pairs that map from the old feature name to the new one. Required.
Source code in src/flowcean/polars/transforms/rename.py
def __init__(self, mapping: dict[str, str]) -> None:
    """Initializes the Rename transform.

    Args:
        mapping: Key value pairs that map from the old feature name to the
            new one.
    """
    self.mapping = mapping

Resample(sampling_rate, *, interpolation_method='linear')

Bases: Transform

Resample time series features to a given sampling rate.

Initializes the Resample transform.

Parameters:

    sampling_rate (float | dict[str, float]): Target sampling rate for time series features. If a float is provided, all possible time series features will be resampled. Alternatively, a dictionary can be provided where the key is the feature and the value is the target sample rate. Required.
    interpolation_method (InterpolationMethod): The interpolation method to use. Supported are "linear" and "cubic", with the default being "linear".
Source code in src/flowcean/polars/transforms/resample.py
def __init__(
    self,
    sampling_rate: float | dict[str, float],
    *,
    interpolation_method: InterpolationMethod = "linear",
) -> None:
    """Initializes the Resample transform.

    Args:
        sampling_rate: Target sampling rate for time series features. If a
            float is provided, all possible time series features will be
            resampled. Alternatively, a dictionary can be provided where
            the key is the feature and the value is the target sample rate.
        interpolation_method: The interpolation method to use. Supported
            are "linear" and "cubic", with the default being
            "linear".
    """
    self.sampling_rate = sampling_rate
    self.interpolation_method = interpolation_method

Select(features)

Bases: Transform

Selects a subset of features from the data.

Initializes the Select transform.

Parameters:

    features (IntoExpr | Iterable[IntoExpr]): The features to select. Treats the selection as a parameter to polars' select method. You can use regular expressions by wrapping the argument in ^ and $. Required.
Source code in src/flowcean/polars/transforms/select.py
def __init__(self, features: IntoExpr | Iterable[IntoExpr]) -> None:
    """Initializes the Select transform.

    Args:
        features: The features to select. Treats the selection as a
            parameter to polars `select` method. You can use regular
            expressions by wrapping the argument by ^ and $.
    """
    self.features = features

SignalFilter(features, filter_type, filter_frequency, *, order=5)

Bases: Transform

Applies a Butterworth filter to time series features.

Applies a Butterworth lowpass or highpass filter to time series features. For this transform to work, the time series must already have a uniform sampling rate. Use a Resample transform to uniformly sample the points of a time series.

Initializes the Filter transform.

Parameters:

    features (Iterable[str]): Features that shall be filtered. Required.
    filter_type (SignalFilterType): Type of the filter to apply. Valid options are "lowpass" and "highpass". Required.
    filter_frequency (float): Characteristic frequency of the filter in Hz. For high- and lowpass this is the cutoff frequency. Required.
    order (int): Order of the Butterworth filter to use. Defaults to 5.
Source code in src/flowcean/polars/transforms/signal_filter.py
def __init__(
    self,
    features: Iterable[str],
    filter_type: SignalFilterType,
    filter_frequency: float,
    *,
    order: int = 5,
) -> None:
    """Initializes the Filter transform.

    Args:
        features: Features that shall be filtered.
        filter_type: Type of the filter to apply. Valid options are
            "lowpass" and "highpass".
        filter_frequency: Characteristic frequency of the filter in Hz. For
            high- and lowpass this is the cutoff frequency.
        order: Order of the Butterworth filter to uses. Defaults to 5.
    """
    self.features = features
    self.filter_type = filter_type
    self.frequency = filter_frequency
    self.order = order

SlidingWindow(window_size)

Bases: Transform

Transforms the data with a sliding window.

The sliding window transform transforms the data by creating a sliding window over the row dimension. For each window position, a new column is created for each column in the original data; the new columns are named by appending the index of the row within the sliding window to the original column name. As an example, consider the following data:

x y z
1 10 100
2 20 200
3 30 300
4 40 400
5 50 500

If we apply a sliding window with a window size of 3, we get the following

x_0 y_0 z_0 x_1 y_1 z_1 x_2 y_2 z_2
1 10 100 2 20 200 3 30 300
2 20 200 3 30 300 4 40 400
3 30 300 4 40 400 5 50 500

Parameters:

    window_size (int): Size of the sliding window. Required.
Source code in src/flowcean/polars/transforms/sliding_window.py
def __init__(self, window_size: int) -> None:
    self.window_size = window_size
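
A sketch reproducing the example above, assuming SlidingWindow can be imported from flowcean.polars:

    import polars as pl
    from flowcean.polars import SlidingWindow  # assumed import path

    data = pl.LazyFrame(
        {
            "x": [1, 2, 3, 4, 5],
            "y": [10, 20, 30, 40, 50],
            "z": [100, 200, 300, 400, 500],
        },
    )
    windowed = SlidingWindow(window_size=3).apply(data)
    print(windowed.collect())  # three rows with columns x_0, y_0, z_0, ..., x_2, y_2, z_2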

Standardize(mean=None, std=None) dataclass

Bases: Transform, FitOnce

Standardize features by removing the mean and scaling to unit variance.

A sample \(x\) is standardized as:

\[ z = \frac{x - \mu}{\sigma} \]

where

  • \(\mu\) is the mean of the samples
  • \(\sigma\) is the standard deviation of the samples.

Attributes:

    mean (dict[str, float] | None): The mean \(\mu\) of each feature.
    std (dict[str, float] | None): The standard deviation \(\sigma\) of each feature.
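
The same computation expressed directly in polars, as a sketch of the math only (not of this class's fit/apply API):

    import polars as pl

    data = pl.DataFrame({"x": [1.0, 2.0, 3.0, 4.0]})
    standardized = data.select(
        (pl.col("x") - pl.col("x").mean()) / pl.col("x").std(),
    )
    # mean 2.5, std ~1.29 -> z approximately [-1.16, -0.39, 0.39, 1.16]
    print(standardized)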

TimeWindow(*, features=None, time_start=0.0, time_end=math.inf)

Bases: Transform

Limit time series to a certain time window.

Initializes the TimeWindow transform.

Parameters:

    features (Iterable[str] | None): The features to apply this transformation to. If None, all applicable features will be affected. Defaults to None.
    time_start (float): Window start time. Defaults to zero. All data before this time will be removed from the time series when applying the transform.
    time_end (float): Window end time. Defaults to infinity. All data after this time will be removed from the time series when applying the transform.
Source code in src/flowcean/polars/transforms/time_window.py
def __init__(
    self,
    *,
    features: Iterable[str] | None = None,
    time_start: float = 0.0,
    time_end: float = math.inf,
) -> None:
    """Initializes the TimeWindow transform.

    Args:
        features: The features to apply this transformation to. If `None`,
            all applicable features will be affected.
        time_start: Window start time. Defaults to zero. All data before
            this time will be removed from the time series when applying
            the transform.
        time_end: Window end time. Defaults to infinite. All data after
            this time will be removed from the time series when applying
            the transform.
    """
    self.features = features
    self.t_start = time_start
    self.t_end = time_end

collect(environment, n=None, *, progress_bar=True)

Collect data from an environment.

Parameters:

    environment (Iterable[LazyFrame] | Collection[LazyFrame]): The environment to collect data from. Required.
    n (int | None): Number of samples to collect. If None, all samples are collected. Defaults to None.
    progress_bar (bool | dict[str, Any]): Whether to show a progress bar. If a dictionary is provided, it will be passed to the progress bar. Defaults to True.

Returns:

    Dataset: The collected dataset.

Source code in src/flowcean/polars/environments/dataset.py
def collect(
    environment: Iterable[pl.LazyFrame] | Collection[pl.LazyFrame],
    n: int | None = None,
    *,
    progress_bar: bool | dict[str, Any] = True,
) -> Dataset:
    """Collect data from an environment.

    Args:
        environment: The environment to collect data from.
        n: Number of samples to collect. If None, all samples are collected.
        progress_bar: Whether to show a progress bar. If a dictionary is
            provided, it will be passed to the progress bar.

    Returns:
        The collected dataset.
    """
    samples = islice(environment, n)

    if n is not None:
        total = n
    elif isinstance(environment, Collection):
        total = len(environment)
    else:
        total = None

    if isinstance(progress_bar, dict):
        progress_bar.setdefault("desc", "Collecting samples")
        progress_bar.setdefault("total", total)
        samples = tqdm(
            samples,
            **progress_bar,
        )
    elif progress_bar:
        samples = tqdm(samples, desc="Collecting samples", total=total)

    data = pl.concat(samples, how="vertical")
    return Dataset(data)
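
A sketch, assuming collect can be imported from flowcean.polars; any iterable of LazyFrames works as the environment:

    import polars as pl
    from flowcean.polars import collect  # assumed import path

    batches = [pl.LazyFrame({"x": [1, 2]}), pl.LazyFrame({"x": [3, 4]})]
    dataset = collect(batches, progress_bar=False)
    print(len(dataset))  # 4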

is_timeseries_feature(df, name)

Check if the given column is a time series feature.

A time series feature contains a list of structs with fields time and value.

Parameters:

    df (LazyFrame): The DataFrame to check. Required.
    name (str): The column to check. Required.

Returns:

    bool: True if the column is a time series feature, False otherwise.

Source code in src/flowcean/polars/is_time_series.py
def is_timeseries_feature(df: pl.LazyFrame, name: str) -> bool:
    """Check if the given column is a time series feature.

    A time series feature contains a list of structs with fields _time_ and
    _value_.

    Args:
        df: The DataFrame to check.
        name: The column to check.

    Returns:
        True if the column is a time series feature, False otherwise.
    """
    data_type = df.select(name).collect_schema().dtypes()[0]

    if data_type.base_type() != pl.List:
        return False

    inner_type: pl.DataType = cast(pl.DataType, cast(pl.List, data_type).inner)
    if inner_type.base_type() != pl.Struct:
        return False

    field_names = [field.name for field in cast(pl.Struct, inner_type).fields]
    return "time" in field_names and "value" in field_names