Skip to content

io

Serialization helpers for hybrid system traces.

traces_to_polars(traces, *, state_names=None, derivative_names=None, input_names=None)

Convert traces into per-trace Polars DataFrames.

Parameters:

Name Type Description Default
traces Sequence[Trace]

Sequence of traces to convert.

required
state_names Sequence[str] | None

Optional names for state dimensions.

None
derivative_names Sequence[str] | None

Optional names for derivative dimensions.

None
input_names Sequence[str] | None

Optional names for input dimensions.

None

Returns:

Type Description
list[DataFrame]

List of per-trace DataFrames in trace order.

Source code in src/flowcean/ode/io.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def traces_to_polars(
    traces: Sequence[Trace],
    *,
    state_names: Sequence[str] | None = None,
    derivative_names: Sequence[str] | None = None,
    input_names: Sequence[str] | None = None,
) -> list[pl.DataFrame]:
    """Build one Polars DataFrame per trace.

    Every frame has one row per sample with the columns ``step``, ``t`` and
    ``location``, followed by the state columns, then the derivative columns
    (only when the trace carries derivatives) and finally the input columns.

    Args:
        traces: Sequence of traces to convert.
        state_names: Optional names for state dimensions.
        derivative_names: Optional names for derivative dimensions. When
            omitted while ``state_names`` is given, the derivative columns
            are named ``dx_<state column>``.
        input_names: Optional names for input dimensions.

    Returns:
        List of per-trace DataFrames in trace order.

    Raises:
        ValueError: If a row needs derivative or input columns but the trace
            does not provide the corresponding data.
    """
    frames: list[pl.DataFrame] = []
    for trace in traces:
        input_values = _validated_inputs(trace)
        derivative_values = _validated_derivatives(trace)

        x_labels = _column_names(
            "x",
            trace.x.shape[1],
            state_names,
            arg_name="state_names",
        )

        if derivative_values is None:
            dx_labels: list[str] = []
        else:
            dx_labels = _column_names(
                "dx",
                derivative_values.shape[1],
                derivative_names,
                arg_name="derivative_names",
            )
            # Without explicit derivative names, mirror the state columns.
            if state_names is not None and derivative_names is None:
                dx_labels = [f"dx_{label}" for label in x_labels]

        u_labels = _column_names(
            "u",
            input_values.shape[1] if input_values is not None else 0,
            input_names,
            arg_name="input_names",
        )

        records: list[dict[str, object]] = []
        # Non-strict zip: iteration stops at the shortest of t/x/location.
        samples = zip(trace.t, trace.x, trace.location, strict=False)
        for step, (stamp, state_vec, mode) in enumerate(samples):
            record: dict[str, object] = {
                "step": step,
                "t": float(stamp),
                "location": str(mode),
            }
            record.update(
                (label, float(state_vec[axis]))
                for axis, label in enumerate(x_labels)
            )
            if dx_labels:
                # Guard kept per row so it only fires when a row is built.
                if derivative_values is None:
                    message = "Trace does not contain captured derivatives."
                    raise ValueError(message)
                record.update(
                    (label, float(derivative_values[step, axis]))
                    for axis, label in enumerate(dx_labels)
                )
            if u_labels:
                if input_values is None:
                    message = "Trace does not contain captured inputs."
                    raise ValueError(message)
                record.update(
                    (label, float(input_values[step, axis]))
                    for axis, label in enumerate(u_labels)
                )
            records.append(record)
        frames.append(pl.DataFrame(records))

    return frames

save_traces_parquet(traces, path, *, trace_metadata=None)

Write traces to a directory with one Parquet file per trace.

Source code in src/flowcean/ode/io.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def save_traces_parquet(
    traces: Sequence[Trace],
    path: str | Path,
    *,
    trace_metadata: Sequence[Mapping[str, object] | None] | None = None,
) -> None:
    """Write traces to a directory with one Parquet file per trace.

    Each trace is converted with ``traces_to_polars`` (default column names)
    and written to ``trace_<index>.parquet`` inside ``path``. The metadata
    entry at the same index is handed to ``_write_metadata_file``.

    Args:
        traces: Sequence of traces to write.
        path: Output directory. It is created, including missing parents,
            if it does not exist yet.
        trace_metadata: Optional per-trace metadata; it is passed through
            ``_prepare_trace_metadata`` together with the number of traces.

    Raises:
        ValueError: If the prepared metadata and the converted frames differ
            in length (the pairing uses a strict ``zip``).
    """
    # Do all in-memory work first so that invalid metadata or a trace that
    # fails to convert is reported before anything is created on disk.
    metadata_per_trace = _prepare_trace_metadata(trace_metadata, len(traces))
    trace_frames = traces_to_polars(traces)

    output_dir = Path(path)
    output_dir.mkdir(parents=True, exist_ok=True)
    for trace_id, (trace_df, metadata) in enumerate(
        zip(trace_frames, metadata_per_trace, strict=True),
    ):
        trace_df.write_parquet(output_dir / f"trace_{trace_id}.parquet")
        _write_metadata_file(output_dir, trace_id, metadata)

save_traces_csv(traces, path, *, trace_metadata=None)

Write traces to a directory with one CSV file per trace.

Source code in src/flowcean/ode/io.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def save_traces_csv(
    traces: Sequence[Trace],
    path: str | Path,
    *,
    trace_metadata: Sequence[Mapping[str, object] | None] | None = None,
) -> None:
    """Write traces to a directory with one CSV file per trace.

    Each trace is converted with ``traces_to_polars`` (default column names)
    and written to ``trace_<index>.csv`` inside ``path``. The metadata entry
    at the same index is handed to ``_write_metadata_file``.

    Args:
        traces: Sequence of traces to write.
        path: Output directory. It is created, including missing parents,
            if it does not exist yet.
        trace_metadata: Optional per-trace metadata; it is passed through
            ``_prepare_trace_metadata`` together with the number of traces.

    Raises:
        ValueError: If the prepared metadata and the converted frames differ
            in length (the pairing uses a strict ``zip``).
    """
    # Do all in-memory work first so that invalid metadata or a trace that
    # fails to convert is reported before anything is created on disk.
    metadata_per_trace = _prepare_trace_metadata(trace_metadata, len(traces))
    trace_frames = traces_to_polars(traces)

    output_dir = Path(path)
    output_dir.mkdir(parents=True, exist_ok=True)
    for trace_id, (trace_df, metadata) in enumerate(
        zip(trace_frames, metadata_per_trace, strict=True),
    ):
        trace_df.write_csv(output_dir / f"trace_{trace_id}.csv")
        _write_metadata_file(output_dir, trace_id, metadata)

trace_to_polars(trace, *, state_names=None, derivative_names=None, input_names=None)

Convert a single trace into a Polars DataFrame.

Source code in src/flowcean/ode/io.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def trace_to_polars(
    trace: Trace,
    *,
    state_names: Sequence[str] | None = None,
    derivative_names: Sequence[str] | None = None,
    input_names: Sequence[str] | None = None,
) -> pl.DataFrame:
    """Convert a single trace into a Polars DataFrame.

    Thin convenience wrapper: the trace is wrapped in a one-element list,
    delegated to ``traces_to_polars`` with the same naming options, and the
    only resulting frame is returned.

    Args:
        trace: Trace to convert.
        state_names: Optional names for state dimensions.
        derivative_names: Optional names for derivative dimensions.
        input_names: Optional names for input dimensions.

    Returns:
        The DataFrame built for ``trace``.
    """
    single_trace_batch = [trace]
    return traces_to_polars(
        single_trace_batch,
        state_names=state_names,
        derivative_names=derivative_names,
        input_names=input_names,
    )[0]