core

Adapter

Bases: ABC

Abstract base class for adapters.

start() abstractmethod

Start the adapter.

This method is called when the tool loop is started. It should be used to initialize the adapter, start any background processes and establish connections to the CPS.

Source code in src/flowcean/core/adapter.py
@abstractmethod
def start(self) -> None:
    """Start the adapter.

    This method is called when the tool loop is started.
    It should be used to initialize the adapter, start any
    background processes and establish connections to the CPS.
    """

stop() abstractmethod

Stop the adapter.

This method is called when the tool loop is stopped. It should be used to clean up resources, stop any background processes and close connections to the CPS.

Source code in src/flowcean/core/adapter.py
@abstractmethod
def stop(self) -> None:
    """Stop the adapter.

    This method is called when the tool loop is stopped.
    It should be used to clean up resources, stop any background
    processes and close connections to the CPS.
    """

get_data() abstractmethod

Get data from the CPS through the adapter.

Retrieve a data record from the CPS. This method should block until data is available. If no more data is available, it should raise a Stop exception.

Returns:

Type Description
Data

The data retrieved from the CPS.

Source code in src/flowcean/core/adapter.py
@abstractmethod
def get_data(self) -> Data:
    """Get data from the CPS through the adapter.

    Retrieve a data record from the CPS. This method should block until
    data is available. If no more data is available, it should raise a
    Stop exception.

    Returns:
        The data retrieved from the CPS.
    """

send_data(data) abstractmethod

Send data to the CPS through the adapter.

This method allows sending data to the CPS. It is used by the tool loop to send the results for the tool evaluation back to the CPS for further processing.

Parameters:

Name Type Description Default
data Data

The data to send.

required
Source code in src/flowcean/core/adapter.py
@abstractmethod
def send_data(self, data: Data) -> None:
    """Send data to the CPS through the adapter.

    This method allows sending data to the CPS. It is used by the tool loop
    to send the results for the tool evaluation back to the CPS for further
    processing.

    Args:
        data: The data to send.
    """

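The adapter lifecycle described above can be sketched without a real CPS. The following is a minimal sketch, not flowcean's implementation: `Adapter` is re-declared locally as a stand-in, `Stop` is a hypothetical exception standing in for the "no more data" signal, `InMemoryAdapter` is an invented toy class, and plain integers stand in for `Data`.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Any


class Stop(Exception):
    """Hypothetical stand-in for the exception signalling 'no more data'."""


class Adapter(ABC):
    """Local stand-in mirroring the four abstract methods documented above."""

    @abstractmethod
    def start(self) -> None: ...

    @abstractmethod
    def stop(self) -> None: ...

    @abstractmethod
    def get_data(self) -> Any: ...

    @abstractmethod
    def send_data(self, data: Any) -> None: ...


class InMemoryAdapter(Adapter):
    """Toy adapter that serves records from a list instead of a real CPS."""

    def __init__(self, records: list[Any]) -> None:
        self._records = records
        self._queue: deque[Any] = deque()
        self.sent: list[Any] = []
        self.running = False

    def start(self) -> None:
        # A real adapter would open connections or start workers here.
        self._queue = deque(self._records)
        self.running = True

    def stop(self) -> None:
        # A real adapter would close connections and release resources here.
        self._queue.clear()
        self.running = False

    def get_data(self) -> Any:
        if not self._queue:
            raise Stop
        return self._queue.popleft()

    def send_data(self, data: Any) -> None:
        self.sent.append(data)


adapter = InMemoryAdapter([1, 2, 3])
adapter.start()
try:
    while True:
        record = adapter.get_data()
        adapter.send_data(record * 10)  # stand-in for a tool evaluation result
except Stop:
    pass
finally:
    adapter.stop()
```

Calling `stop()` in a `finally` block keeps cleanup reliable even if the loop body raises something other than `Stop`.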
Actable

Bases: Protocol

Base class for active environments.

Active environments require actions to be taken to advance.

act(action) abstractmethod

Act on the environment.

Parameters:

Name Type Description Default
action Data

The action to perform.

required
Source code in src/flowcean/core/environment/actable.py
@abstractmethod
def act(self, action: Data) -> None:
    """Act on the environment.

    Args:
        action: The action to perform.
    """

ActiveEnvironment

Bases: Environment, Stepable, Actable, Protocol

Base class for active environments.

An active environment loads data in an interactive way, e.g., from a simulation or real system. The environment requires actions to be taken to advance. Data can be retrieved by observing the environment.

Environment

Bases: Named, Protocol

Base class adding transform support.

append_transform(transform)

Append a transform to the observation.

Parameters:

Name Type Description Default
transform Transform

Transform to append.

required

Returns:

Type Description
Self

This observable with the appended transform.

Source code in src/flowcean/core/environment/base.py
def append_transform(
    self,
    transform: Transform,
) -> Self:
    """Append a transform to the observation.

    Args:
        transform: Transform to append.

    Returns:
        This observable with the appended transform.
    """
    self.transform = self.transform.chain(transform)
    return self

observe()

Observe and return the observation.

Source code in src/flowcean/core/environment/base.py
def observe(self) -> Data:
    """Observe and return the observation."""
    return self.transform(self._observe())

__or__(transform)

Shortcut for append_transform.

Source code in src/flowcean/core/environment/base.py
@final
def __or__(
    self,
    transform: Transform,
) -> Self:
    """Shortcut for `append_transform`."""
    return self.append_transform(transform)
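To see how `append_transform`, `observe` and `|` fit together, here is a minimal sketch built from local stand-ins. `Identity`, `Chained` and `ListEnvironment` are hypothetical classes written for this illustration; they only mirror the attribute and method names shown in the excerpts above and are not flowcean's classes.

```python
from typing import Any, Callable


class Identity:
    """Stand-in transform that returns data unchanged."""

    def __call__(self, data: Any) -> Any:
        return data

    def chain(self, other: Callable[[Any], Any]) -> "Chained":
        return Chained(self, other)


class Chained(Identity):
    """Stand-in for a left-to-right composition of two transforms."""

    def __init__(self, first: Callable[[Any], Any], second: Callable[[Any], Any]) -> None:
        self.first = first
        self.second = second

    def __call__(self, data: Any) -> Any:
        return self.second(self.first(data))


class ListEnvironment:
    """Toy environment whose raw observation is a fixed list."""

    def __init__(self, values: list[int]) -> None:
        self._values = values
        self.transform: Identity = Identity()

    def _observe(self) -> list[int]:
        return list(self._values)

    def append_transform(self, transform: Callable[[Any], Any]) -> "ListEnvironment":
        self.transform = self.transform.chain(transform)
        return self

    def observe(self) -> Any:
        # The raw observation always passes through the accumulated transform.
        return self.transform(self._observe())

    def __or__(self, transform: Callable[[Any], Any]) -> "ListEnvironment":
        return self.append_transform(transform)


def double(values: list[int]) -> list[int]:
    return [2 * v for v in values]


def drop_first(values: list[int]) -> list[int]:
    return values[1:]


environment = ListEnvironment([1, 2, 3]) | double | drop_first
observed = environment.observe()
```

Because `append_transform` returns `self`, each `|` mutates and returns the same environment, so transforms accumulate in the order they are written.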

Finished

Bases: Exception

Exception raised when the environment is finished.

This exception is raised when the environment is finished, and no more data can be retrieved.

IncrementalEnvironment

Bases: Environment, Stepable, Iterable[Data], Protocol

Base class for incremental environments.

Incremental environments are environments that can be advanced by a step and provide a stream of data. The data can be observed at each step.

num_steps()

Return the number of steps in the environment.

Returns:

Type Description
int | None

The number of steps in the environment, or None if the number of steps is unknown.

Source code in src/flowcean/core/environment/incremental.py
def num_steps(self) -> int | None:
    """Return the number of steps in the environment.

    Returns:
        The number of steps in the environment, or None if the number of
        steps is unknown.
    """
    return None

Stepable

Bases: Protocol

Base class for stepable environments.

Stepable environments are environments that can be advanced by a step. Usually, this is combined with an observable to provide a stream of data.
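Combining `step()` with an observation to obtain a stream of data might look like the sketch below. `CountdownEnvironment` is a hypothetical toy class, `Finished` is re-declared locally as a stand-in, and the exact iteration protocol (observe first, then step until `Finished`) is an assumption made for illustration rather than flowcean's documented behaviour.

```python
from collections.abc import Iterator


class Finished(Exception):
    """Local stand-in for the exception raised when no more data exists."""


class CountdownEnvironment:
    """Toy stepable environment that counts down from ``start`` to 1."""

    def __init__(self, start: int) -> None:
        self._current = start

    def observe(self) -> int:
        return self._current

    def step(self) -> None:
        # Advance by one step, or signal that the stream is exhausted.
        if self._current <= 1:
            raise Finished
        self._current -= 1

    def __iter__(self) -> Iterator[int]:
        # Assumed protocol: yield the current observation, then advance.
        yield self.observe()
        while True:
            try:
                self.step()
            except Finished:
                return
            yield self.observe()


values = list(CountdownEnvironment(3))
```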

step() abstractmethod

Advance the environment by one step.

Source code in src/flowcean/core/environment/incremental.py
@abstractmethod
def step(self) -> None:
    """Advance the environment by one step."""

ChainedOfflineEnvironments(environments)

Bases: IncrementalEnvironment

Chained offline environments.

This environment chains multiple offline environments together. The environment will first observe the data from the first environment and then the data from the other environments.

Initialize the chained offline environments.

Parameters:

Name Type Description Default
environments Iterable[Environment]

The offline environments to chain.

required
Source code in src/flowcean/core/environment/offline.py
def __init__(self, environments: Iterable[Environment]) -> None:
    """Initialize the chained offline environments.

    Args:
        environments: The offline environments to chain.
    """
    self._environments = iter(environments)
    self._element = next(self._environments)

OfflineEnvironment

Bases: Environment, Protocol

Base class for offline environments.

Offline environments are used to represent datasets. They can be used to represent static datasets. Offline environments can be transformed and joined together to create new datasets.

chain(*other)

Chain this offline environment with other offline environments.

Chaining offline environments will create a new incremental environment that will first observe the data from this environment and then the data from the other environments.

Parameters:

Name Type Description Default
other Environment

The other offline environments to chain.

()

Returns:

Type Description
ChainedOfflineEnvironments

The chained offline environments.

Source code in src/flowcean/core/environment/offline.py
def chain(self, *other: Environment) -> ChainedOfflineEnvironments:
    """Chain this offline environment with other offline environments.

    Chaining offline environments will create a new incremental environment
    that will first observe the data from this environment and then the
    data from the other environments.

    Args:
        other: The other offline environments to chain.

    Returns:
        The chained offline environments.
    """
    return ChainedOfflineEnvironments([self, *other])

__add__(other)

Shorthand for chain.

Source code in src/flowcean/core/environment/offline.py
@final
def __add__(self, other: Environment) -> ChainedOfflineEnvironments:
    """Shorthand for `chain`."""
    return self.chain(other)
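The relationship between `chain` and `+` can be illustrated with a small sketch. `ListDataset` and `ChainedDatasets` are hypothetical stand-ins for an offline environment and its chained counterpart; how the real chained environment exposes its data step by step is not shown in the excerpt, so the iteration below is an assumption.

```python
from collections.abc import Iterable, Iterator


class ChainedDatasets:
    """Stand-in that yields the data of each dataset in order."""

    def __init__(self, datasets: Iterable["ListDataset"]) -> None:
        self._datasets = list(datasets)

    def __iter__(self) -> Iterator[list[int]]:
        for dataset in self._datasets:
            yield dataset.observe()


class ListDataset:
    """Toy offline environment backed by a list of rows."""

    def __init__(self, rows: list[int]) -> None:
        self.rows = rows

    def observe(self) -> list[int]:
        return self.rows

    def chain(self, *other: "ListDataset") -> ChainedDatasets:
        # This dataset comes first, followed by the others in argument order.
        return ChainedDatasets([self, *other])

    def __add__(self, other: "ListDataset") -> ChainedDatasets:
        return self.chain(other)


a, b, c = ListDataset([1, 2]), ListDataset([3]), ListDataset([4, 5])
pair = list(a + b)
triple = list(a.chain(b, c))
```

In this sketch `+` joins exactly two datasets, while `chain` accepts any number of additional ones in a single call.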

ActiveLearner

Bases: Named, Protocol

Base class for active learners.

Active learners require actions to be taken to learn.

learn_active(action, observation) abstractmethod

Learn from actions and observations.

Parameters:

Name Type Description Default
action Data

The action performed.

required
observation Data

The observation of the environment.

required

Returns:

Type Description
Model

The model learned from the data.

Source code in src/flowcean/core/learner.py
@abstractmethod
def learn_active(self, action: Data, observation: Data) -> Model:
    """Learn from actions and observations.

    Args:
        action: The action performed.
        observation: The observation of the environment.

    Returns:
        The model learned from the data.
    """

propose_action(observation) abstractmethod

Propose an action based on an observation.

Parameters:

Name Type Description Default
observation Data

The observation of an environment.

required

Returns:

Type Description
Data

The action to perform.

Source code in src/flowcean/core/learner.py
@abstractmethod
def propose_action(self, observation: Data) -> Data:
    """Propose an action based on an observation.

    Args:
        observation: The observation of an environment.

    Returns:
        The action to perform.
    """

SupervisedIncrementalLearner

Bases: Named, Protocol

Base class for incremental supervised learners.

An incremental supervised learner learns from input-output pairs incrementally.

learn_incremental(inputs, outputs) abstractmethod

Learn from the data incrementally.

Parameters:

Name Type Description Default
inputs Data

The input data.

required
outputs Data

The output data.

required

Returns:

Type Description
Model

The model learned from the data.

Source code in src/flowcean/core/learner.py
@abstractmethod
def learn_incremental(self, inputs: Data, outputs: Data) -> Model:
    """Learn from the data incrementally.

    Args:
        inputs: The input data.
        outputs: The output data.

    Returns:
        The model learned from the data.
    """

SupervisedLearner

Bases: Named, Protocol

Base class for supervised learners.

A supervised learner learns from input-output pairs.
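As a rough illustration of the learner/model split, the sketch below fits a straight line by ordinary least squares and returns an object that can predict. `LeastSquaresLearner` and `LinearModel` are hypothetical names, and plain lists of floats stand in for `Data`; nothing here is taken from flowcean itself.

```python
class LinearModel:
    """Toy model predicting ``slope * x + intercept``."""

    def __init__(self, slope: float, intercept: float) -> None:
        self.slope = slope
        self.intercept = intercept

    def predict(self, inputs: list[float]) -> list[float]:
        return [self.slope * x + self.intercept for x in inputs]


class LeastSquaresLearner:
    """Toy supervised learner: one input feature, one output feature."""

    def learn(self, inputs: list[float], outputs: list[float]) -> LinearModel:
        n = len(inputs)
        mean_x = sum(inputs) / n
        mean_y = sum(outputs) / n
        # Closed-form simple linear regression.
        sxx = sum((x - mean_x) ** 2 for x in inputs)
        sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(inputs, outputs))
        slope = sxy / sxx
        return LinearModel(slope, mean_y - slope * mean_x)


model = LeastSquaresLearner().learn([0.0, 1.0, 2.0, 3.0], [1.0, 3.0, 5.0, 7.0])
prediction = model.predict([4.0])
```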

learn(inputs, outputs) abstractmethod

Learn from the data.

Parameters:

Name Type Description Default
inputs Data

The input data.

required
outputs Data

The output data.

required

Returns:

Type Description
Model

The model learned from the data.

Source code in src/flowcean/core/learner.py
@abstractmethod
def learn(self, inputs: Data, outputs: Data) -> Model:
    """Learn from the data.

    Args:
        inputs: The input data.
        outputs: The output data.

    Returns:
        The model learned from the data.
    """

Metric

Bases: Named, Protocol

Minimal template for metrics.

Call flow

__call__ -> prepare(true), prepare(predicted) -> compute(true, predicted)

prepare(data)

Hook to normalize/collect/select data before computing metric.

Default: identity. Mixins override and call super().prepare(...)

Source code in src/flowcean/core/metric.py
def prepare(self, data: Data) -> Data:
    """Hook to normalize/collect/select data before computing metric.

    Default: identity. Mixins override and call super().prepare(...)
    """
    return data

__call__(true, predicted)

Execute metric: prepare inputs then compute.

Source code in src/flowcean/core/metric.py
@final
def __call__(
    self,
    true: Data,
    predicted: Data,
) -> Reportable | dict[str, Reportable]:
    """Execute metric: prepare inputs then compute."""
    return self.compute(true, predicted)

compute(true, predicted)

Implement metric logic on prepared inputs.

Source code in src/flowcean/core/metric.py
def compute(self, true: Data, predicted: Data) -> Reportable:
    """Implement metric logic on prepared inputs."""
    t = self.prepare(true)
    p = self.prepare(predicted)
    return self._compute(t, p)
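The `prepare` hook and the "mixins override and call `super().prepare(...)`" convention can be sketched as follows. `Metric` is re-declared locally to mirror the excerpts above; `SelectTargetMixin` and `MeanAbsoluteError` are hypothetical classes, and a dict of column lists stands in for `Data`.

```python
class Metric:
    """Local stand-in mirroring the template shown in the excerpts."""

    def prepare(self, data):
        return data  # default: identity

    def __call__(self, true, predicted):
        return self.compute(true, predicted)

    def compute(self, true, predicted):
        t = self.prepare(true)
        p = self.prepare(predicted)
        return self._compute(t, p)

    def _compute(self, true, predicted):
        raise NotImplementedError


class SelectTargetMixin:
    """Hypothetical mixin that picks one column before the metric runs."""

    target = "y"

    def prepare(self, data):
        # Do the mixin's work, then hand over to the next class in the MRO.
        return super().prepare(data[self.target])


class MeanAbsoluteError(SelectTargetMixin, Metric):
    def _compute(self, true, predicted):
        return sum(abs(t - p) for t, p in zip(true, predicted)) / len(true)


true = {"x": [0, 1, 2], "y": [1.0, 2.0, 3.0]}
predicted = {"y": [1.5, 2.0, 2.0]}
score = MeanAbsoluteError()(true, predicted)
```

Listing the mixin before `Metric` in the bases is what makes `super().prepare(...)` reach the identity default last.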

Model

Bases: Named, Protocol

Base class for models.

A model is used to predict outputs for given inputs.

preprocess(input_features)

Preprocess pipeline step.

Source code in src/flowcean/core/model.py
def preprocess(self, input_features: Data) -> Data:
    """Preprocess pipeline step."""
    return self.pre_transform.apply(input_features)

predict(input_features)

Predict outputs for given inputs, applying transforms and hooks.

Source code in src/flowcean/core/model.py
def predict(self, input_features: Data) -> Data:
    """Predict outputs for given inputs, applying transforms and hooks."""
    input_features = self.preprocess(input_features)
    result = self._predict(input_features)
    return self.postprocess(result)

__call__(input_features)

Predict outputs for given inputs, applying transforms and hooks.

Source code in src/flowcean/core/model.py
@final
def __call__(self, input_features: Data) -> Data:
    """Predict outputs for given inputs, applying transforms and hooks."""
    return self.predict(input_features)

postprocess(output)

Postprocess pipeline step.

Source code in src/flowcean/core/model.py
def postprocess(self, output: Data) -> Data:
    """Postprocess pipeline step."""
    return self.post_transform.apply(output)
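The order of the three pipeline steps can be traced with a small sketch. `Scale` and `SketchModel` are hypothetical stand-ins that reuse the attribute and method names from the excerpts (`pre_transform`, `post_transform`, `_predict`); lists of floats stand in for `Data`.

```python
class Scale:
    """Toy transform multiplying every value by a constant factor."""

    def __init__(self, factor: float) -> None:
        self.factor = factor

    def apply(self, data: list[float]) -> list[float]:
        return [x * self.factor for x in data]


class SketchModel:
    """Stand-in model following preprocess -> _predict -> postprocess."""

    def __init__(self, pre_transform: Scale, post_transform: Scale) -> None:
        self.pre_transform = pre_transform
        self.post_transform = post_transform

    def preprocess(self, input_features: list[float]) -> list[float]:
        return self.pre_transform.apply(input_features)

    def _predict(self, input_features: list[float]) -> list[float]:
        # The "learned" behaviour of this toy model: add one.
        return [x + 1.0 for x in input_features]

    def postprocess(self, output: list[float]) -> list[float]:
        return self.post_transform.apply(output)

    def predict(self, input_features: list[float]) -> list[float]:
        input_features = self.preprocess(input_features)
        result = self._predict(input_features)
        return self.postprocess(result)

    def __call__(self, input_features: list[float]) -> list[float]:
        return self.predict(input_features)


model = SketchModel(pre_transform=Scale(0.5), post_transform=Scale(10.0))
outputs = model([2.0, 4.0])  # 2.0 -> 1.0 -> 2.0 -> 20.0
```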

save(file)

Save the model to the file.

This method can be used to save a flowcean model to a file or a file-like object. To save a model to a file use

model.save("model.fml")

The resulting file will contain the model and any attached transforms. It can be loaded again using the load method from the Model class.

This method uses pickle to serialize the model, so child classes should ensure that all attributes are pickleable. If this is not the case, the child class should override this method to implement custom serialization logic, or use the __getstate__ and __setstate__ methods to control what is serialized (see https://docs.python.org/3/library/pickle.html#pickling-class-instances).

Parameters:

Name Type Description Default
file Path | str | BinaryIO

The file-like object to save the model to.

required
Source code in src/flowcean/core/model.py
def save(self, file: Path | str | BinaryIO) -> None:
    """Save the model to the file.

    This method can be used to save a flowcean model to a file or a
    file-like object. To save a model to a file use

    ```python
    model.save("model.fml")
    ```

    The resulting file will contain the model and any attached transforms.
    It can be loaded again using the `load` method from the `Model` class.

    This method uses pickle to serialize the model, so child classes should
    ensure that all attributes are pickleable. If this is not the case, the
    child class should override this method to implement custom
    serialization logic, or use the `__getstate__` and `__setstate__`
    methods to control what is serialized (see https://docs.python.org/3/library/pickle.html#pickling-class-instances).

    Args:
        file: The file-like object to save the model to.
    """
    if isinstance(file, Path | str):
        path = Path(file)
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("wb") as f:
            pickle.dump(self, f)
    else:
        pickle.dump(self, file)
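When a model holds an attribute that cannot be pickled, `__getstate__` and `__setstate__` can exclude and rebuild it. The sketch below uses only the standard `pickle` and `io` modules; `CachedModel` and its `_scratch` attribute are hypothetical and do not derive from flowcean's `Model`.

```python
import io
import pickle


class CachedModel:
    """Toy model with one attribute that pickle cannot serialize."""

    def __init__(self, weights: list[float]) -> None:
        self.weights = weights
        self._scratch = lambda x: x  # lambdas cannot be pickled

    def __getstate__(self) -> dict:
        # Copy the instance state and drop the unpicklable attribute.
        state = self.__dict__.copy()
        del state["_scratch"]
        return state

    def __setstate__(self, state: dict) -> None:
        # Restore the saved attributes and rebuild the dropped one.
        self.__dict__.update(state)
        self._scratch = lambda x: x


buffer = io.BytesIO()
pickle.dump(CachedModel([1.0, 2.0]), buffer)
buffer.seek(0)
restored = pickle.load(buffer)
```

The in-memory buffer plays the role of the file-like object; a path on disk would work the same way once opened in binary mode.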

load(file) staticmethod

Load a model from file.

This method can be used to load a previously saved flowcean model from a file or a file-like object. To load a model from a file use

model = Model.load("model.fml")

The load method will automatically determine the model type and any attached transforms and will load them into the correct model class.

As this method uses the pickle module to load the model, it is not safe to load models from untrusted sources as this could lead to arbitrary code execution!

Parameters:

Name Type Description Default
file Path | str | BinaryIO

The file-like object to load the model from.

required
Source code in src/flowcean/core/model.py
@staticmethod
def load(file: Path | str | BinaryIO) -> Model:
    """Load a model from file.

    This method can be used to load a previously saved flowcean model from
    a file or a file-like object. To load a model from a file use

    ```python
    model = Model.load("model.fml")
    ```

    The `load` method will automatically determine the model type and
    any attached transforms and will load them into the correct model
    class.

    As this method uses the `pickle` module to load the model, it is not
    safe to load models from untrusted sources as this could lead to
    arbitrary code execution!

    Args:
        file: The file-like object to load the model from.
    """
    if isinstance(file, Path | str):
        with Path(file).open("rb") as f:
            instance = pickle.load(f)
    else:
        instance = pickle.load(file)

    return instance

Report

Bases: dict[str, ReportEntry]

A structured container for evaluation results of multiple models.

The Report maps model names to their metric results. For each model:

  • top-level keys are metric names,
  • values are either:
    • a single Reportable (e.g., scalar metric result), or
    • a nested mapping from submetric names to Reportable objects (e.g., per-class F1 scores, per-feature regression errors, or multi-output results).

This hierarchical structure allows uniform representation of both simple metrics and complex hierarchical metrics across multiple models.

Example:

>>> report = Report(
...     {
...         "model_a": {
...             "accuracy": 0.95,
...             "f1": {"class_0": 0.91, "class_1": 0.89},
...         },
...         "model_b": {
...             "mae": {"feature_x": 0.2, "feature_y": 0.3},
...         },
...     }
... )

pretty_print(header_style='bold magenta', metric_style='cyan', value_style='green', title_style='bold yellow')

Pretty print the report to the terminal.

Source code in src/flowcean/core/report.py
def pretty_print(
    self,
    header_style: StyleType = "bold magenta",
    metric_style: StyleType = "cyan",
    value_style: StyleType = "green",
    title_style: StyleType = "bold yellow",
) -> None:
    """Pretty print the report to the terminal."""
    console = Console()
    for model, entry in self.items():
        table = Table(show_header=True, header_style=header_style)
        table.add_column("Metric", style=metric_style, no_wrap=True)
        table.add_column("Value", style=value_style)

        for metric, value in entry.items():
            if isinstance(value, Mapping):
                value = cast("Mapping[str, Reportable]", value)
                for submetric, subvalue in value.items():
                    table.add_row(
                        f"{metric} → {submetric}",
                        _format_value(subvalue),
                    )
            else:
                table.add_row(metric, _format_value(value))

        panel = Panel(
            table,
            title=f"[{title_style}]{model}[/]",
            expand=False,
        )
        console.print(panel)
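The traversal that `pretty_print` performs over the nested report can be reproduced without any terminal rendering. This sketch uses a plain dict instead of `Report`; the helper name `flatten_report` is hypothetical, and `str` stands in for the private `_format_value` helper, whose behaviour is not shown in the excerpt.

```python
from collections.abc import Mapping


def flatten_report(report: Mapping) -> list[tuple[str, str, str]]:
    """Return one (model, metric label, value) row per reported value."""
    rows = []
    for model, entry in report.items():
        for metric, value in entry.items():
            if isinstance(value, Mapping):
                # Nested metrics get one row per submetric.
                for submetric, subvalue in value.items():
                    rows.append((model, f"{metric} → {submetric}", str(subvalue)))
            else:
                rows.append((model, metric, str(value)))
    return rows


report = {
    "model_a": {
        "accuracy": 0.95,
        "f1": {"class_0": 0.91, "class_1": 0.89},
    },
    "model_b": {
        "mae": {"feature_x": 0.2, "feature_y": 0.3},
    },
}
rows = flatten_report(report)
```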

Reportable

Bases: Protocol

__str__() abstractmethod

Return a string representation.

Source code in src/flowcean/core/report.py
@abstractmethod
def __str__(self) -> str:
    """Return a string representation."""

Action(actuators) dataclass

An action in an active environment.

The action contains 'actuators', which represent setpoints in the environment. Each actuator targets exactly one input feature.

Parameters:

Name Type Description Default
actuators list[ActiveInterface]

List of interface objects, which are setpoints

required

ActiveInterface(uid, value, value_min, value_max, shape, dtype) dataclass

Interface to a feature in an active environment.

Represents a single feature of the environment, which can be either an input, an output, or the reward of the environment.

Parameters:

Name Type Description Default
uid str

Identifier of the feature inside the environment

required
value int | float | NDArray[Any] | None

The value of the feature

required
value_min SupportsFloat | NDArray[Any] | list[Any]

Simple representation of the minimum value

required
value_max SupportsFloat | NDArray[Any] | list[Any]

Simple representation of the maximum value

required
shape Sequence[int]

Tuple representing the shape of the value

required
dtype type[floating[Any]] | type[integer[Any]]

Data type of this interface, e.g., numpy.float32

required

Observation(sensors, rewards) dataclass

An observation of an active environment.

The observation contains 'sensors', which are the raw observations of featured values, and rewards, which are a rated quantification of the environment state.

Parameters:

Name Type Description Default
sensors list[ActiveInterface]

List of interface objects, i.e., raw observations

required
rewards list[ActiveInterface]

List of interface objects, i.e., rated state

required

StopLearning

Bases: Exception

Stop learning.

This exception is raised when the learning process should stop.

ChainedTransforms(*transforms)

Bases: Invertible, Transform

A composition of multiple transforms applied sequentially.

Chained transforms are applied left-to-right. Useful for building preprocessing pipelines.

Initialize the chained transforms.

Parameters:

Name Type Description Default
transforms Transform

The transforms to chain.

()
Source code in src/flowcean/core/transform.py
def __init__(
    self,
    *transforms: Transform,
) -> None:
    """Initialize the chained transforms.

    Args:
        transforms: The transforms to chain.
    """
    self.transforms = transforms
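Left-to-right application, and an inverse that undoes the steps in reverse order, can be sketched as below. `Add`, `Multiply` and `Chain` are hypothetical stand-ins; reversing the order for the inverse follows from function composition, but whether flowcean's `ChainedTransforms` implements it exactly this way is an assumption.

```python
class Add:
    def __init__(self, amount: float) -> None:
        self.amount = amount

    def apply(self, data: list[float]) -> list[float]:
        return [x + self.amount for x in data]

    def inverse(self) -> "Add":
        return Add(-self.amount)


class Multiply:
    def __init__(self, factor: float) -> None:
        self.factor = factor

    def apply(self, data: list[float]) -> list[float]:
        return [x * self.factor for x in data]

    def inverse(self) -> "Multiply":
        return Multiply(1 / self.factor)


class Chain:
    """Stand-in for a sequential composition of transforms."""

    def __init__(self, *transforms) -> None:
        self.transforms = transforms

    def apply(self, data: list[float]) -> list[float]:
        # Apply each transform in the order it was given.
        for transform in self.transforms:
            data = transform.apply(data)
        return data

    def inverse(self) -> "Chain":
        # Undo the last step first: invert each part, in reverse order.
        return Chain(*(t.inverse() for t in reversed(self.transforms)))


pipeline = Chain(Add(1.0), Multiply(2.0))
forward = pipeline.apply([1.0, 2.0])         # (x + 1) * 2
restored = pipeline.inverse().apply(forward)  # x / 2 - 1
```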

Identity()

Bases: Invertible, Transform

A no-op transform that returns data unchanged.

Often used as a placeholder or default transform.

Initialize the identity transform.

Source code in src/flowcean/core/transform.py
def __init__(self) -> None:
    """Initialize the identity transform."""
    super().__init__()

Invertible

Bases: Protocol

Protocol for transforms that support inversion.

An invertible transform can undo its effect via inverse().

Example
>>> scaler = Standardize().fit(data)
>>> restored = scaler.inverse()(scaler(data))

inverse() abstractmethod

Return a new transform that inverts this one.

Returns:

Type Description
Transform

The inverse of the transform.

Source code in src/flowcean/core/transform.py
@abstractmethod
def inverse(self) -> Transform:
    """Return a new transform that inverts this one.

    Returns:
        The inverse of the transform.
    """

Lambda(func, *, inverse_func=None)

Bases: Transform, Invertible

A transform wrapping a function.

Useful for quick one-off transformations without creating a dedicated class.

Example
>>> to_float = Lambda(lambda df: df.cast(pl.Float64))
>>> normalized = Lambda(
...     lambda df: (df - df.mean()) / df.std(),
...     inverse_func=lambda df: df * df.std() + df.mean(),
... )

Initialize the lambda transform.

Parameters:

Name Type Description Default
func Callable[[Data], Data]

Function that transforms data.

required
inverse_func Callable[[Data], Data] | None

Optional function that inverts func.

None
Source code in src/flowcean/core/transform.py
def __init__(
    self,
    func: Callable[[Data], Data],
    *,
    inverse_func: Callable[[Data], Data] | None = None,
) -> None:
    """Initialize the lambda transform.

    Args:
        func: Function that transforms data.
        inverse_func: Optional function that inverts ``func``.
    """
    self.func = func
    self.inverse_func = inverse_func
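An inverse function receives the already transformed data, so any statistics it needs must be captured before the forward function runs. The sketch below captures them in a closure. `LambdaSketch` is a hypothetical stand-in: its `inverse()` behaviour, including the error raised when no inverse is given, is an assumption, and lists of floats stand in for `Data`.

```python
from statistics import mean, pstdev


class LambdaSketch:
    """Stand-in for a transform wrapping a function and optional inverse."""

    def __init__(self, func, *, inverse_func=None) -> None:
        self.func = func
        self.inverse_func = inverse_func

    def apply(self, data):
        return self.func(data)

    def inverse(self) -> "LambdaSketch":
        if self.inverse_func is None:
            raise NotImplementedError("no inverse_func was provided")
        # Swap the roles of the two functions.
        return LambdaSketch(self.inverse_func, inverse_func=self.func)


def make_standardizer(reference: list[float]) -> LambdaSketch:
    # Statistics are computed once and shared by both directions.
    mu = mean(reference)
    sigma = pstdev(reference)
    return LambdaSketch(
        lambda xs: [(x - mu) / sigma for x in xs],
        inverse_func=lambda xs: [x * sigma + mu for x in xs],
    )


data = [2.0, 4.0, 6.0]
standardize = make_standardizer(data)
scaled = standardize.apply(data)
restored = standardize.inverse().apply(scaled)
```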

Transform

Bases: Named, Protocol

Base protocol for all transforms in Flowcean.

A transform is a reusable operation that modifies data. Examples include preprocessing (e.g., standardization), feature engineering (e.g., feature selection, PCA), or augmentation (noise injection, synthetic features).

Transforms are composable via the | operator, allowing complex transformation pipelines to be expressed in a clean and functional style:

Example
>>> transform = Select(features=["x"]) | Standardize()
>>> transformed = transform(dataset)

apply(data) abstractmethod

Apply the transform to data.

Parameters:

Name Type Description Default
data Data

The data to transform.

required

Returns:

Type Description
Data

The transformed data.

Source code in src/flowcean/core/transform.py
@abstractmethod
def apply(self, data: Data) -> Data:
    """Apply the transform to data.

    Args:
        data: The data to transform.

    Returns:
        The transformed data.
    """

__call__(data)

Apply the transform to data.

Equivalent to self.apply(data).

Parameters:

Name Type Description Default
data Data

The data to transform.

required

Returns:

Type Description
Data

The transformed data.

Source code in src/flowcean/core/transform.py
@final
def __call__(self, data: Data) -> Data:
    """Apply the transform to data.

    Equivalent to ``self.apply(data)``.

    Args:
        data: The data to transform.

    Returns:
        The transformed data.
    """
    return self.apply(data)

chain(other)

Chain this transform with other.

This can be used to chain multiple transforms together. Chained transforms are applied left-to-right:

Example
chained = TransformA().chain(TransformB())
chained(data)  # Equivalent to TransformB()(TransformA()(data))

Parameters:

Name Type Description Default
other Transform

The transform to chain.

required

Returns:

Type Description
Transform

A new chained transform.

Source code in src/flowcean/core/transform.py
def chain(
    self,
    other: Transform,
) -> Transform:
    """Chain this transform with ``other``.

    This can be used to chain multiple transforms together. Chained
    transforms are applied left-to-right:

    Example:
        ```python
        chained = TransformA().chain(TransformB())
        chained(data)  # Equivalent to TransformB()(TransformA()(data))
        ```

    Args:
        other: The transform to chain.

    Returns:
        A new chained transform.
    """
    return ChainedTransforms(self, other)

__or__(other)

Shorthand for chaining transforms.

Example
chained = TransformA() | TransformB()

Parameters:

Name Type Description Default
other Transform

The transform to chain.

required

Returns:

Type Description
Transform

A new Chain transform.

Source code in src/flowcean/core/transform.py
def __or__(
    self,
    other: Transform,
) -> Transform:
    """Shorthand for chaining transforms.

    Example:
        ```python
        chained = TransformA() | TransformB()
        ```

    Args:
        other: The transform to chain.

    Returns:
        A new Chain transform.
    """
    return self.chain(other)

fit(data)

Fit the transform to data.

Many transforms (e.g. scaling, PCA) require statistics from the dataset before applying. Default implementation is a no-op. This is meant to be idempotent, i.e., calling fit() multiple times should have the same effect as calling it once.

Parameters:

Name Type Description Default
data Data

The data to fit to.

required
Source code in src/flowcean/core/transform.py
def fit(self, data: Data) -> Self:
    """Fit the transform to data.

    Many transforms (e.g. scaling, PCA) require statistics from the dataset
    before applying. Default implementation is a no-op.
    This is meant to be idempotent, i.e., calling ``fit()`` multiple times
    should have the same effect as calling it once.

    Args:
        data: The data to fit to.
    """
    _ = data
    return self

fit_incremental(data)

Incrementally fit the transform to streaming/batched data.

Default implementation is a no-op.

Parameters:

Name Type Description Default
data Data

The data to fit to.

required
Source code in src/flowcean/core/transform.py
def fit_incremental(self, data: Data) -> Self:
    """Incrementally fit the transform to streaming/batched data.

    Default implementation is a no-op.

    Args:
        data: The data to fit to.
    """
    _ = data
    return self
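The difference between an idempotent `fit` and an accumulating `fit_incremental` can be made concrete with a toy transform. `MeanCenter` is a hypothetical class written for this sketch; it is one possible way to satisfy the contract described above, not flowcean's implementation.

```python
class MeanCenter:
    """Toy transform that subtracts the fitted mean from every value."""

    def __init__(self) -> None:
        self.count = 0
        self.total = 0.0

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else 0.0

    def fit(self, data: list[float]) -> "MeanCenter":
        # Overwrite the state, so repeated calls with the same data agree.
        self.count = len(data)
        self.total = float(sum(data))
        return self

    def fit_incremental(self, data: list[float]) -> "MeanCenter":
        # Accumulate the state, so batches can arrive one at a time.
        self.count += len(data)
        self.total += float(sum(data))
        return self

    def apply(self, data: list[float]) -> list[float]:
        return [x - self.mean for x in data]


batch = [1.0, 2.0, 3.0]
once = MeanCenter().fit(batch)
twice = MeanCenter().fit(batch).fit(batch)
streamed = MeanCenter().fit_incremental([1.0, 2.0]).fit_incremental([3.0])
centered = once.apply(batch)
```

Returning `self` from both methods allows the fluent call style used in the last lines.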

learn_active(environment, learner)

Learn from an active environment.

Learn from an active environment by interacting with it and learning from the observations. The learning process stops when the environment ends or when the learner requests to stop.

Parameters:

Name Type Description Default
environment ActiveEnvironment

The active environment.

required
learner ActiveLearner

The active learner.

required

Returns:

Type Description
Model

The model learned from the environment.

Source code in src/flowcean/core/strategies/active.py
def learn_active(
    environment: ActiveEnvironment,
    learner: ActiveLearner,
) -> Model:
    """Learn from an active environment.

    Learn from an active environment by interacting with it and
    learning from the observations. The learning process stops when the
    environment ends or when the learner requests to stop.

    Args:
        environment: The active environment.
        learner: The active learner.

    Returns:
        The model learned from the environment.
    """
    model = None

    try:
        while True:
            observations = environment.observe()
            action = learner.propose_action(observations)
            environment.act(action)
            environment.step()
            observations = environment.observe()
            model = learner.learn_active(action, observations)
    except StopLearning:
        pass
    if model is None:
        message = "No model was learned."
        raise RuntimeError(message)
    return model
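The loop above can be exercised with toy stand-ins. In this sketch the `learn_active` function repeats the logic of the excerpt, while `StopLearning` is re-declared locally and `ThermostatEnvironment` and `BangBangLearner` are hypothetical classes; here the learner raises `StopLearning` after a fixed number of rounds, and a plain list of (action, observation) pairs stands in for the learned model.

```python
class StopLearning(Exception):
    """Local stand-in for the exception that ends the learning loop."""


class ThermostatEnvironment:
    """Toy active environment: actions change a temperature."""

    def __init__(self) -> None:
        self.temperature = 15.0
        self._pending = 0.0

    def observe(self) -> float:
        return self.temperature

    def act(self, action: float) -> None:
        self._pending = action

    def step(self) -> None:
        self.temperature += self._pending
        self._pending = 0.0


class BangBangLearner:
    """Toy learner that heats below the target and cools otherwise."""

    def __init__(self, target: float, max_rounds: int) -> None:
        self.target = target
        self.max_rounds = max_rounds
        self.history: list[tuple[float, float]] = []

    def propose_action(self, observation: float) -> float:
        if len(self.history) >= self.max_rounds:
            raise StopLearning
        return 1.0 if observation < self.target else -1.0

    def learn_active(self, action: float, observation: float) -> list:
        self.history.append((action, observation))
        return list(self.history)  # stand-in for a learned model


def learn_active(environment, learner):
    model = None
    try:
        while True:
            observations = environment.observe()
            action = learner.propose_action(observations)
            environment.act(action)
            environment.step()
            observations = environment.observe()
            model = learner.learn_active(action, observations)
    except StopLearning:
        pass
    if model is None:
        raise RuntimeError("No model was learned.")
    return model


model = learn_active(ThermostatEnvironment(), BangBangLearner(17.0, max_rounds=3))
```

If the learner stopped before completing a single round, `model` would still be `None` and the function would raise `RuntimeError`, matching the excerpt.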

deploy(environment, model, input_transforms=None, output_transforms=None)

Deploy a trained model to a custom environment.

Parameters:

Name Type Description Default
environment ActiveEnvironment | IncrementalEnvironment

custom system environment

required
model Model

the trained model

required
input_transforms Transform | None

system specific transforms for model input

None
output_transforms Transform | None

system specific transforms for system input

None
Source code in src/flowcean/core/strategies/deploy.py
def deploy(
    environment: ActiveEnvironment | IncrementalEnvironment,
    model: Model,
    input_transforms: Transform | None = None,
    output_transforms: Transform | None = None,
) -> None:
    """Deploy a trained model to a custom environment.

    Args:
        environment: custom system environment
        model: the trained model
        input_transforms: system specific transforms for model input
        output_transforms: system specific transforms for system input
    """
    if input_transforms is None:
        input_transforms = Identity()
    if output_transforms is None:
        output_transforms = Identity()

    observation = environment.observe()
    output = model.predict(input_transforms.apply(observation))

    if isinstance(environment, ActiveEnvironment):
        environment.act(output_transforms.apply(output).collect())
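
As listed, `deploy` performs a single observe, predict, act pass; nothing in the function repeats it. A minimal sketch of that pass, assuming plain Python lists instead of data frames (so the `.collect()` call is left out) and hypothetical stand-ins `Identity`, `Scale`, `ThresholdModel`, `ToyActiveEnvironment` and `deploy_once`:

```python
class Identity:
    """Stand-in transform that returns its input unchanged."""

    def apply(self, data: list[float]) -> list[float]:
        return data


class Scale:
    """Stand-in transform that multiplies every value by a fixed factor."""

    def __init__(self, factor: float) -> None:
        self.factor = factor

    def apply(self, data: list[float]) -> list[float]:
        return [self.factor * value for value in data]


class ThresholdModel:
    """Toy model: outputs 1.0 where the input exceeds 0.5, else 0.0."""

    def predict(self, inputs: list[float]) -> list[float]:
        return [1.0 if value > 0.5 else 0.0 for value in inputs]


class ToyActiveEnvironment:
    """Toy environment that exposes readings and remembers the last action."""

    def __init__(self, readings: list[float]) -> None:
        self.readings = readings
        self.received = None

    def observe(self) -> list[float]:
        return self.readings

    def act(self, action: list[float]) -> None:
        self.received = action


def deploy_once(environment, model, input_transforms=None, output_transforms=None):
    """One observe -> transform -> predict -> transform -> act pass."""
    if input_transforms is None:
        input_transforms = Identity()
    if output_transforms is None:
        output_transforms = Identity()

    observation = environment.observe()
    output = model.predict(input_transforms.apply(observation))

    # Only environments that accept actions receive the prediction.
    if isinstance(environment, ToyActiveEnvironment):
        environment.act(output_transforms.apply(output))


env = ToyActiveEnvironment([20.0, 80.0])
deploy_once(
    env,
    ThresholdModel(),
    input_transforms=Scale(0.01),    # system units -> model units
    output_transforms=Scale(100.0),  # model units -> system units
)
```

Here the input transform maps system readings into the range the model expects, and the output transform maps the prediction back into the units the system accepts.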

learn_incremental(environment, learner, inputs, outputs, input_transform=None, output_transform=None)

Learn from an incremental environment.

Learn from an incremental environment by incrementally learning from the input-output pairs. The learning process stops when the environment ends.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| environment | IncrementalEnvironment | The incremental environment. | required |
| learner | SupervisedIncrementalLearner | The supervised incremental learner. | required |
| inputs | list[str] | The input feature names. | required |
| outputs | list[str] | The output feature names. | required |
| input_transform | Transform \| None | The transform to apply to the input features. | None |
| output_transform | InvertibleTransform \| None | The transform to apply to the output features. Its inverse will be part of the final model. | None |

Returns:

| Type | Description |
| --- | --- |
| Model | The model learned from the environment. |

Source code in src/flowcean/core/strategies/incremental.py
def learn_incremental(
    environment: IncrementalEnvironment,
    learner: SupervisedIncrementalLearner,
    inputs: list[str],
    outputs: list[str],
    input_transform: Transform | None = None,
    output_transform: InvertibleTransform | None = None,
) -> Model:
    """Learn from an incremental environment.

    Learn from an incremental environment by incrementally learning from
    the input-output pairs. The learning process stops when the environment
    ends.

    Args:
        environment: The incremental environment.
        learner: The supervised incremental learner.
        inputs: The input feature names.
        outputs: The output feature names.
        input_transform: The transform to apply to the input features.
        output_transform: The transform to apply to the output features.
            Its inverse will be part of the final model.

    Returns:
        The model learned from the environment.
    """
    if input_transform is None:
        input_transform = Identity()
    if output_transform is None:
        output_transform = Identity()

    model = None
    for data in environment:
        input_features = data.select(inputs)
        output_features = data.select(outputs)

        input_transform.fit_incremental(input_features)
        input_features = input_transform.apply(input_features)

        output_transform.fit_incremental(output_features)
        output_features = output_transform.apply(output_features)

        model = learner.learn_incremental(
            input_features,
            output_features,
        )

    if model is None:
        message = "No data found in environment."
        raise ValueError(message)

    model.post_transform |= output_transform.inverse()

    return model
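
In the loop above, each batch is transformed with the statistics fitted so far, while the inverse attached at the end uses the final statistics. A minimal sketch of that behaviour, assuming plain lists as batches and hypothetical stand-ins (`RunningMeanShift`, `RunningMeanLearner`, `MeanModel`, `learn_from_stream`); the input transform is omitted for brevity:

```python
class RunningMeanShift:
    """Toy invertible transform: subtracts the mean of all values seen so far."""

    def __init__(self) -> None:
        self.count = 0
        self.mean = 0.0

    def fit_incremental(self, values: list[float]) -> None:
        for value in values:
            self.count += 1
            self.mean += (value - self.mean) / self.count

    def apply(self, values: list[float]) -> list[float]:
        return [value - self.mean for value in values]

    def inverse(self):
        shift = self.mean  # frozen at the time inverse() is called
        return lambda values: [value + shift for value in values]


class MeanModel:
    """Toy model: predicts a constant, then applies an optional post transform."""

    def __init__(self, value: float) -> None:
        self.value = value
        self.post_transform = None

    def predict(self, inputs: list[float]) -> list[float]:
        raw = [self.value for _ in inputs]
        return self.post_transform(raw) if self.post_transform else raw


class RunningMeanLearner:
    """Toy incremental learner: tracks the mean of all outputs it has seen."""

    def __init__(self) -> None:
        self.total = 0.0
        self.count = 0

    def learn_incremental(self, inputs: list[float], outputs: list[float]) -> MeanModel:
        self.total += sum(outputs)
        self.count += len(outputs)
        return MeanModel(self.total / self.count)


def learn_from_stream(batches, learner, output_transform):
    """Fit, apply and learn per batch; attach the inverse transform at the end."""
    model = None
    for inputs, outputs in batches:
        output_transform.fit_incremental(outputs)
        outputs = output_transform.apply(outputs)
        model = learner.learn_incremental(inputs, outputs)
    if model is None:
        raise ValueError("No data found in environment.")
    model.post_transform = output_transform.inverse()
    return model


batches = [([0.0, 1.0], [2.0, 4.0]), ([2.0], [6.0])]
model = learn_from_stream(batches, RunningMeanLearner(), RunningMeanShift())
prediction = model.predict([5.0])
```

In this toy run the first batch is shifted by a mean of 3.0 and the second by 4.0, but the inverse adds back 4.0 for everything, so the prediction does not equal the plain mean of the outputs. Whether this matters in practice depends on the transform and on how quickly its statistics settle.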

evaluate_offline(models, environment, inputs, outputs, metrics)

Evaluate one or more models on an offline environment.

Evaluate each model on an offline environment by predicting the outputs from the inputs and comparing them to the true outputs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| models | Model \| Iterable[Model] | The models to evaluate. | required |
| environment | OfflineEnvironment | The offline environment. | required |
| inputs | Sequence[str] | The input feature names. | required |
| outputs | Sequence[str] | The output feature names. | required |
| metrics | Sequence[Metric] | The metrics to evaluate the model with. | required |

Returns:

| Type | Description |
| --- | --- |
| Report | The evaluation report. |

Source code in src/flowcean/core/strategies/offline.py
def evaluate_offline(
    models: Model | Iterable[Model],
    environment: OfflineEnvironment,
    inputs: Sequence[str],
    outputs: Sequence[str],
    metrics: Sequence[Metric],
) -> Report:
    """Evaluate one or more models on an offline environment.

    Evaluate each model on an offline environment by predicting the outputs
    from the inputs and comparing them to the true outputs.

    Args:
        models: The models to evaluate.
        environment: The offline environment.
        inputs: The input feature names.
        outputs: The output feature names.
        metrics: The metrics to evaluate the model with.

    Returns:
        The evaluation report.
    """
    if not isinstance(models, Iterable):
        models = [models]
    data = environment.observe()
    input_features = data.select(inputs)
    output_features = data.select(outputs)
    entries: dict[str, ReportEntry] = {}

    for model in models:
        predictions = model.predict(input_features)

        entries[model.name] = ReportEntry(
            {
                metric.name: metric(output_features, predictions.lazy())
                for metric in metrics
            },
        )
    return Report(entries)
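
The report built above is keyed first by model name and then by metric name. A minimal sketch of that structure, assuming plain lists and a nested dictionary in place of `Report` and `ReportEntry`; `ConstantModel`, `MeanAbsoluteError`, `MaxError` and `evaluate` are hypothetical stand-ins:

```python
from collections.abc import Iterable


class ConstantModel:
    """Toy model that predicts the same value for every input."""

    def __init__(self, name: str, value: float) -> None:
        self.name = name
        self.value = value

    def predict(self, inputs: list[float]) -> list[float]:
        return [self.value for _ in inputs]


class MeanAbsoluteError:
    name = "MAE"

    def __call__(self, true: list[float], predicted: list[float]) -> float:
        return sum(abs(t - p) for t, p in zip(true, predicted)) / len(true)


class MaxError:
    name = "MaxError"

    def __call__(self, true: list[float], predicted: list[float]) -> float:
        return max(abs(t - p) for t, p in zip(true, predicted))


def evaluate(models, inputs, outputs, metrics):
    """Return {model name: {metric name: value}} for one or several models."""
    # A single model is wrapped so the loop below handles both cases.
    if not isinstance(models, Iterable):
        models = [models]
    report = {}
    for model in models:
        predictions = model.predict(inputs)
        report[model.name] = {
            metric.name: metric(outputs, predictions) for metric in metrics
        }
    return report


inputs = [0.0, 1.0, 2.0]
outputs = [1.0, 2.0, 3.0]
metrics = [MeanAbsoluteError(), MaxError()]
report = evaluate(
    [ConstantModel("two", 2.0), ConstantModel("three", 3.0)],
    inputs,
    outputs,
    metrics,
)
```

Because entries are keyed by name, two models sharing a name would overwrite each other in this sketch; giving each model a distinct name avoids that.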

learn_offline(environment, learner, inputs, outputs, *, input_transform=None, output_transform=None)

Learn from an offline environment.

Learn from an offline environment by learning from the input-output pairs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| environment | OfflineEnvironment | The offline environment. | required |
| learner | SupervisedLearner | The supervised learner. | required |
| inputs | list[str] | The input feature names. | required |
| outputs | list[str] | The output feature names. | required |
| input_transform | Transform \| None | The transform to apply to the input features. Will be part of the final model. | None |
| output_transform | InvertibleTransform \| None | The transform to apply to the output features. Its inverse will be part of the final model. | None |

Returns:

| Type | Description |
| --- | --- |
| Model | The model learned from the environment. |

Source code in src/flowcean/core/strategies/offline.py
def learn_offline(
    environment: OfflineEnvironment,
    learner: SupervisedLearner,
    inputs: list[str],
    outputs: list[str],
    *,
    input_transform: Transform | None = None,
    output_transform: InvertibleTransform | None = None,
) -> Model:
    """Learn from an offline environment.

    Learn from an offline environment by learning from the input-output pairs.

    Args:
        environment: The offline environment.
        learner: The supervised learner.
        inputs: The input feature names.
        outputs: The output feature names.
        input_transform: The transform to apply to the input features.
            Will be part of the final model.
        output_transform: The transform to apply to the output features.
            Its inverse will be part of the final model.

    Returns:
        The model learned from the environment.
    """
    if input_transform is None:
        input_transform = Identity()
    if output_transform is None:
        output_transform = Identity()

    logger.info("Learning with offline strategy")
    data = environment.observe()

    logger.info("Selecting input and output features")
    input_features = data.select(inputs)
    output_features = data.select(outputs)

    logger.info("Fitting transforms and applying them to features")
    input_transform.fit(input_features)
    input_features = input_transform.apply(input_features)

    logger.info("Fitting output transform and applying it to output features")
    output_transform.fit(output_features)
    output_features = output_transform.apply(output_features)

    logger.info("Learning model")
    model = learner.learn(inputs=input_features, outputs=output_features)

    model.post_transform |= output_transform.inverse()

    return model
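
The last step, `model.post_transform |= output_transform.inverse()`, appends the inverse output transform so that predictions come back in the original units. A minimal sketch of that idea, assuming `|` chains two transforms left to right; every class here (`Transform`, `Chain`, `Identity`, `Multiply`, `MaxScale`, `LineModel`, `LeastSquaresLearner`) is a hypothetical stand-in, not the flowcean implementation:

```python
class Transform:
    def apply(self, values: list[float]) -> list[float]:
        raise NotImplementedError

    def __or__(self, other: "Transform") -> "Transform":
        # `a | b` applies a first, then b; `a |= b` falls back to this method.
        return Chain(self, other)


class Chain(Transform):
    def __init__(self, first: Transform, second: Transform) -> None:
        self.first = first
        self.second = second

    def apply(self, values):
        return self.second.apply(self.first.apply(values))


class Identity(Transform):
    def apply(self, values):
        return list(values)


class Multiply(Transform):
    def __init__(self, factor: float) -> None:
        self.factor = factor

    def apply(self, values):
        return [value * self.factor for value in values]


class MaxScale(Transform):
    """Toy invertible transform: divides by the largest absolute value seen in fit()."""

    def __init__(self) -> None:
        self.factor = 1.0

    def fit(self, values) -> None:
        self.factor = max(abs(value) for value in values)

    def apply(self, values):
        return [value / self.factor for value in values]

    def inverse(self) -> Transform:
        return Multiply(self.factor)


class LineModel:
    def __init__(self, slope: float, intercept: float) -> None:
        self.slope = slope
        self.intercept = intercept
        self.post_transform: Transform = Identity()

    def predict(self, inputs):
        raw = [self.slope * x + self.intercept for x in inputs]
        return self.post_transform.apply(raw)


class LeastSquaresLearner:
    """Toy learner: closed-form least squares fit of a line in one variable."""

    def learn(self, inputs, outputs) -> LineModel:
        n = len(inputs)
        mean_x = sum(inputs) / n
        mean_y = sum(outputs) / n
        cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(inputs, outputs))
        var = sum((x - mean_x) ** 2 for x in inputs)
        slope = cov / var
        return LineModel(slope, mean_y - slope * mean_x)


xs = [0.0, 1.0, 2.0, 3.0]
ys = [10.0, 20.0, 30.0, 40.0]

output_transform = MaxScale()
output_transform.fit(ys)                   # fit once on the full data set
scaled = output_transform.apply(ys)        # the learner only sees scaled outputs
model = LeastSquaresLearner().learn(xs, scaled)
model.post_transform |= output_transform.inverse()
prediction = model.predict([4.0])
```

The learner is trained on outputs scaled into [0, 1], yet `prediction` is expressed in the original units because the inverse scaling runs after the raw model output.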