Skip to content

callbacks

Public callback API for learner progress feedback.

CallbackManager(callbacks=None)

Manage multiple callbacks through a single interface.

Initialize the callback manager.

Source code in src/flowcean/core/callbacks/base.py
61
62
63
def __init__(self, callbacks: list[LearnerCallback] | None = None) -> None:
    """Initialize the callback manager."""
    self.callbacks = callbacks or []

on_learning_start(learner, context=None)

Notify all callbacks that learning has started.

Source code in src/flowcean/core/callbacks/base.py
65
66
67
68
69
70
71
72
def on_learning_start(
    self,
    learner: Named,
    context: dict[str, Any] | None = None,
) -> None:
    """Notify all callbacks that learning has started."""
    for callback in self.callbacks:
        callback.on_learning_start(learner, context)

on_learning_progress(learner, progress=None, metrics=None)

Notify all callbacks of learning progress.

Source code in src/flowcean/core/callbacks/base.py
74
75
76
77
78
79
80
81
82
def on_learning_progress(
    self,
    learner: Named,
    progress: float | None = None,
    metrics: dict[str, Any] | None = None,
) -> None:
    """Notify all callbacks of learning progress."""
    for callback in self.callbacks:
        callback.on_learning_progress(learner, progress, metrics)

on_learning_end(learner, model, metrics=None)

Notify all callbacks that learning has completed.

Source code in src/flowcean/core/callbacks/base.py
84
85
86
87
88
89
90
91
92
def on_learning_end(
    self,
    learner: Named,
    model: Model,
    metrics: dict[str, Any] | None = None,
) -> None:
    """Notify all callbacks that learning has completed."""
    for callback in self.callbacks:
        callback.on_learning_end(learner, model, metrics)

on_learning_error(learner, error)

Notify all callbacks that learning has failed.

Source code in src/flowcean/core/callbacks/base.py
 94
 95
 96
 97
 98
 99
100
101
def on_learning_error(
    self,
    learner: Named,
    error: Exception,
) -> None:
    """Notify all callbacks that learning has failed."""
    for callback in self.callbacks:
        callback.on_learning_error(learner, error)

LearnerCallback

Bases: Protocol

Protocol for learner callbacks.

on_learning_start(learner, context=None) abstractmethod

Called when learning starts.

Source code in src/flowcean/core/callbacks/base.py
23
24
25
26
27
28
29
@abstractmethod
def on_learning_start(
    self,
    learner: Named,
    context: dict[str, Any] | None = None,
) -> None:
    """Called when learning starts."""

on_learning_progress(learner, progress=None, metrics=None) abstractmethod

Called during learning with progress updates.

Source code in src/flowcean/core/callbacks/base.py
31
32
33
34
35
36
37
38
@abstractmethod
def on_learning_progress(
    self,
    learner: Named,
    progress: float | None = None,
    metrics: dict[str, Any] | None = None,
) -> None:
    """Called during learning with progress updates."""

on_learning_end(learner, model, metrics=None) abstractmethod

Called when learning completes successfully.

Source code in src/flowcean/core/callbacks/base.py
40
41
42
43
44
45
46
47
@abstractmethod
def on_learning_end(
    self,
    learner: Named,
    model: Model,
    metrics: dict[str, Any] | None = None,
) -> None:
    """Called when learning completes successfully."""

on_learning_error(learner, error) abstractmethod

Called if learning fails with an error.

Source code in src/flowcean/core/callbacks/base.py
49
50
51
52
53
54
55
@abstractmethod
def on_learning_error(
    self,
    learner: Named,
    error: Exception,
) -> None:
    """Called if learning fails with an error."""

SilentCallback

A callback that intentionally produces no output.

on_learning_start(learner, context=None)

Do nothing when learning starts.

Source code in src/flowcean/core/callbacks/base.py
107
108
109
110
111
112
def on_learning_start(
    self,
    learner: Named,
    context: dict[str, Any] | None = None,
) -> None:
    """Do nothing when learning starts."""

on_learning_progress(learner, progress=None, metrics=None)

Do nothing during learning progress.

Source code in src/flowcean/core/callbacks/base.py
114
115
116
117
118
119
120
def on_learning_progress(
    self,
    learner: Named,
    progress: float | None = None,
    metrics: dict[str, Any] | None = None,
) -> None:
    """Do nothing during learning progress."""

on_learning_end(learner, model, metrics=None)

Do nothing when learning ends.

Source code in src/flowcean/core/callbacks/base.py
122
123
124
125
126
127
128
def on_learning_end(
    self,
    learner: Named,
    model: Model,
    metrics: dict[str, Any] | None = None,
) -> None:
    """Do nothing when learning ends."""

on_learning_error(learner, error)

Do nothing when learning errors.

Source code in src/flowcean/core/callbacks/base.py
130
131
132
133
134
135
def on_learning_error(
    self,
    learner: Named,
    error: Exception,
) -> None:
    """Do nothing when learning errors."""

LoggingCallback(logger=None, level_start=logging.INFO, level_progress=logging.DEBUG, level_end=logging.INFO, level_error=logging.ERROR)

Bases: LearnerCallback

Standard Python logging callback.

Initialize the logging callback.

Source code in src/flowcean/core/callbacks/logging.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(
    self,
    logger: logging.Logger | None = None,
    level_start: int = logging.INFO,
    level_progress: int = logging.DEBUG,
    level_end: int = logging.INFO,
    level_error: int = logging.ERROR,
) -> None:
    """Initialize the logging callback."""
    self.logger = logger or logging.getLogger("flowcean.learner")
    self.level_start = level_start
    self.level_progress = level_progress
    self.level_end = level_end
    self.level_error = level_error

on_learning_start(learner, context=None)

Log learning start event.

Source code in src/flowcean/core/callbacks/logging.py
34
35
36
37
38
39
40
41
42
43
44
def on_learning_start(
    self,
    learner: Named,
    context: dict[str, Any] | None = None,
) -> None:
    """Log learning start event."""
    message = f"[{learner.name}] Learning started"
    if context:
        context_str = ", ".join(f"{k}={v}" for k, v in context.items())
        message += f" ({context_str})"
    self.logger.log(self.level_start, message)

on_learning_progress(learner, progress=None, metrics=None)

Log learning progress.

Source code in src/flowcean/core/callbacks/logging.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def on_learning_progress(
    self,
    learner: Named,
    progress: float | None = None,
    metrics: dict[str, Any] | None = None,
) -> None:
    """Log learning progress."""
    message = f"[{learner.name}] Learning in progress"
    if progress is not None:
        message += f" ({progress * 100:.1f}%)"
    if metrics:
        metrics_str = ", ".join(f"{k}={v}" for k, v in metrics.items())
        message += f" - {metrics_str}"
    self.logger.log(self.level_progress, message)

on_learning_end(learner, model, metrics=None)

Log learning completion.

Source code in src/flowcean/core/callbacks/logging.py
61
62
63
64
65
66
67
68
69
70
71
72
def on_learning_end(
    self,
    learner: Named,
    model: Model,  # noqa: ARG002
    metrics: dict[str, Any] | None = None,
) -> None:
    """Log learning completion."""
    message = f"[{learner.name}] Learning finished"
    if metrics:
        metrics_str = ", ".join(f"{k}={v}" for k, v in metrics.items())
        message += f" ({metrics_str})"
    self.logger.log(self.level_end, message)

on_learning_error(learner, error)

Log learning error.

Source code in src/flowcean/core/callbacks/logging.py
74
75
76
77
78
79
80
81
82
83
84
85
86
def on_learning_error(
    self,
    learner: Named,
    error: Exception,
) -> None:
    """Log learning error."""
    self.logger.log(
        self.level_error,
        "[%s] Learning failed: %s",
        learner.name,
        error,
        exc_info=error,
    )

RichCallback(console=None, *, show_metrics=True)

Bases: LearnerCallback

Rich console callback with adaptive progress display.

Initialize the Rich callback.

Source code in src/flowcean/core/callbacks/rich.py
32
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(
    self,
    console: Console | None = None,
    *,
    show_metrics: bool = True,
) -> None:
    """Initialize the Rich callback."""
    self.console = console or Console()
    self.show_metrics = show_metrics
    self._live: Live | None = None
    self._progress: Progress | None = None
    self._task_id: TaskID | None = None
    self._has_progress_updates = False

on_learning_start(learner, context=None)

Display learning start message.

Source code in src/flowcean/core/callbacks/rich.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def on_learning_start(
    self,
    learner: Named,
    context: dict[str, Any] | None = None,
) -> None:
    """Display learning start message."""
    message = Text()
    message.append("⠿ ", style="bold blue")
    message.append(f"[{learner.name}] Learning", style="bold blue")
    if context and self.show_metrics:
        context_str = ", ".join(f"{k}={v}" for k, v in context.items())
        message.append(f" ({context_str})", style="dim")
    message.append("...", style="dim")

    self.console.print(message)
    self._has_progress_updates = False

on_learning_progress(learner, progress=None, metrics=None)

Update progress display with current progress and metrics.

Source code in src/flowcean/core/callbacks/rich.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def on_learning_progress(
    self,
    learner: Named,
    progress: float | None = None,
    metrics: dict[str, Any] | None = None,
) -> None:
    """Update progress display with current progress and metrics."""
    if not self._has_progress_updates:
        self._has_progress_updates = True
        self._progress = Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            console=self.console,
        )

        description = f"[{learner.name}] Learning"
        self._task_id = self._progress.add_task(
            description,
            total=100 if progress else None,
        )

        self._live = Live(
            self._progress,
            console=self.console,
            refresh_per_second=10,
        )
        self._live.start()

    if self._progress and self._task_id is not None:
        description = f"[{learner.name}] Learning"
        if metrics and self.show_metrics:
            metrics_str = ", ".join(f"{k}={v}" for k, v in metrics.items())
            description += f" ({metrics_str})"

        if progress is not None:
            if self._progress.tasks[self._task_id].total is None:
                self._progress.update(self._task_id, total=100)
            self._progress.update(
                self._task_id,
                completed=progress * 100,
                description=description,
            )
        else:
            self._progress.update(
                self._task_id,
                description=description,
                advance=0.1,
            )

        if self._live:
            self._live.refresh()

on_learning_end(learner, model, metrics=None)

Display learning completion message.

Source code in src/flowcean/core/callbacks/rich.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def on_learning_end(
    self,
    learner: Named,
    model: Model,  # noqa: ARG002
    metrics: dict[str, Any] | None = None,
) -> None:
    """Display learning completion message."""
    if self._progress and self._task_id is not None:
        if self._progress.tasks[self._task_id].total is None:
            self._progress.update(self._task_id, total=100)
        self._progress.update(self._task_id, completed=100)

    if self._live:
        self._live.stop()

    message = Text(
        f"✓ [{learner.name}] Learning finished",
        style="bold green",
    )
    if metrics and self.show_metrics:
        metrics_str = ", ".join(f"{k}={v}" for k, v in metrics.items())
        message.append(f" ({metrics_str})", style="green")

    self.console.print(message)
    self._live = None
    self._progress = None
    self._task_id = None
    self._has_progress_updates = False

on_learning_error(learner, error)

Display error message.

Source code in src/flowcean/core/callbacks/rich.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def on_learning_error(
    self,
    learner: Named,
    error: Exception,
) -> None:
    """Display error message."""
    if self._live:
        self._live.stop()

    self.console.print(
        f"✗ [{learner.name}] Learning failed: {error}",
        style="bold red",
    )

    self._live = None
    self._progress = None
    self._task_id = None
    self._has_progress_updates = False

RichSpinnerCallback(console=None)

Bases: LearnerCallback

Simplified Rich callback with just a spinner.

Initialize the Rich spinner callback.

Source code in src/flowcean/core/callbacks/rich.py
171
172
173
174
175
176
177
def __init__(
    self,
    console: Console | None = None,
) -> None:
    """Initialize the Rich spinner callback."""
    self.console = console or Console()
    self._live: Live | None = None

on_learning_start(learner, context=None)

Display learning start message with spinner.

Source code in src/flowcean/core/callbacks/rich.py
179
180
181
182
183
184
185
186
187
188
189
190
def on_learning_start(
    self,
    learner: Named,
    context: dict[str, Any] | None = None,  # noqa: ARG002
) -> None:
    """Display learning start message with spinner."""
    text = Text()
    text.append("⠋", style="bold blue")
    text.append(f" [{learner.name}] Learning", style="bold")

    self._live = Live(text, console=self.console, refresh_per_second=10)
    self._live.start()

on_learning_progress(learner, progress=None, metrics=None)

Update spinner state.

Source code in src/flowcean/core/callbacks/rich.py
192
193
194
195
196
197
198
def on_learning_progress(
    self,
    learner: Named,
    progress: float | None = None,
    metrics: dict[str, Any] | None = None,
) -> None:
    """Update spinner state."""

on_learning_end(learner, model, metrics=None)

Display learning completion message.

Source code in src/flowcean/core/callbacks/rich.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def on_learning_end(
    self,
    learner: Named,
    model: Model,  # noqa: ARG002
    metrics: dict[str, Any] | None = None,  # noqa: ARG002
) -> None:
    """Display learning completion message."""
    if self._live:
        self._live.stop()

    self.console.print(
        f"✓ [{learner.name}] Learning finished",
        style="bold green",
    )
    self._live = None

on_learning_error(learner, error)

Display error message.

Source code in src/flowcean/core/callbacks/rich.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def on_learning_error(
    self,
    learner: Named,
    error: Exception,
) -> None:
    """Display error message."""
    if self._live:
        self._live.stop()

    self.console.print(
        f"✗ [{learner.name}] Learning failed: {error}",
        style="bold red",
    )
    self._live = None

CallbackMixin

Mixin to add callback support to learners.

create_callback_manager(callbacks)

Create a CallbackManager from supported callback inputs.

Parameters:

Name Type Description Default
callbacks list[LearnerCallback] | LearnerCallback | None

Callbacks to manage. Can be: - None: Uses no callbacks - Single callback: Wraps in a list - List of callbacks: Uses directly

required

Returns:

Type Description
CallbackManager

CallbackManager instance.

Source code in src/flowcean/core/callbacks/support.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def create_callback_manager(
    callbacks: list[LearnerCallback] | LearnerCallback | None,
) -> CallbackManager:
    """Create a CallbackManager from supported callback inputs.

    Args:
        callbacks: Callbacks to manage. Can be:
            - None: Uses no callbacks
            - Single callback: Wraps in a list
            - List of callbacks: Uses directly

    Returns:
        CallbackManager instance.
    """
    if callbacks is None:
        return CallbackManager([])
    if isinstance(callbacks, list):
        return CallbackManager(callbacks)
    return CallbackManager([callbacks])

get_default_callbacks()

Return the default callbacks for learners.

Returns:

Type Description
list[LearnerCallback]

An empty list so learners stay silent unless callbacks are provided.

Source code in src/flowcean/core/callbacks/support.py
32
33
34
35
36
37
38
def get_default_callbacks() -> list[LearnerCallback]:
    """Return the default callbacks for learners.

    Returns:
        An empty list so learners stay silent unless callbacks are provided.
    """
    return []