
rosbag

DataframeError

Bases: Exception

Dataframe conversion error.

RosbagLoader(path, topics, msgpaths)

Bases: Dataset

Environment to load data from a rosbag file.

It is initialized with the path to the rosbag file, a dictionary mapping each topic to the message fields to load, and a list of paths to additional message definitions.

Example
from flowcean.environments.rosbag import RosbagLoader

environment = RosbagLoader(
    path="example_rosbag",
    topics={
        "/amcl_pose": [
            "pose.pose.position.x",
            "pose.pose.position.y",
        ],
        "/odometry": [
            "pose.pose.position.x",
            "pose.pose.position.y",
        ],
    },
    msgpaths=[],  # no additional message definitions
)
environment.load()
data = environment.get_data()
print(data)

Initialize the RosbagLoader.

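The structure of the data is inferred from the message definitions. The typestore starts with the ROS 2 Humble definitions, and every message definition listed in msgpaths is parsed and registered on top of it, so custom types unknown to Humble can be decoded. The rosbag is then read, each topic in topics is converted into its own dataframe column, and the columns are concatenated horizontally.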

Parameters:

- path (str | Path, required): Path to the rosbag.
- topics (dict[str, list[str]], required): Dictionary of topics to load (topic: [keys]).
- msgpaths (list[str], required): List of paths to additional message definitions.
Source code in src/flowcean/environments/rosbag.py
def __init__(
    self,
    path: str | Path,
    topics: dict[str, list[str]],
    msgpaths: list[str],
) -> None:
    """Initialize the RosbagEnvironment.

    The structure of the data is inferred from the message definitions.
    If a message definition is not found in the ROS2 Humble typestore,
    it is added from the provided paths. Once all
    the message definitions are added, the data is loaded from the
    rosbag file.

    Args:
        path: Path to the rosbag.
        topics: Dictionary of topics to load (`topic: [keys]`).
        msgpaths: List of paths to additional message definitions.
    """
    if msgpaths is None:
        msgpaths = []
    self.path = Path(path)
    self.topics = topics
    self.typestore = get_typestore(Stores.ROS2_HUMBLE)
    add_types = {}
    for pathstr in msgpaths:
        msgpath = Path(pathstr)
        msgdef = msgpath.read_text(encoding="utf-8")
        add_types.update(
            get_types_from_msg(msgdef, self.guess_msgtype(msgpath)),
        )
        debug_msg = f"Added message type: {self.guess_msgtype(msgpath)}"
        logger.debug(debug_msg)
    self.typestore.register(add_types)

    with AnyRosbagReader(
        [self.path],
        default_typestore=self.typestore,
    ) as reader:
        features = [
            self.get_dataframe(reader, topic, keys)
            for topic, keys in self.topics.items()
        ]
        super().__init__(pl.concat(features, how="horizontal"))

guess_msgtype(path)

Guess message type name from path.

Parameters:

- path (Path, required): Path to the message file.

Returns:

- str: The message type name built from the last three path components, e.g. my_pkg/msg/Custom for .../my_pkg/msg/Custom.msg.

Source code in src/flowcean/environments/rosbag.py
def guess_msgtype(self, path: Path) -> str:
    """Guess message type name from path.

    Args:
        path: Path to the message file.

    Returns:
        The message type name, e.g. "my_pkg/msg/Custom".
    """
    name = path.relative_to(path.parents[2]).with_suffix("")
    if "msg" not in name.parts:
        name = name.parent / "msg" / name.name
    return str(name)
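The path-to-name mapping can be checked in isolation. The sketch below copies the logic of guess_msgtype into a standalone function (the name guess_msgtype_from_path and the example paths are hypothetical) and uses PurePosixPath so the result does not depend on the platform.

```python
from pathlib import PurePosixPath


def guess_msgtype_from_path(path: PurePosixPath) -> str:
    """Standalone copy of RosbagLoader.guess_msgtype, for illustration."""
    # Keep only the last three components (e.g. my_pkg/msg/Custom.msg)
    # and drop the .msg suffix.
    name = path.relative_to(path.parents[2]).with_suffix("")
    # Insert a "msg" directory if the file does not live in one.
    if "msg" not in name.parts:
        name = name.parent / "msg" / name.name
    return str(name)


print(guess_msgtype_from_path(PurePosixPath("/ws/src/my_pkg/msg/Custom.msg")))
# my_pkg/msg/Custom
print(guess_msgtype_from_path(PurePosixPath("/ws/src/my_pkg/Custom.msg")))
# src/my_pkg/msg/Custom
```

Because only the last three components are kept, a definition stored directly in the package directory yields src/my_pkg/msg/Custom rather than my_pkg/msg/Custom, so keeping .msg files in a msg/ subdirectory appears to be the layout this helper expects.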

get_dataframe(reader, topicname, keys)

Convert messages from a topic into a polars dataframe.

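Read all messages from a topic and extract the referenced keys into a polars dataframe. Each message's timestamp is stored alongside its values rather than as an index: the result has a single row and a single column named after the topic, holding a list of structs with a time field (the timestamp reported by the reader, in nanoseconds) and a value field (a struct with one field per key).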

Keys support a dotted syntax to traverse nested messages. Here is an example of a nested ROS message structure:

/amcl_pose (geometry_msgs/PoseWithCovarianceStamped)
├── pose (PoseWithCovariance)
│   ├── pose (Pose)
│   │   ├── position (Point)
│   │   │   ├── x (float)
│   │   │   ├── y (float)
│   │   │   └── z (float)
│   │   └── orientation (Quaternion)
│   └── covariance (array[36])

Consider the key 'pose.pose.position.x', which addresses x in the tree above. It is split at the dots into the subkeys ['pose', 'pose', 'position', 'x']. Each subkey except the last must name a field of the current message definition and moves one level deeper into the nested structure; the last subkey must name a field of the innermost message (here, x of Point). If a subkey does not match a field, a DataframeError is raised.
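The traversal can be sketched without a bag file. Here each message definition is a hypothetical dict mapping field names to a nested type name, or to None for a leaf value; this stands in for the msgdef.fields lookup and the get_subdef call in the real method, and ValueError stands in for DataframeError.

```python
# Hypothetical, simplified type definitions: field name -> nested type
# (or None for a leaf). Only the fields needed here are listed.
TYPES = {
    "PoseWithCovarianceStamped": {"pose": "PoseWithCovariance"},
    "PoseWithCovariance": {"pose": "Pose", "covariance": None},
    "Pose": {"position": "Point", "orientation": "Quaternion"},
    "Point": {"x": None, "y": None, "z": None},
    "Quaternion": {"x": None, "y": None, "z": None, "w": None},
}


def resolve_key(msgtype: str, key: str) -> list[str]:
    """Walk a dotted key through TYPES and return its subkeys."""
    subkeys = key.split(".")
    current = msgtype
    # Every subkey except the last must be a nested message field.
    for subkey in subkeys[:-1]:
        nested = TYPES[current].get(subkey)
        if nested is None:
            msg = f"Field {subkey!r} is not a nested message on {current!r}."
            raise ValueError(msg)
        current = nested
    # The last subkey only has to exist on the innermost message.
    if subkeys[-1] not in TYPES[current]:
        msg = f"Field {subkeys[-1]!r} does not exist on {current!r}."
        raise ValueError(msg)
    return subkeys


print(resolve_key("PoseWithCovarianceStamped", "pose.pose.position.x"))
# ['pose', 'pose', 'position', 'x']
```

A key such as 'pose.pose.position.w' fails on the last step, because Point has no w field.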

Parameters:

- reader (AnyReader, required): Opened rosbags reader.
- topicname (str, required): Name of the topic whose messages are processed.
- keys (Sequence[str], required): Dotted field names to extract from each message.

Raises:

- DataframeError: The reader is not opened, or the topic or a field does not exist.

Returns:

- DataFrame: A single-row polars dataframe with one column named after the topic, holding one {time, value} struct per message.

Source code in src/flowcean/environments/rosbag.py
def get_dataframe(
    self,
    reader: AnyRosbagReader,
    topicname: str,
    keys: Sequence[str],
) -> pl.DataFrame:
    """Convert messages from a topic into a polars dataframe.

    Read all messages from a topic and extract referenced keys into
    a polars dataframe. Each message's values are stored together with
    its timestamp in a {time, value} struct.

    Keys support a dotted syntax to traverse nested messages. Here is an
    example of a nested ROS message structure:
    /amcl_pose (geometry_msgs/PoseWithCovarianceStamped)
    ├── pose (PoseWithCovariance)
    │   ├── pose (Pose)
    │   │   ├── position (Point)
    │   │   │   ├── x (float)
    │   │   │   ├── y (float)
    │   │   │   └── z (float)
    │   │   └── orientation (Quaternion)
    │   └── covariance (array[36])

    The first key is 'pose.pose.position.x'. The subkeys are
    separated by dots. So, in this case, the subkeys are
    ['pose', 'pose', 'position', 'x']. Each subkey is used to
    traverse the nested message structure. If a subkey matches
    a field name, the next subkey is used to traverse deeper
    into the nested structure.

    Args:
        reader: Opened rosbags reader.
        topicname: Topic name of messages to process.
        keys: Field names to get from each message.

    Raises:
        DataframeError: Reader not opened or topic or field does not exist.

    Returns:
        Polars dataframe.

    """
    self.verify_topics(reader, topicname)
    topic = reader.topics[topicname]
    msgdef = reader.typestore.get_msgdef(str(topic.msgtype))

    getters = []
    # Iterate through each key provided by the user
    # (e.g., "pose.pose.position.x")
    for key in keys:
        # Split the key into subkeys at dots
        # (e.g., ["pose", "pose", "position", "x"])
        subkeys = key.split(".")

        # Start with the top-level message definition
        subdef = msgdef

        # Process all subkeys except the last one
        # (e.g., ["pose", "pose", "position"])
        for subkey in subkeys[:-1]:
            # Find the field in the current message definition that matches
            # the subkey. x[0] is the field name, returns None if not found
            subfield = next(
                (x for x in subdef.fields if x[0] == subkey),
                None,
            )

            # Get the message definition for this subfield to continue
            # traversing e.g., get definition of a 'pose' message
            subdef = self.get_subdef(reader, subdef, subkey, subfield)

        # Verify the final subkey exists in the last message definition
        # e.g. check that 'x' exists in the 'position' message
        if subkeys[-1] not in {x[0] for x in subdef.fields}:
            msg = (
                f"Field {subkeys[-1]!r} does not exist on {subdef.name!r}."
            )
            raise DataframeError(msg)
        # Create a getter function to extract the value from the message
        getters.append(self.create_getter(subkeys))

    timestamps = []
    data = []
    for _, timestamp, rawdata in reader.messages(
        connections=topic.connections,
    ):
        dmsg = reader.deserialize(rawdata, str(topic.msgtype))
        timestamps.append(timestamp)
        row = []

        for x in getters:
            value = x(dmsg)
            if isinstance(value, list):
                # Convert list items to dicts but keep them in the row
                row.append([self.ros_msg_to_dict(i) for i in value])
            elif hasattr(value, "__dict__"):
                row.append(self.ros_msg_to_dict(value))
            else:
                row.append(value)
        data.append(row)

    # Handle any numpy arrays
    data = [
        [x.tolist() if isinstance(x, np.ndarray) else x for x in row]
        for row in data
    ]
    df = pl.DataFrame(data, schema=tuple(keys), orient="row")
    time = pl.Series("time", timestamps)
    nest_into_timeseries = pl.struct(
        [
            pl.col("time"),
            pl.struct(pl.exclude("time")).alias("value"),
        ],
    )
    return df.with_columns(time).select(
        nest_into_timeseries.implode().alias(topicname),
    )

create_getter(keys)

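Create a getter that resolves a list of keys as nested attribute lookups; for ['pose', 'position', 'x'], the getter returns msg.pose.position.x.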

Source code in src/flowcean/environments/rosbag.py
def create_getter(self, keys: list[str]) -> Callable[[object], AttrValue]:
    """Create getter for nested lookups."""

    def getter(msg: object) -> AttrValue:
        value = msg
        for key in keys:
            value = getattr(value, key)
        return value

    return getter
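The getter is a chain of getattr calls. The sketch below expresses the same idea with functools.reduce and uses types.SimpleNamespace as a stand-in for a deserialized message; make_getter and the message values are hypothetical.

```python
from collections.abc import Callable
from functools import reduce
from types import SimpleNamespace


def make_getter(keys: list[str]) -> Callable[[object], object]:
    """Equivalent of create_getter: apply getattr once per key, in order."""
    return lambda msg: reduce(getattr, keys, msg)


# Stand-in for a deserialized geometry_msgs/PoseWithCovarianceStamped.
msg = SimpleNamespace(
    pose=SimpleNamespace(
        pose=SimpleNamespace(position=SimpleNamespace(x=1.5, y=-0.25, z=0.0)),
    ),
)

get_x = make_getter("pose.pose.position.x".split("."))
print(get_x(msg))  # 1.5
```

Splitting the key once and reusing the resulting getter lets get_dataframe build all getters before it starts iterating over the messages in the bag.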

ros_msg_to_dict(obj)

Recursively convert a ROS message object into a dictionary.

Parameters:

- obj (annotated as dict, required): A deserialized ROS message object whose attributes are its field names and values.

Returns:

- dict: A dictionary mapping field names to values, with nested message fields converted recursively. Values without a __dict__ (numbers, strings, lists, arrays) are returned unchanged, so messages inside list fields are not converted and a non-message obj is returned as-is.

Source code in src/flowcean/environments/rosbag.py
def ros_msg_to_dict(self, obj: dict) -> dict:
    """Recursively convert a ROS message object into a dictionary.

    Args:
        obj (dict): A ROS message object represented as a dictionary where
            keys are field names and values are their corresponding data.

    Returns:
        dict: A dictionary representation of the ROS message, with all
            nested fields converted to dictionaries.
    """
    if hasattr(obj, "__dict__"):  # Check if the object has attributes
        result = {}
        for key, value in obj.__dict__.items():
            result[key] = self.ros_msg_to_dict(value)
        return result
    return obj  # Return the base value if it's not an object
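The recursion can be tried on plain dataclasses, which, like the message objects this method is written for, expose their fields through __dict__. The classes Point and Polygon below are hypothetical stand-ins. Note that list values are returned unchanged, so messages nested inside a list stay objects; get_dataframe handles top-level list fields by converting each element itself.

```python
from dataclasses import dataclass


@dataclass
class Point:  # hypothetical stand-in for geometry_msgs/Point
    x: float
    y: float
    z: float


@dataclass
class Polygon:  # hypothetical message with a nested field and a list field
    origin: Point
    points: list[Point]


def ros_msg_to_dict(obj):
    """Standalone copy of RosbagLoader.ros_msg_to_dict."""
    if hasattr(obj, "__dict__"):
        # Message-like object: convert each attribute recursively.
        return {key: ros_msg_to_dict(value) for key, value in obj.__dict__.items()}
    # Plain values (numbers, strings, lists, arrays) are returned as-is.
    return obj


poly = Polygon(origin=Point(0.0, 0.0, 0.0), points=[Point(1.0, 2.0, 0.0)])
converted = ros_msg_to_dict(poly)
print(converted["origin"])  # {'x': 0.0, 'y': 0.0, 'z': 0.0}
print(isinstance(converted["points"][0], Point))  # True: list items are untouched
```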