Skip to content

rosbag

Loading data from rosbag topics.

RosbagLoader(path, topics)

Bases: Dataset

Environment to load data from a rosbag file.

The RosbagEnvironment is used to load data from a rosbag file. The environment is initialized with the path to the rosbag file and a dictionary of topics to load.

Example
from flowcean.environments.rosbag import RosbagLoader

environment = RosbagLoader(
    path="example_rosbag",
    topics={
        "/j100_0000/amcl_pose": [
            "pose.pose.position.x",
            "pose.pose.position.y",
        ],
        "/j100_0000/odometry": [
            "pose.pose.position.x",
            "pose.pose.position.y",
        ],
    },
)
environment.load()
data = environment.get_data()
print(data)

Initialize the RosbagEnvironment.

Parameters:

Name Type Description Default
path str | Path

Path to the rosbag.

required
topics dict[str, list[str]]

Dictionary of topics to load (topic: [keys]).

required
Source code in src/flowcean/environments/rosbag.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(
    self,
    path: str | Path,
    topics: dict[str, list[str]],
) -> None:
    """Initialize the RosbagEnvironment.

    Args:
        path: Path to the rosbag.
        topics: Dictionary of topics to load (`topic: [keys]`).
    """
    with AnyReader([Path(path)]) as reader:
        features = [
            read_timeseries(reader, topic, keys)
            for topic, keys in topics.items()
        ]
    super().__init__(pl.concat(features, how="horizontal"))

read_timeseries(reader, topic, keys)

Read a timeseries from a rosbag topic.

Parameters:

Name Type Description Default
reader AnyReader

Rosbag reader.

required
topic str

Topic name.

required
keys Sequence[str]

Keys to read from the topic.

required

Returns:

Type Description
DataFrame

Timeseries DataFrame.

Source code in src/flowcean/environments/rosbag.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def read_timeseries(
    reader: AnyReader,
    topic: str,
    keys: Sequence[str],
) -> pl.DataFrame:
    """Read a timeseries from a rosbag topic.

    Args:
        reader: Rosbag reader.
        topic: Topic name.
        keys: Keys to read from the topic.

    Returns:
        Timeseries DataFrame.
    """
    data = pl.from_pandas(
        get_dataframe(reader, topic, keys).reset_index(names="time"),
    )
    nest_into_timeseries = pl.struct(
        [
            pl.col("time"),
            pl.struct(pl.exclude("time")).alias("value"),
        ]
    )
    return data.select(nest_into_timeseries.implode().alias(topic))