Skip to content

dataframe

DataFrame(data)

Bases: OfflineEnvironment

A dataset environment.

This environment represents static tabular datasets.

Attributes:

Name Type Description
data LazyFrame

The data to represent.

Initialize the dataset environment.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame

The data to represent.

required
Source code in src/flowcean/polars/environments/dataframe.py
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(self, data: pl.DataFrame | pl.LazyFrame) -> None:
    """Initialize the dataset environment.

    Args:
        data: The data to represent. An eager ``DataFrame`` is converted
            to a ``LazyFrame``; a ``LazyFrame`` is stored as-is.
    """
    if isinstance(data, pl.DataFrame):
        # Eager frames know their length up front; cache it so __len__
        # never has to collect the lazy frame.
        self.data = data.lazy()
        self._length = len(data)
    else:
        self.data = data
        # Fix: always define _length. The lazy length is unknown until
        # collected; __len__ computes and caches it on first use.
        self._length = None
    super().__init__()

to_incremental(batch_size=1)

Convert the DataFrame to an incremental environment.

Parameters:

Name Type Description Default
batch_size int

The size of each batch. Defaults to 1.

1
Source code in src/flowcean/polars/environments/dataframe.py
46
47
48
49
50
51
52
53
54
55
def to_incremental(
    self,
    batch_size: int = 1,
) -> StreamingOfflineEnvironment:
    """Wrap this environment so samples are yielded in batches.

    Args:
        batch_size: The size of each batch. Defaults to 1.
    """
    total = len(self)
    return StreamingOfflineEnvironment(self, batch_size, size=total)

from_csv(path, separator=',') classmethod

Load a dataset from a CSV file.

Parameters:

Name Type Description Default
path str | PathLike[str]

Path to the CSV file.

required
separator str

Value separator. Defaults to ",".

','
Source code in src/flowcean/polars/environments/dataframe.py
57
58
59
60
61
62
63
64
65
66
67
@classmethod
def from_csv(cls, path: str | PathLike[str], separator: str = ",") -> Self:
    """Build a dataset environment from a CSV file.

    Args:
        path: Path to the CSV file.
        separator: Value separator. Defaults to ",".
    """
    frame = pl.scan_csv(Path(path), separator=separator)
    # Strip stray whitespace from header names before exposing them.
    cleaned = frame.rename(lambda column_name: column_name.strip())
    return cls(cleaned)

from_json(path) classmethod

Load a dataset from a JSON file.

Parameters:

Name Type Description Default
path str | PathLike[str]

Path to the JSON file.

required
Source code in src/flowcean/polars/environments/dataframe.py
69
70
71
72
73
74
75
76
77
@classmethod
def from_json(cls, path: str | PathLike[str]) -> Self:
    """Build a dataset environment from a JSON file.

    Args:
        path: Path to the JSON file.
    """
    # JSON has no lazy scanner, so the file is read eagerly.
    frame = pl.read_json(Path(path))
    return cls(frame)

from_parquet(path) classmethod

Load a dataset from a Parquet file.

Parameters:

Name Type Description Default
path str | PathLike[str]

Path to the Parquet file.

required
Source code in src/flowcean/polars/environments/dataframe.py
79
80
81
82
83
84
85
86
87
@classmethod
def from_parquet(cls, path: str | PathLike[str]) -> Self:
    """Build a dataset environment from a Parquet file.

    Args:
        path: Path to the Parquet file.
    """
    # scan_parquet defers reading until the frame is collected.
    frame = pl.scan_parquet(Path(path))
    return cls(frame)

from_yaml(path) classmethod

Load a dataset from a YAML file.

Parameters:

Name Type Description Default
path str | Path

Path to the YAML file.

required
Source code in src/flowcean/polars/environments/dataframe.py
89
90
91
92
93
94
95
96
97
@classmethod
def from_yaml(cls, path: str | Path) -> Self:
    """Build a dataset environment from a YAML file.

    Args:
        path: Path to the YAML file.
    """
    # Safe loader: no arbitrary-object construction from the file.
    document = YAML(typ="safe").load(path)
    return cls(pl.LazyFrame(document))

from_uri(uri) classmethod

Load a dataset from a URI.

Parameters:

Name Type Description Default
uri str

The URI to load the dataset from.

required
Source code in src/flowcean/polars/environments/dataframe.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
@classmethod
def from_uri(cls, uri: str) -> Self:
    """Build a dataset environment from a file URI.

    Args:
        uri: The URI to load the dataset from.

    Raises:
        UnsupportedFileTypeError: If the file extension is not supported.
    """
    path = _file_uri_to_path(uri)

    # Dispatch on the file extension instead of an if-chain.
    loaders = {
        ".csv": cls.from_csv,
        ".json": cls.from_json,
        ".parquet": cls.from_parquet,
        ".yaml": cls.from_yaml,
        ".yml": cls.from_yaml,
    }
    loader = loaders.get(path.suffix)
    if loader is None:
        raise UnsupportedFileTypeError(path.suffix)
    return loader(path)

from_rosbag(path, topics, *, message_paths=None, cache=True, cache_path=None) classmethod

Load a dataset from a ROS2 Humble rosbag file.

The structure of the data is inferred from the message definitions. If a message definition is not found in the ROS2 Humble typestore, it is added from the provided paths. Once all the message definitions are added, the data is loaded from the rosbag file.

Parameters:

Name Type Description Default
path str | PathLike[str]

Path to the rosbag.

required
topics dict[str, list[str]]

Dictionary of topics to load (topic: [paths]).

required
message_paths Iterable[str | PathLike[str]] | None

List of paths to additional message definitions.

None
cache bool

Whether to cache the data to a Parquet file.

True
cache_path str | PathLike[str] | None

Path to the cache file. If None, defaults to the same directory as the rosbag file with a .parquet extension.

None
Source code in src/flowcean/polars/environments/dataframe.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@classmethod
def from_rosbag(
    cls,
    path: str | PathLike[str],
    topics: dict[str, list[str]],
    *,
    message_paths: Iterable[str | PathLike[str]] | None = None,
    cache: bool = True,
    cache_path: str | PathLike[str] | None = None,
) -> Self:
    """Build a dataset environment from a ROS2 Humble rosbag file.

    The data layout is inferred from the message definitions. Definitions
    missing from the ROS2 Humble typestore are added from `message_paths`
    before the rosbag contents are read.

    Args:
        path: Path to the rosbag.
        topics: Dictionary of topics to load (`topic: [paths]`).
        message_paths: List of paths to additional message definitions.
        cache: Whether to cache the data to a Parquet file.
        cache_path: Path to the cache file. If None, defaults to the same
            directory as the rosbag file with a .parquet extension.
    """
    # Imported lazily so the ROS dependency is only needed when used.
    from flowcean.ros import load_rosbag

    frame = load_rosbag(
        path,
        topics,
        message_paths=message_paths,
        cache=cache,
        cache_path=cache_path,
    )
    return cls(frame)

__len__()

Return the number of samples in the dataset.

Source code in src/flowcean/polars/environments/dataframe.py
161
162
163
164
165
166
167
168
169
def __len__(self) -> int:
    """Return the number of samples in the dataset."""
    cached = self._length
    if cached is None:
        # Counting rows forces the lazy query to run, which can be
        # expensive, so the result is computed once and memoized.
        cached = cast(
            "int",
            self.data.select(pl.len()).collect().item(),
        )
        self._length = cached
    return cached

InvalidUriSchemeError(scheme)

Bases: Exception

Exception raised when a URI scheme is invalid.

Initialize the InvalidUriSchemeError.

Parameters:

Name Type Description Default
scheme str

Invalid URI scheme.

required
Source code in src/flowcean/polars/environments/dataframe.py
182
183
184
185
186
187
188
189
190
def __init__(self, scheme: str) -> None:
    """Create the error for a non-file URI scheme.

    Args:
        scheme: Invalid URI scheme.
    """
    message = f"only file URIs can be converted to a path, but got `{scheme}`"
    super().__init__(message)

UnsupportedFileTypeError(suffix)

Bases: Exception

Exception raised when a file type is not supported.

Initialize the UnsupportedFileTypeError.

Parameters:

Name Type Description Default
suffix str

File type suffix.

required
Source code in src/flowcean/polars/environments/dataframe.py
196
197
198
199
200
201
202
def __init__(self, suffix: str) -> None:
    """Create the error for an unsupported file extension.

    Args:
        suffix: File type suffix.
    """
    message = f"file type `{suffix}` is not supported"
    super().__init__(message)

collect(environment, n=None, *, progress_bar=True)

Collect data from an environment.

Parameters:

Name Type Description Default
environment Iterable[LazyFrame] | Collection[LazyFrame]

The environment to collect data from.

required
n int | None

Number of samples to collect. If None, all samples are collected.

None
progress_bar bool | dict[str, Any]

Whether to show a progress bar. If a dictionary is provided, it will be passed to the progress bar.

True

Returns:

Type Description
DataFrame

The collected dataset.

Source code in src/flowcean/polars/environments/dataframe.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def collect(
    environment: Iterable[pl.LazyFrame] | Collection[pl.LazyFrame],
    n: int | None = None,
    *,
    progress_bar: bool | dict[str, Any] = True,
) -> DataFrame:
    """Collect data from an environment.

    Args:
        environment: The environment to collect data from.
        n: Number of samples to collect. If None, all samples are collected.
        progress_bar: Whether to show a progress bar. If a dictionary is
            provided, it will be passed to the progress bar.

    Returns:
        The collected dataset.
    """
    samples = islice(environment, n)

    # Prefer the explicit sample count; fall back to the collection's size.
    if n is not None:
        total = n
    elif isinstance(environment, Collection):
        total = len(environment)
    else:
        total = None  # unknown length; tqdm shows a plain counter

    if isinstance(progress_bar, dict):
        # Fix: merge defaults into a new dict instead of calling
        # setdefault on the caller's dict, which mutated their argument.
        options = {"desc": "Collecting samples", "total": total, **progress_bar}
        samples = tqdm(samples, **options)
    elif progress_bar:
        samples = tqdm(samples, desc="Collecting samples", total=total)

    data = pl.concat(samples, how="vertical")
    return DataFrame(data)