Skip to content

ros

RosbagError

Bases: Exception

Dataframe conversion error.

RosbagLoader(path, topics, *, message_paths=None, create_cache=True, load_from_cache=True, cache_path=None)

Bases: DataFrame

Environment to load data from a rosbag file.

The RosbagEnvironment is used to load data from a rosbag file. The environment is initialized with the path to the rosbag file and a dictionary of topics to load.

Example
from flowcean.environments.rosbag import RosbagLoader

environment = RosbagLoader(
    path="example_rosbag",
    topics={
        "/amcl_pose": [
            "pose.pose.position.x",
            "pose.pose.position.y",
        ],
        "/odometry": [
            "pose.pose.position.x",
            "pose.pose.position.y",
        ],
    },
)
environment.load()
data = environment.get_data()
print(data)

Initialize the RosbagEnvironment.

The structure of the data is inferred from the message definitions. If a message definition is not found in the ROS2 Humble typestore, it is added from the provided paths. Once all the message definitions are added, the data is loaded from the rosbag file.

Parameters:

Name Type Description Default
path PathLike

Path to the rosbag.

required
topics dict[str, list[str]]

Dictionary of topics to load (topic: [paths]).

required
message_paths Iterable[PathLike] | None

List of paths to additional message definitions.

None
create_cache bool

If True, cache the data to a Parquet file.

True
load_from_cache bool

If True, load data from the cache if it exists.

True
cache_path PathLike | None

Path to the cache file. If None, defaults to the same directory as the rosbag file with a .parquet extension.

None
Source code in src/flowcean/ros/rosbag.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def __init__(
    self,
    path: PathLike,
    topics: dict[str, list[str]],
    *,
    message_paths: Iterable[PathLike] | None = None,
    create_cache: bool = True,
    load_from_cache: bool = True,
    cache_path: PathLike | None = None,
) -> None:
    """Initialize the RosbagEnvironment.

    The structure of the data is inferred from the message definitions. If
    a message definition is not found in the ROS2 Humble typestore, it is
    added from the provided paths. Once all the message definitions are
    added, the data is loaded from the rosbag file.

    Args:
        path: Path to the rosbag.
        topics: Dictionary of topics to load (`topic: [paths]`).
        message_paths: List of paths to additional message definitions.
        create_cache: If True, cache the data to a Parquet file.
        load_from_cache: If True, load data from the cache if it exists.
        cache_path: Path to the cache file. If None, defaults to the same
            directory as the rosbag file with a .parquet extension.
    """
    path = Path(path)
    cache_path = (
        Path(cache_path)
        if cache_path is not None
        else path.with_suffix(".parquet")
    )

    if load_from_cache and cache_path.exists():
        logger.info("Loading data from cache...")
        super().__init__(pl.scan_parquet(cache_path))
        return

    typestore = get_typestore(Stores.ROS2_HUMBLE)
    if message_paths is not None:
        additional_types = _collect_type_definitions(message_paths)
        typestore.register(additional_types)

    with AnyReader(
        [path],
        default_typestore=typestore,
    ) as reader:
        data = pl.concat(
            _generate_features(reader, topics),
            how="horizontal",
        )

    if create_cache:
        logger.info("Caching ROS data to Parquet file...")
        data.sink_parquet(Path(cache_path))

    super().__init__(data)