offline

OfflineEnvironment()

Bases: TransformedObservable

Base class for offline environments.

Offline environments represent static datasets. They can be transformed and joined together to create new datasets.

Initialize the offline environment.

Source code in src/flowcean/core/environment/offline.py
def __init__(self) -> None:
    """Initialize the offline environment."""
    super().__init__()

join(other)

Join this offline environment with another one.

Parameters:

other (OfflineEnvironment, required): The other offline environment to join.

Returns:

JoinedOfflineEnvironment: The joined offline environment.

Source code in src/flowcean/core/environment/offline.py
def join(self, other: OfflineEnvironment) -> JoinedOfflineEnvironment:
    """Join this offline environment with another one.

    Args:
        other: The other offline environment to join.

    Returns:
        The joined offline environment.
    """
    return JoinedOfflineEnvironment([self, other])

__and__(other)

Shorthand for join.

Source code in src/flowcean/core/environment/offline.py
def __and__(self, other: OfflineEnvironment) -> JoinedOfflineEnvironment:
    """Shorthand for `join`."""
    return self.join(other)
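
The following is a minimal sketch of how join and its & shorthand compose, based only on the source shown above. Env and Joined are hypothetical stand-ins, not the flowcean classes; they copy just the wrapping logic. The sketch suggests that repeated use of & nests joined environments rather than flattening them into one list.

```python
class Env:
    """Hypothetical stand-in for OfflineEnvironment (not the flowcean class)."""

    def __init__(self, name: str) -> None:
        self.name = name

    def join(self, other: "Env") -> "Joined":
        # Mirrors the source: wrap both operands in a new joined environment.
        return Joined([self, other])

    def __and__(self, other: "Env") -> "Joined":
        # Mirrors the source: `a & b` delegates to `a.join(b)`.
        return self.join(other)


class Joined(Env):
    """Hypothetical stand-in for JoinedOfflineEnvironment."""

    def __init__(self, environments: list) -> None:
        super().__init__("joined")
        self.environments = environments


a, b, c = Env("a"), Env("b"), Env("c")

pair = a & b        # same structure as a.join(b)
nested = a & b & c  # evaluates as (a & b) & c

pair_names = [e.name for e in pair.environments]      # ['a', 'b']
nested_names = [e.name for e in nested.environments]  # ['joined', 'c']
```

To join several environments at one level, constructing JoinedOfflineEnvironment directly with the full list is likely the more direct route.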

chain(*other)

Chain this offline environment with other offline environments.

Chaining creates a new incremental environment that first observes the data from this environment and then the data from the other environments, in the order given.

Parameters:

other (OfflineEnvironment, default: ()): The other offline environments to chain.

Returns:

ChainedOfflineEnvironments: The chained offline environments.

Source code in src/flowcean/core/environment/offline.py
def chain(self, *other: OfflineEnvironment) -> ChainedOfflineEnvironments:
    """Chain this offline environment with other offline environments.

    Chaining offline environments will create a new incremental environment
    that will first observe the data from this environment and then the
    data from the other environments.

    Args:
        other: The other offline environments to chain.

    Returns:
        The chained offline environments.
    """
    from flowcean.core.environment.chained import (
        ChainedOfflineEnvironments,
    )

    return ChainedOfflineEnvironments([self, *other])

__add__(other)

Shorthand for chain.

Source code in src/flowcean/core/environment/offline.py
def __add__(self, other: OfflineEnvironment) -> ChainedOfflineEnvironments:
    """Shorthand for `chain`."""
    return self.chain(other)
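
As a rough illustration of the difference between chain with several arguments and repeated +, here is a sketch built on hypothetical stand-ins. Env, Chained, and the steps method are assumptions made for this example and are not part of flowcean; only the wrapping logic follows the source above.

```python
class Env:
    """Hypothetical stand-in for OfflineEnvironment holding rows in a list."""

    def __init__(self, rows):
        self.rows = list(rows)

    def steps(self):
        # Assumed helper: a plain environment yields its data as one step.
        yield list(self.rows)

    def chain(self, *other):
        # Mirrors the source: self comes first, then the others in order.
        return Chained([self, *other])

    def __add__(self, other):
        # Mirrors the source: `a + b` delegates to `a.chain(b)`.
        return self.chain(other)


class Chained(Env):
    """Hypothetical stand-in for ChainedOfflineEnvironments."""

    def __init__(self, environments):
        self.environments = list(environments)

    def steps(self):
        # Visit each chained environment in the order it was given.
        for env in self.environments:
            yield from env.steps()


a, b, c = Env([1, 2]), Env([3]), Env([4, 5])

flat = a.chain(b, c)  # one chained environment with three members
nested = a + b + c    # (a + b) + c: a chained pair chained with c

flat_order = list(flat.steps())      # [[1, 2], [3], [4, 5]]
nested_order = list(nested.steps())  # [[1, 2], [3], [4, 5]]
```

In this sketch both forms visit the data in the same order; they differ only in how the environments are nested.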

write_parquet(path)

Write the environment to a parquet file at the specified path.

The suffix of path is replaced with .parquet before writing. Use a ParquetDataLoader to load the data back into flowcean as an environment.

Parameters:

path (Path | str, required): Path to the parquet file where the data is written.

Source code in src/flowcean/core/environment/offline.py
def write_parquet(self, path: Path | str) -> None:
    """Write the environment to a parquet file at the specified path.

    Write the environment to a parquet file. Use a `ParquetDataLoader` to
    load the data back into flowcean as an environment.

    Args:
        path: Path to the parquet file where the data is written.
    """
    self.observe().collect(streaming=True).write_parquet(
        Path(path).with_suffix(".parquet"),
    )
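
The source above passes the path through with_suffix(".parquet") before writing, so the file that ends up on disk may not have the name that was passed in. The sketch below reproduces only that path handling with the standard library. parquet_target is a hypothetical helper, and PurePosixPath is used instead of Path purely to keep the output platform-independent.

```python
from pathlib import PurePosixPath


def parquet_target(path: str) -> str:
    """Return the path that `with_suffix(".parquet")` would produce."""
    return str(PurePosixPath(path).with_suffix(".parquet"))


targets = {
    p: parquet_target(p)
    for p in ["data/out", "data/out.csv", "data/run.v1"]
}
# 'data/out'     -> 'data/out.parquet'  (suffix added)
# 'data/out.csv' -> 'data/out.parquet'  (existing suffix replaced)
# 'data/run.v1'  -> 'data/run.parquet'  ('.v1' is treated as a suffix)
```

The last case is worth keeping in mind: a dot in the file name is interpreted as the start of a suffix and is dropped.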

as_stream(batch_size)

Convert the offline environment to a streaming environment.

Parameters:

batch_size (int, required): The batch size of the streaming environment.

Returns:

StreamingOfflineEnvironment: The streaming environment.

Source code in src/flowcean/core/environment/offline.py
def as_stream(self, batch_size: int) -> StreamingOfflineEnvironment:
    """Convert the offline environment to a streaming environment.

    Args:
        batch_size: The batch size of the streaming environment.

    Returns:
        The streaming environment.
    """
    from flowcean.environments.streaming import (
        StreamingOfflineEnvironment,
    )

    return StreamingOfflineEnvironment(self, batch_size)
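
The page does not show how StreamingOfflineEnvironment consumes batch_size. As a rough mental model only, and assuming batch_size is the number of samples handed out per step, batching a static dataset could look like the sketch below. iter_batches is a hypothetical helper, not a flowcean function, and whether flowcean emits a final partial batch is not stated in the source.

```python
from typing import Iterator, List, Sequence


def iter_batches(rows: Sequence[int], batch_size: int) -> Iterator[List[int]]:
    """Yield consecutive slices of `rows` with at most `batch_size` items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        # The last slice may be shorter than batch_size (an assumption here).
        yield list(rows[start:start + batch_size])


batches = list(iter_batches(list(range(7)), batch_size=3))
# [[0, 1, 2], [3, 4, 5], [6]]
```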

JoinedOfflineEnvironment(environments)

Bases: OfflineEnvironment

Environment that joins multiple offline environments.

Attributes:

environments (Iterable[OfflineEnvironment]): The offline environments to join.

Initialize the joined offline environment.

Parameters:

environments (Iterable[OfflineEnvironment], required): The offline environments to join.

Source code in src/flowcean/core/environment/offline.py
def __init__(self, environments: Iterable[OfflineEnvironment]) -> None:
    """Initialize the joined offline environment.

    Args:
        environments: The offline environments to join.
    """
    self.environments = environments
    super().__init__()
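
The constructor above stores the iterable exactly as it was passed. The sketch below uses a hypothetical stand-in class to illustrate one consequence of that in plain Python: a generator can be iterated only once, while a list can be iterated repeatedly. Whether flowcean iterates environments more than once is not shown on this page, so passing a list is suggested here only as a cautious default.

```python
class Joined:
    """Hypothetical stand-in that copies only the constructor shown above."""

    def __init__(self, environments):
        # Stored as given, without converting to a list.
        self.environments = environments


names = ["a", "b"]

gen_backed = Joined(name for name in names)
first_pass = list(gen_backed.environments)   # ['a', 'b']
second_pass = list(gen_backed.environments)  # [] (generator is exhausted)

list_backed = Joined(list(names))
list_first = list(list_backed.environments)   # ['a', 'b']
list_second = list(list_backed.environments)  # ['a', 'b']
```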