Skip to content

ode

ContinuousDynamics(flow, label=None) dataclass

Reusable continuous dynamics definition.

Parameters:

Name Type Description Default
flow Callable[..., Derivative]

Dynamics function returning the state derivative. Scalar derivative returns are accepted only for single-state systems, both during solver evaluation and when derivatives are captured on the returned trace grid.

required
label str | None

Optional display label.

None

CrossingDirection

Bases: IntEnum

Direction in which an event surface must cross zero.

Event(time, source_location, target_location, event_surface, reset, state) dataclass

Transition event information for a trace.

EventSurface(fn, direction=CrossingDirection.EITHER, label=None) dataclass

Scalar event surface defining a simulated transition event.

Flowcean transitions fire when fn reaches zero in direction. This is event-surface semantics, not Boolean guard-region semantics.

Parameters:

Name Type Description Default
fn Callable[..., float]

Root function; transitions when it crosses zero.

required
direction CrossingDirection

Crossing direction. Defaults to either direction.

EITHER
label str | None

Optional display label.

None

EventSurfaceFunction

Bases: Protocol

Scalar event-surface callback.

FlowFunction

Bases: Protocol

Continuous dynamics callback.

HybridSystem(locations, transitions, initial_location, initial_state, parameters=dict()) dataclass

Hybrid system with locations and transitions.

Parameters:

Name Type Description Default
locations Sequence[Location]

Location objects in this system.

required
transitions Sequence[Transition]

Transition list defining event surfaces and resets.

required
initial_location Location

Starting location object.

required
initial_state State

Initial state vector.

required
parameters Parameters

Global parameter map passed to callbacks.

dict()

transitions_from(location)

Return transitions leaving the given location.

Source code in src/flowcean/ode/hybrid_system.py
292
293
294
295
296
297
298
def transitions_from(self, location: Location) -> list[Transition]:
    """Return transitions leaving the given location."""
    return [
        transition
        for transition in self.transitions
        if transition.source is location
    ]

Location(dynamics, *, label=None, parameters=None) dataclass

Location(
    dynamics: ContinuousDynamics,
    *,
    label: str | None = None,
    parameters: Parameters | None = None,
)
Location(
    dynamics: Callable[..., Derivative],
    *,
    label: str | None = None,
    parameters: Parameters | None = None,
)

Discrete hybrid-automaton location.

Parameters:

Name Type Description Default
dynamics ContinuousDynamics | Callable[..., Derivative]

Continuous dynamics or bare flow callback active here.

required
label str | None

Optional display label.

None
parameters Parameters | None

Location-local parameter map.

None
Source code in src/flowcean/ode/hybrid_system.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def __init__(
    self,
    dynamics: ContinuousDynamics | Callable[..., Derivative],
    *,
    label: str | None = None,
    parameters: Parameters | None = None,
) -> None:
    if isinstance(dynamics, ContinuousDynamics):
        continuous_dynamics = dynamics
    elif callable(dynamics):
        continuous_dynamics = ContinuousDynamics(dynamics)
    else:
        message = (
            "Location requires ContinuousDynamics or a flow callback."
        )
        raise TypeError(message)
    if parameters is not None and not isinstance(parameters, Mapping):
        message = "parameters must be a mapping."
        raise TypeError(message)
    object.__setattr__(self, "dynamics", continuous_dynamics)
    object.__setattr__(self, "label", label)
    object.__setattr__(self, "parameters", dict(parameters or {}))

Reset(fn, label=None) dataclass

State reset applied on a transition.

Parameters:

Name Type Description Default
fn Callable[..., State]

Reset function applied at the event time.

required
label str | None

Optional display label.

None

ResetFunction

Bases: Protocol

State reset callback.

Trace(t, x, location, events, u=None, dx=None) dataclass

Simulation trace with time, state, and location labels.

as_dict()

Return a dictionary view of the trace.

Source code in src/flowcean/ode/hybrid_system.py
365
366
367
368
369
370
371
372
373
374
def as_dict(self) -> dict[str, object]:
    """Return a dictionary view of the trace."""
    return {
        "t": self.t,
        "x": self.x,
        "location": self.location,
        "events": self.events,
        "u": self.u,
        "dx": self.dx,
    }

Transition(source, target, event, reset=None) dataclass

Discrete event-triggered transition between locations.

event is a scalar zero-crossing surface.

Parameters:

Name Type Description Default
source Location

Source location.

required
target Location

Target location.

required
event EventSurface | Callable[..., float]

Event surface that triggers the transition.

required
reset Reset | Callable[..., State] | None

Optional reset applied upon transition.

None
Source code in src/flowcean/ode/hybrid_system.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def __init__(
    self,
    source: Location,
    target: Location,
    event: EventSurface | Callable[..., float],
    reset: Reset | Callable[..., State] | None = None,
) -> None:
    if not isinstance(source, Location):
        message = "source must be a Location."
        raise TypeError(message)
    if not isinstance(target, Location):
        message = "target must be a Location."
        raise TypeError(message)
    if isinstance(event, EventSurface):
        event_surface = event
    elif callable(event):
        event_surface = EventSurface(event)
    else:
        message = "event must be an EventSurface or callable."
        raise TypeError(message)
    if isinstance(reset, Reset) or reset is None:
        transition_reset = reset
    elif callable(reset):
        transition_reset = Reset(reset)
    else:
        message = "reset must be a Reset, callable, or None."
        raise TypeError(message)
    object.__setattr__(self, "source", source)
    object.__setattr__(self, "target", target)
    object.__setattr__(self, "event", event_surface)
    object.__setattr__(self, "reset", transition_reset)

IntegrationError()

Bases: Exception

Error while integrating an ODE.

This exception is raised when an error occurs while integrating an ordinary differential equation.

Initialize the exception.

Source code in src/flowcean/ode/ode_environment.py
20
21
22
def __init__(self) -> None:
    """Initialize the exception."""
    super().__init__("failed to integrate ODE")

OdeEnvironment(system, *, dt=1.0, map_to_dataframe)

Bases: IncrementalEnvironment

Environment governed by an ordinary differential equation.

This environment integrates an OdeSystem to generate a sequence of states.

Initialize the environment.

Parameters:

Name Type Description Default
system OdeSystem[X]

ODE system.

required
dt float

Time step.

1.0
map_to_dataframe Callable[[Sequence[float], Sequence[X]], DataFrame]

Function to map states to a DataFrame.

required
Source code in src/flowcean/ode/ode_environment.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def __init__(
    self,
    system: OdeSystem[X],
    *,
    dt: float = 1.0,
    map_to_dataframe: Callable[
        [Sequence[float], Sequence[X]],
        pl.DataFrame,
    ],
) -> None:
    """Initialize the environment.

    Args:
        system: ODE system.
        dt: Time step.
        map_to_dataframe: Function to map states to a DataFrame.
    """
    super().__init__()
    self.system = system
    self.dt = dt
    self.map_to_dataframe = map_to_dataframe
    self.ts = [self.system.t]
    self.states = [self.system.state]

OdeState

Bases: ABC

State of a differential equation.

This class represents the state of a differential equation. It provides methods to convert the state to and from a numpy array for integration.

as_numpy() abstractmethod

Convert the state to a numpy array.

Returns:

Type Description
NDArray[float64]

State as a numpy array.

Source code in src/flowcean/ode/ode_environment.py
32
33
34
35
36
37
38
@abstractmethod
def as_numpy(self) -> NDArray[np.float64]:
    """Convert the state to a numpy array.

    Returns:
        State as a numpy array.
    """

from_numpy(state) abstractmethod classmethod

Create a state from a numpy array.

Parameters:

Name Type Description Default
state NDArray[float64]

State as a numpy array.

required

Returns:

Type Description
Self

State instance.

Source code in src/flowcean/ode/ode_environment.py
40
41
42
43
44
45
46
47
48
49
50
@classmethod
@abstractmethod
def from_numpy(cls, state: NDArray[np.float64]) -> Self:
    """Create a state from a numpy array.

    Args:
        state: State as a numpy array.

    Returns:
        State instance.
    """

OdeSystem(t, state)

Bases: ABC

System governed by an ordinary differential equation.

This class represents a continuous system. The system is defined by a differential flow function \(f\) that governs the evolution of the state \(x\).

\[ \begin{aligned} \dot{x} &= f(t, x) \\ \end{aligned} \]

The system can be integrated to obtain the state at a future time.

Attributes:

Name Type Description
t float

Current time.

state X

Current state.

Initialize the system.

Parameters:

Name Type Description Default
t float

Initial time.

required
state X

Initial state.

required
Source code in src/flowcean/ode/ode_environment.py
75
76
77
78
79
80
81
82
83
def __init__(self, t: float, state: X) -> None:
    """Initialize the system.

    Args:
        t: Initial time.
        state: Initial state.
    """
    self.t = t
    self.state = state

flow(t, state) abstractmethod

Ordinary differential equation.

Compute the derivative of the state \(x\) at time \(t\).

Parameters:

Name Type Description Default
t float

Time.

required
state NDArray[float64]

State.

required

Returns:

Type Description
NDArray[float64]

Derivative of the state.

Source code in src/flowcean/ode/ode_environment.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@abstractmethod
def flow(
    self,
    t: float,
    state: NDArray[np.float64],
) -> NDArray[np.float64]:
    """Ordinary differential equation.

    Compute the derivative of the state $x$ at time $t$.

    Args:
        t: Time.
        state: State.

    Returns:
        Derivative of the state.
    """

step(dt)

Step the environment forward in time.

Step the environment forward in time by integrating the differential equation for a time step of dt.

Parameters:

Name Type Description Default
dt float

Time step.

required

Returns:

Type Description
tuple[Sequence[float], Sequence[X]]

Tuple of times and states of the integration.

Source code in src/flowcean/ode/ode_environment.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def step(self, dt: float) -> tuple[Sequence[float], Sequence[X]]:
    """Step the environment forward in time.

    Step the environment forward in time by integrating the
    differential equation for a time step of dt.

    Args:
        dt: Time step.

    Returns:
        Tuple of times and states of the integration.
    """
    y0 = self.state.as_numpy()
    solution = solve_ivp(
        self.flow,
        t_span=[self.t, self.t + dt],
        y0=y0,
    )
    if not solution.success:
        raise IntegrationError

    ts = cast("Sequence[float]", solution.t[1:])
    states = [self.state.from_numpy(y) for y in solution.y.T[1:]]

    self.t = ts[-1]
    self.state = states[-1]

    return ts, states

save_traces_csv(traces, path, *, trace_metadata=None)

Write traces to a directory with one CSV file per trace.

Source code in src/flowcean/ode/io.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def save_traces_csv(
    traces: Sequence[Trace],
    path: str,
    *,
    trace_metadata: Sequence[Mapping[str, object] | None] | None = None,
) -> None:
    """Write traces to a directory with one CSV file per trace."""
    output_dir = Path(path)
    output_dir.mkdir(parents=True, exist_ok=True)
    metadata_per_trace = _prepare_trace_metadata(trace_metadata, len(traces))
    trace_frames = traces_to_polars(traces)
    for trace_id, (trace_df, metadata) in enumerate(
        zip(trace_frames, metadata_per_trace, strict=True),
    ):
        trace_df.write_csv(output_dir / f"trace_{trace_id}.csv")
        _write_metadata_file(output_dir, trace_id, metadata)

save_traces_parquet(traces, path, *, trace_metadata=None)

Write traces to a directory with one Parquet file per trace.

Source code in src/flowcean/ode/io.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def save_traces_parquet(
    traces: Sequence[Trace],
    path: str,
    *,
    trace_metadata: Sequence[Mapping[str, object] | None] | None = None,
) -> None:
    """Write traces to a directory with one Parquet file per trace."""
    output_dir = Path(path)
    output_dir.mkdir(parents=True, exist_ok=True)
    metadata_per_trace = _prepare_trace_metadata(trace_metadata, len(traces))
    trace_frames = traces_to_polars(traces)
    for trace_id, (trace_df, metadata) in enumerate(
        zip(trace_frames, metadata_per_trace, strict=True),
    ):
        trace_df.write_parquet(output_dir / f"trace_{trace_id}.parquet")
        _write_metadata_file(output_dir, trace_id, metadata)

trace_to_polars(trace, *, state_names=None, derivative_names=None, input_names=None)

Convert a single trace into a Polars DataFrame.

Source code in src/flowcean/ode/io.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def trace_to_polars(
    trace: Trace,
    *,
    state_names: Sequence[str] | None = None,
    derivative_names: Sequence[str] | None = None,
    input_names: Sequence[str] | None = None,
) -> pl.DataFrame:
    """Convert a single trace into a Polars DataFrame."""
    trace_frames = traces_to_polars(
        [trace],
        state_names=state_names,
        derivative_names=derivative_names,
        input_names=input_names,
    )
    return trace_frames[0]

traces_to_polars(traces, *, state_names=None, derivative_names=None, input_names=None)

Convert traces into per-trace Polars DataFrames.

Parameters:

Name Type Description Default
traces Sequence[Trace]

Sequence of traces to convert.

required
state_names Sequence[str] | None

Optional names for state dimensions.

None
derivative_names Sequence[str] | None

Optional names for derivative dimensions.

None
input_names Sequence[str] | None

Optional names for input dimensions.

None

Returns:

Type Description
list[DataFrame]

List of per-trace DataFrames in trace order.

Source code in src/flowcean/ode/io.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def traces_to_polars(
    traces: Sequence[Trace],
    *,
    state_names: Sequence[str] | None = None,
    derivative_names: Sequence[str] | None = None,
    input_names: Sequence[str] | None = None,
) -> list[pl.DataFrame]:
    """Convert traces into per-trace Polars DataFrames.

    Args:
        traces: Sequence of traces to convert.
        state_names: Optional names for state dimensions.
        derivative_names: Optional names for derivative dimensions.
        input_names: Optional names for input dimensions.

    Returns:
        List of per-trace DataFrames in trace order.
    """
    trace_frames: list[pl.DataFrame] = []
    for trace in traces:
        inputs = _validated_inputs(trace)
        derivatives = _validated_derivatives(trace)
        state_columns = _column_names(
            "x",
            trace.x.shape[1],
            state_names,
            arg_name="state_names",
        )
        derivative_columns: list[str] = []
        if derivatives is not None:
            derivative_columns = _column_names(
                "dx",
                derivatives.shape[1],
                derivative_names,
                arg_name="derivative_names",
            )
            if derivative_names is None and state_names is not None:
                derivative_columns = [f"dx_{name}" for name in state_columns]
        input_dimension = 0 if inputs is None else inputs.shape[1]
        input_columns = _column_names(
            "u",
            input_dimension,
            input_names,
            arg_name="input_names",
        )
        rows: list[dict[str, object]] = []
        for idx, (time, state, location) in enumerate(
            zip(trace.t, trace.x, trace.location, strict=False),
        ):
            row: dict[str, object] = {
                "step": idx,
                "t": float(time),
                "location": str(location),
            }
            for dim, column in enumerate(state_columns):
                row[column] = float(state[dim])
            for dim, column in enumerate(derivative_columns):
                if derivatives is None:
                    message = "Trace does not contain captured derivatives."
                    raise ValueError(message)
                row[column] = float(derivatives[idx, dim])
            for dim, column in enumerate(input_columns):
                if inputs is None:
                    message = "Trace does not contain captured inputs."
                    raise ValueError(message)
                row[column] = float(inputs[idx, dim])
            rows.append(row)
        trace_frames.append(pl.DataFrame(rows))

    return trace_frames

plot_phase(trace, x_dim=0, y_dim=1, *, location_colors=None, show_location_legend=True, show=False, ax=None)

Plot phase portrait segments colored by location.

Parameters:

Name Type Description Default
trace Trace

Trace to visualize.

required
x_dim int

State index on the x-axis.

0
y_dim int

State index on the y-axis.

1
location_colors Mapping[str, str] | None

Optional color mapping for locations.

None
show_location_legend bool

Whether to show location labels in legend.

True
show bool

Whether to call matplotlib show().

False
ax Axes | None

Optional axis to draw into.

None

Returns:

Type Description
Axes

Matplotlib axes containing the plot.

Source code in src/flowcean/ode/plotting.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def plot_phase(
    trace: Trace,
    x_dim: int = 0,
    y_dim: int = 1,
    *,
    location_colors: Mapping[str, str] | None = None,
    show_location_legend: bool = True,
    show: bool = False,
    ax: Axes | None = None,
) -> Axes:
    """Plot phase portrait segments colored by location.

    Args:
        trace: Trace to visualize.
        x_dim: State index on the x-axis.
        y_dim: State index on the y-axis.
        location_colors: Optional color mapping for locations.
        show_location_legend: Whether to show location labels in legend.
        show: Whether to call matplotlib show().
        ax: Optional axis to draw into.

    Returns:
        Matplotlib axes containing the plot.
    """
    if ax is None:
        _, ax = plt.subplots()
    if ax is None:
        message = "Failed to create matplotlib axes."
        raise RuntimeError(message)

    segments, locations = _location_segments(trace)
    colors = _location_color_map(locations, location_colors)

    for start, end, location in segments:
        ax.plot(
            trace.x[start:end, x_dim],
            trace.x[start:end, y_dim],
            color=colors[location],
            label=f"{location}",
        )
    ax.set_xlabel(f"x{x_dim}")
    ax.set_ylabel(f"x{y_dim}")

    if show_location_legend:
        _dedupe_legend(ax)

    if show:
        plt.show()

    return ax

plot_trace(trace, dims=None, *, location_colors=None, show_locations=True, show_location_labels=False, show_events=True, show_event_labels=True, show=False, ax=None)

Plot state trajectories with location shading and event markers.

Parameters:

Name Type Description Default
trace Trace

Trace to visualize.

required
dims Sequence[int] | None

State indices to plot.

None
location_colors Mapping[str, str] | None

Optional color mapping for locations.

None
show_locations bool

Whether to shade location regions.

True
show_location_labels bool

Whether to label locations above the trace.

False
show_events bool

Whether to show transition events.

True
show_event_labels bool

Whether to label transition events.

True
show bool

Whether to call matplotlib show().

False
ax Axes | None

Optional axis to draw into.

None

Returns:

Type Description
Axes

Matplotlib axes containing the plot.

Source code in src/flowcean/ode/plotting.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def plot_trace(
    trace: Trace,
    dims: Sequence[int] | None = None,
    *,
    location_colors: Mapping[str, str] | None = None,
    show_locations: bool = True,
    show_location_labels: bool = False,
    show_events: bool = True,
    show_event_labels: bool = True,
    show: bool = False,
    ax: Axes | None = None,
) -> Axes:
    """Plot state trajectories with location shading and event markers.

    Args:
        trace: Trace to visualize.
        dims: State indices to plot.
        location_colors: Optional color mapping for locations.
        show_locations: Whether to shade location regions.
        show_location_labels: Whether to label locations above the trace.
        show_events: Whether to show transition events.
        show_event_labels: Whether to label transition events.
        show: Whether to call matplotlib show().
        ax: Optional axis to draw into.

    Returns:
        Matplotlib axes containing the plot.
    """
    if dims is None:
        dims = list(range(trace.x.shape[1]))

    if ax is None:
        _, ax = plt.subplots()
    if ax is None:
        message = "Failed to create matplotlib axes."
        raise RuntimeError(message)

    for dim in dims:
        ax.plot(trace.t, trace.x[:, dim], label=f"x{dim}")

    if show_locations:
        _plot_location_spans(
            trace,
            ax=ax,
            location_colors=location_colors,
            show_labels=show_location_labels,
        )

    if show_events:
        _plot_events(trace, ax=ax, show_labels=show_event_labels)

    ax.set_xlabel("t")
    ax.set_ylabel("state")
    ax.legend(loc="best")

    if show:
        plt.show()

    return ax

generate_traces(system, t_span, initial_states, *, input_stream=None, capture_inputs=None, capture_derivatives=False, max_jumps=256, rtol=1e-07, atol=1e-09, max_step=None, dense_output=False, sample_times=None, sample_dt=None)

Simulate a batch of traces for a set of initial states.

The input stream and capture semantics match :func:simulate, including the requirement that capture_derivatives=True assumes pure flow callbacks under repeated evaluation on the returned trace grid. Scalar derivative returns are accepted only for single-state systems.

Source code in src/flowcean/ode/simulator.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def generate_traces(
    system: HybridSystem,
    t_span: tuple[float, float],
    initial_states: Iterable[Iterable[float]],
    *,
    input_stream: InputStream | None = None,
    capture_inputs: bool | None = None,
    capture_derivatives: bool = False,
    max_jumps: int = 256,
    rtol: float = 1e-7,
    atol: float = 1e-9,
    max_step: float | None = None,
    dense_output: bool = False,
    sample_times: Iterable[float] | None = None,
    sample_dt: float | None = None,
) -> list[Trace]:
    """Simulate a batch of traces for a set of initial states.

    The input stream and capture semantics match :func:`simulate`, including
    the requirement that ``capture_derivatives=True`` assumes pure flow
    callbacks under repeated evaluation on the returned trace grid. Scalar
    derivative returns are accepted only for single-state systems.
    """
    return [
        simulate(
            system,
            t_span,
            x0=state,
            input_stream=input_stream,
            capture_inputs=capture_inputs,
            capture_derivatives=capture_derivatives,
            max_jumps=max_jumps,
            rtol=rtol,
            atol=atol,
            max_step=max_step,
            dense_output=dense_output,
            sample_times=sample_times,
            sample_dt=sample_dt,
        )
        for state in initial_states
    ]

simulate(system, t_span, x0=None, location0=None, *, input_stream=None, capture_inputs=None, capture_derivatives=False, max_jumps=256, rtol=1e-07, atol=1e-09, max_step=None, dense_output=False, sample_times=None, sample_dt=None)

Simulate a hybrid system and return a trace.

Parameters:

Name Type Description Default
system HybridSystem

Hybrid system to simulate.

required
t_span tuple[float, float]

Start and end time for integration.

required
x0 Iterable[float] | None

Optional initial state override.

None
location0 Location | None

Optional initial location override.

None
input_stream InputStream | None

Optional input stream accessor for callbacks.

None
capture_inputs bool | None

Input capture mode. If None, capture iff an input stream is provided.

None
capture_derivatives bool

Whether to re-evaluate Location.dynamics.flow on the returned trace grid and store the sampled derivatives in Trace.dx. This assumes pure flow callbacks under repeated evaluation. Scalar derivative returns are accepted only for single-state systems.

False
max_jumps int

Maximum number of transitions allowed.

256
rtol float

Relative tolerance for the solver.

1e-07
atol float

Absolute tolerance for the solver.

1e-09
max_step float | None

Optional maximum step size.

None
dense_output bool

Whether to build a continuous solution per segment.

False
sample_times Iterable[float] | None

Monotone time grid to sample from the dense solution.

None
sample_dt float | None

Fixed sampling interval to generate a time grid.

None

Returns:

Name Type Description
Trace Trace

The simulation trace with location labels and events.

Source code in src/flowcean/ode/simulator.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def simulate(  # noqa: C901, PLR0912, PLR0915
    system: HybridSystem,
    t_span: tuple[float, float],
    x0: Iterable[float] | None = None,
    location0: Location | None = None,
    *,
    input_stream: InputStream | None = None,
    capture_inputs: bool | None = None,
    capture_derivatives: bool = False,
    max_jumps: int = 256,
    rtol: float = 1e-7,
    atol: float = 1e-9,
    max_step: float | None = None,
    dense_output: bool = False,
    sample_times: Iterable[float] | None = None,
    sample_dt: float | None = None,
) -> Trace:
    """Simulate a hybrid system and return a trace.

    Args:
        system: Hybrid system to simulate.
        t_span: Start and end time for integration.
        x0: Optional initial state override.
        location0: Optional initial location override.
        input_stream: Optional input stream accessor for callbacks.
        capture_inputs: Input capture mode. If ``None``, capture iff an
            input stream is provided.
        capture_derivatives: Whether to re-evaluate ``Location.dynamics.flow``
            on the returned trace grid and store the sampled derivatives in
            ``Trace.dx``. This assumes pure flow callbacks under repeated
            evaluation. Scalar derivative returns are accepted only for
            single-state systems.
        max_jumps: Maximum number of transitions allowed.
        rtol: Relative tolerance for the solver.
        atol: Absolute tolerance for the solver.
        max_step: Optional maximum step size.
        dense_output: Whether to build a continuous solution per segment.
        sample_times: Monotone time grid to sample from the dense solution.
        sample_dt: Fixed sampling interval to generate a time grid.

    Returns:
        Trace: The simulation trace with location labels and events.
    """
    start_location = (
        system.initial_location if location0 is None else location0
    )
    if not isinstance(start_location, Location):
        message = "location0 must be a Location."
        raise TypeError(message)
    location_ids = {id(location) for location in system.locations}
    if id(start_location) not in location_ids:
        message = "location0 must be included in system.locations."
        raise ValueError(message)

    state = ensure_state(x0 if x0 is not None else system.initial_state)
    location = start_location
    effective_input_stream = input_stream or _missing_input_stream
    should_capture = _resolve_capture_inputs(
        capture_inputs=capture_inputs,
        input_stream=input_stream,
    )

    t_segments: list[np.ndarray] = []
    x_segments: list[np.ndarray] = []
    location_segments: list[np.ndarray] = []
    sol_segments: list[Callable[[np.ndarray], np.ndarray] | None] = []
    events: list[Event] = []

    t_current = float(t_span[0])
    t_final = float(t_span[1])
    jumps = 0

    time_epsilon = 1e-12
    sample_grid = _prepare_sample_times(t_span, sample_times, sample_dt)
    needs_dense = dense_output or sample_grid is not None

    while t_current < t_final:
        transitions = system.transitions_from(location)
        event_fns = _build_event_functions(
            transitions,
            system.parameters,
            location.parameters,
            effective_input_stream,
        )

        solve_kwargs = {
            "fun": _wrap_flow(
                location,
                system.parameters,
                effective_input_stream,
            ),
            "t_span": (t_current, t_final),
            "y0": state,
            "events": event_fns or None,
            "rtol": rtol,
            "atol": atol,
            "dense_output": needs_dense,
        }
        if max_step is not None:
            solve_kwargs["max_step"] = max_step

        result = solve_ivp(**solve_kwargs)
        if not result.success:
            message = f"ODE integration failed: {result.message}"
            raise RuntimeError(message)

        t_segments.append(result.t)
        x_segments.append(result.y.T)
        location_segments.append(
            np.full(result.t.shape, location, dtype=object),
        )
        sol_segments.append(result.sol)

        if not result.t_events or all(
            len(event_list) == 0 for event_list in result.t_events
        ):
            break

        triggered_index, event_time, event_state = _first_event(
            result.t_events,
            result.y_events,
        )
        is_zero_time_event = event_time - t_current <= time_epsilon
        transition = transitions[triggered_index]
        jumps += 1
        if jumps > max_jumps:
            message = "Maximum number of transitions exceeded."
            raise RuntimeError(message)

        state, event = _apply_transition(
            transition,
            event_time,
            event_state,
            system.parameters,
            effective_input_stream,
        )
        events.append(event)

        location = transition.target
        while True:
            transitions = system.transitions_from(location)
            immediate_index = _immediate_transition_index(
                transitions,
                location,
                event_time,
                state,
                system.parameters,
                effective_input_stream,
                time_epsilon,
            )
            if immediate_index is None:
                break

            transition = transitions[immediate_index]
            jumps += 1
            if jumps > max_jumps:
                message = "Maximum number of transitions exceeded."
                raise RuntimeError(message)

            state, event = _apply_transition(
                transition,
                event_time,
                state,
                system.parameters,
                effective_input_stream,
            )
            events.append(event)

            location = transition.target

        if is_zero_time_event:
            t_current = min(t_final, t_current + time_epsilon)
        else:
            t_current = min(t_final, event_time + time_epsilon)

    if sample_grid is None:
        t_all = _concat_segments(t_segments)
        x_all = _concat_segments(x_segments)
        location_objects = _concat_segments(location_segments)
        location_all = _location_labels(location_objects)
        u_all = None
        dx_all = None
        if should_capture:
            if input_stream is None:
                message = "Internal error: expected input_stream for capture."
                raise RuntimeError(message)
            u_all = _capture_inputs(t_all, input_stream)
        if capture_derivatives:
            dx_all = _capture_derivatives(
                system=system,
                times=t_all,
                states=x_all,
                locations=location_objects,
                input_stream=effective_input_stream,
            )
        return Trace(
            t=t_all,
            x=x_all,
            location=location_all,
            events=tuple(events),
            u=u_all,
            dx=dx_all,
        )

    rolled = _rollout_segments(
        sample_grid,
        t_segments,
        x_segments,
        location_segments,
        sol_segments,
    )
    u_all = None
    dx_all = None
    if should_capture:
        if input_stream is None:
            message = "Internal error: expected input_stream for capture."
            raise RuntimeError(message)
        u_all = _capture_inputs(rolled.t, input_stream)
    if capture_derivatives:
        dx_all = _capture_derivatives(
            system=system,
            times=rolled.eval_t,
            states=rolled.x,
            locations=rolled.location,
            input_stream=effective_input_stream,
        )
    location_all = _location_labels(rolled.location)
    return Trace(
        t=rolled.t,
        x=rolled.x,
        location=location_all,
        events=tuple(events),
        u=u_all,
        dx=dx_all,
    )