ros

RosbagError

Bases: Exception

Dataframe conversion error.

load_rosbag(path, topics, *, message_paths=None, cache=True, cache_path=None)

Load a ROS2 Humble rosbag file and convert it to a Polars LazyFrame.

The structure of the data is inferred from the message definitions. If a message definition is not found in the ROS2 Humble typestore, it is added from the provided paths. Once all the message definitions are added, the data is loaded from the rosbag file.

Parameters:

path (str | PathLike, required): Path to the rosbag.

topics (dict[str, list[str]], required): Dictionary of topics to load (topic: [paths]).

message_paths (Iterable[str | PathLike] | None, default None): List of paths to additional message definitions.

cache (bool, default True): Whether to cache the data to a Parquet file.

cache_path (str | PathLike | None, default None): Path to the cache file. If None, defaults to the rosbag path with its suffix replaced by .parquet, so the cache file sits next to the rosbag.
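The default for cache_path can be illustrated with pathlib alone. The sketch below is not part of the library: resolve_cache_path is a hypothetical helper name, and the file names are made up. It only mirrors the rule that an explicit cache_path wins and that, otherwise, the rosbag path is reused with a .parquet suffix.

```python
from __future__ import annotations

from os import PathLike
from pathlib import Path


def resolve_cache_path(
    path: str | PathLike,
    cache_path: str | PathLike | None = None,
) -> Path:
    """Hypothetical helper mirroring the cache_path default."""
    if cache_path is not None:
        # An explicit cache path is used unchanged.
        return Path(cache_path)
    # with_suffix swaps only the final suffix (or appends one if there is
    # none), so the cache file sits next to the rosbag and shares its stem.
    return Path(path).with_suffix(".parquet")


# Illustrative file names only.
print(resolve_cache_path("recordings/run1.bag"))
print(resolve_cache_path("recordings/run1"))
print(resolve_cache_path("recordings/run1.bag", "cache/run1.parquet"))
```

Because only the last suffix is replaced, a name such as run.2024.bag would map to run.2024.parquet.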
Source code in src/flowcean/ros/rosbag.py
def load_rosbag(
    path: str | PathLike,
    topics: dict[str, list[str]],
    *,
    message_paths: Iterable[str | PathLike] | None = None,
    cache: bool = True,
    cache_path: str | PathLike | None = None,
) -> pl.LazyFrame:
    """Load a ROS2 Humble rosbag file and convert it to a Polars LazyFrame.

    The structure of the data is inferred from the message definitions. If
    a message definition is not found in the ROS2 Humble typestore, it is
    added from the provided paths. Once all the message definitions are
    added, the data is loaded from the rosbag file.

    Args:
        path: Path to the rosbag.
        topics: Dictionary of topics to load (`topic: [paths]`).
        message_paths: List of paths to additional message definitions.
        cache: Whether to cache the data to a Parquet file.
        cache_path: Path to the cache file. If None, defaults to the
            rosbag path with its suffix replaced by `.parquet`.
    """
    path = Path(path)
    cache_path = (
        Path(cache_path)
        if cache_path is not None
        else path.with_suffix(".parquet")
    )

    if cache and cache_path.exists():
        logger.info("Loading data from cache...")
        return pl.scan_parquet(cache_path)

    typestore = get_typestore(Stores.ROS2_HUMBLE)
    if message_paths is not None:
        additional_types = _collect_type_definitions(message_paths)
        typestore.register(additional_types)

    with AnyReader(
        [path],
        default_typestore=typestore,
    ) as reader:
        data = pl.concat(
            _generate_features(reader, topics),
            how="horizontal",
        )

    if cache:
        logger.info("Caching ROS data to Parquet file...")
        data.sink_parquet(cache_path)

    return data
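
The control flow above (return the cache if it exists, otherwise read the bag and optionally write the cache) can be sketched with the standard library only. This is an illustration under stated assumptions, not the library's implementation: load_with_cache and fake_reader are hypothetical names, and a JSON file stands in for the Parquet cache so the example runs without Polars or a real rosbag.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Callable


def load_with_cache(
    source: Path,
    compute: Callable[[Path], list[dict]],
    *,
    cache: bool = True,
    cache_path: Path | None = None,
) -> list[dict]:
    """Hypothetical stand-in for the cache-or-compute flow of load_rosbag."""
    if cache_path is None:
        # JSON replaces Parquet here purely to stay stdlib-only.
        cache_path = source.with_suffix(".json")

    # A cache hit returns immediately; `compute` is never called.
    if cache and cache_path.exists():
        return json.loads(cache_path.read_text())

    data = compute(source)

    # The cache is only written when caching is enabled.
    if cache:
        cache_path.write_text(json.dumps(data))
    return data


calls: list[Path] = []


def fake_reader(source: Path) -> list[dict]:
    """Made-up reader that records how often the 'bag' is actually read."""
    calls.append(source)
    return [{"t": 0, "x": 1.0}]


with tempfile.TemporaryDirectory() as tmp:
    bag = Path(tmp) / "run1.bag"
    first = load_with_cache(bag, fake_reader)  # reads, then writes cache
    second = load_with_cache(bag, fake_reader)  # served from cache
    uncached = load_with_cache(bag, fake_reader, cache=False)  # reads again

print(len(calls))
```

One consequence visible in the source: on a cache hit, load_rosbag returns the cached Parquet file without looking at topics or message_paths. If the requested topics change between calls, it is probably safest to pass cache=False or a different cache_path, or to delete the stale cache file.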