Skip to content

opc

OPCAdapter(config_path)

Bases: Adapter

Flowcean adapter for OPC (Open Platform Communications) protocol.

Initialize the OPC adapter.

Initializes a new OPC to Flowcean adapter instance. The adapter reads its configuration from a YAML file. The path to this file is provided in the config_path argument and the file has the following structure:

# Path to the OPC Server
server-url: "opc.tcp://127.0.0.1:4840"

# Specify the time window to record before the recording flag is set.
# This enables the capture and processing of data that triggers the
# recording flag.
pre_capture_window_length: 2

# Define Flowcean inputs with their respective feature name, the
# opc-id and the datatype of the feature.
# All inputs will be passed on to the Flowcean model when triggering an
# inference.
# Available data types are: float32, float64, int32, int64 and bool.
inputs:
    - feature: "feature_a"
      opc-id: "<OPC-ID String>"
      type: "int32"
    - feature: "feature_b"
      opc-id: "<OPC-ID String>"
      type: "float64"
    - ...

# Define the outputs from the Flowcean Model to the OPC Server.
# All outputs need to be mapped to an OPC field for the adapter to work
# correctly. If certain outputs are not needed / used, those should be
# dropped using transforms. The available datatypes are the same as for
# the input features.
outputs:
    - feature: "result_feature"
      opc-id: "<OPC-ID String>"
      type: "int32"

# Flags are special OPC fields, which are used to communicate between
# the adapter and the OPC server.

# The streaming flag is set by the OPC server to `True` when Flowcean
# should start to record data. Once set back to `False`, the data is
# forwarded to the model, an inference step is executed and the results
# are sent back to the OPC server.
stream_flag: "<OPC-ID String>"

# The prediction flag is set to `True` by the adapter when it
# successfully sends data to the OPC server.
# This can be used on the OPC side to process the newly received data.
# The flag is *not* reset by the adapter, so it will remain `True`
# until the OPC server resets it.
prediction_flag: "<OPC-ID String>"

# The connection flag is set to `True` by the adapter when it
# successfully connects to the OPC server and set to `False` when it
# disconnects from the server.
# This can be used to monitor the connection status of the adapter from
# the OPC side.
connection_flag: "<OPC-ID String>"

Parameters:

Name Type Description Default
config_path str | Path

Path to the YAML configuration file containing the OPC server URL and feature definitions.

required
Source code in src/flowcean/adapter/opc/adapter.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def __init__(self, config_path: str | Path) -> None:
    r"""Initialize the OPC adapter.

    Initializes a new OPC to Flowcean adapter instance.
    The adapter reads its configuration from a YAML file.
    The path to this file is provided in the config_path argument and the
    file has the following structure:
    ```
    # Path to the OPC Server
    server-url: "opc.tcp://127.0.0.1:4840"

    # Specify the time window to record before the recording flag is set.
    # This enables the capture and processing of data that triggers the
    # recording flag.
    pre_capture_window_length: 2

    # Define Flowcean inputs with their respective feature name, the
    # opc-id and the datatype of the feature.
    # All inputs will be passed on to the Flowcean model when triggering an
    # inference.
    # Available data types are: float32, float64, int32, int64 and bool.
    inputs:
        - feature: "feature_a"
          opc-id: "<OPC-ID String>"
          type: "int32"
        - feature: "feature_b"
          opc-id: "<OPC-ID String>"
          type: "float64"
        - ...

    # Define the outputs from the Flowcean Model to the OPC Server.
    # All outputs need to be mapped to an OPC field for the adapter to
    # work correctly. If certain outputs are not needed / used, those
    # should be dropped using transforms. The available datatypes are the
    # same as for the input features.
    outputs:
        - feature: "result_feature"
          opc-id: "<OPC-ID String>"
          type: "int32"

    # Flags are special OPC fields, which are used to communicate between
    # the adapter and the OPC server.

    # The streaming flag is set by the OPC server to `True` when Flowcean
    # should start to record data. Once set back to `False`, the data is
    # forwarded to the model, an inference step is executed and the results
    # are sent back to the OPC server.
    stream_flag: "<OPC-ID String>"

    # The prediction flag is set to `True` by the adapter when it
    # successfully sends data to the OPC server.
    # This can be used on the OPC side to process the newly received data.
    # The flag is *not* reset by the adapter, so it will remain `True`
    # until the OPC server resets it.
    prediction_flag: "<OPC-ID String>"

    # The connection flag is set to `True` by the adapter when it
    # successfully connects to the OPC server and set to `False` when it
    # disconnects from the server.
    # This can be used to monitor the connection status of the adapter from
    # the OPC side.
    connection_flag: "<OPC-ID String>"
    ```

    Args:
        config_path: Path to the YAML configuration file containing
                the OPC server URL and feature definitions.
    """
    super().__init__()

    # Load the configuration from the provided path
    with Path(config_path).open() as yaml_file:
        yaml_data = YAML(typ="safe").load(yaml_file)

    # Initialize the adapter fields
    self.input_features = {}
    self.output_features = {}

    schema: dict[str, Any] = {}

    # Initialize the OPC client and establish the connection
    self.client = Client(yaml_data["server-url"])
    self.client.connect()

    # From here on the connection is open. If anything below fails
    # (missing YAML key, unknown data type, bad opc-id, ...) we must not
    # leak the open connection, so disconnect before re-raising.
    try:
        self.client.load_type_definitions()

        # Get inputs from the YAML configuration.
        # Each input has a feature name, an opc-id, and a type.
        for feature in yaml_data["inputs"]:
            self.input_features[feature["feature"]] = self.client.get_node(
                feature["opc-id"],
            )
            # Build the input schema from opc to flowcean
            schema[feature["feature"]] = _type_mapping_str2pl[
                feature["type"]
            ]

        # Outputs only need the node mapping; their dtype is taken from
        # the model's result frame when sending.
        for feature in yaml_data["outputs"]:
            self.output_features[feature["feature"]] = self.client.get_node(
                feature["opc-id"],
            )

        streaming_flag_opc_id = yaml_data["stream_flag"]
        connection_flag_opc_id = yaml_data["connection_flag"]
        prediction_flag_opc_id = yaml_data["prediction_flag"]
        self.pre_capture_window_length = timedelta(
            seconds=yaml_data["pre_capture_window_length"],
        )

        # Setup the remaining opc nodes (communication flags)
        self.streaming_flag_node = self.client.get_node(
            streaming_flag_opc_id,
        )
        self.connection_flag_node = self.client.get_node(
            connection_flag_opc_id,
        )
        self.prediction_flag_node = self.client.get_node(
            prediction_flag_opc_id,
        )
    except Exception:
        # Close the connection on any setup failure and propagate the
        # original error to the caller.
        self.client.disconnect()
        raise

    # Initialize the recording DataFrame with the derived schema
    self.input_schema = schema.copy()
    # Add a special column for the recorded time - this column will
    # only be used internally and dropped before returning the data
    # to flowcean
    schema["_recorded_time"] = pl.Datetime
    self.recorded_data = pl.DataFrame(schema=schema)

get_data()

Get data from the OPC server and return it to flowcean.

Source code in src/flowcean/adapter/opc/adapter.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def get_data(self) -> Data:
    """Get data from the OPC server and return it to flowcean.

    Blocks until the OPC server raises the streaming flag, records
    samples (including a pre-capture window of data from before the
    flag was set), and stops once the flag is lowered again.

    Returns:
        A LazyFrame with the recorded input features, duplicates and the
        internal `_recorded_time` column removed.
    """
    # polars DataFrames are immutable: `clear()` returns a new, empty
    # frame and does NOT mutate in place, so the result must be
    # reassigned. Otherwise data recorded by a previous call would leak
    # into this one.
    self.recorded_data = self.recorded_data.clear()

    # We have to wait until the streaming flag is set to True
    # by the CPS/OPC server before we can start recording data.

    # While this is True we are still pre-recording, i.e. the streaming
    # flag has not been raised yet.
    prerecording_in_progress = True

    def stream_data() -> bool:
        nonlocal prerecording_in_progress
        # Check if we are done with pre-recording and streaming
        if (
            not prerecording_in_progress
            and not self.streaming_handler.is_streaming()
        ):
            return False

        # Record a sample of data
        results = self.client.get_values(self.input_features.values())
        self.recorded_data = pl.concat(
            [
                self.recorded_data,
                pl.DataFrame(
                    [results],
                    schema=self.input_schema,
                    orient="row",
                ).with_columns(
                    _recorded_time=pl.lit(datetime.now(timezone.utc)).cast(
                        pl.Datetime,
                    ),
                ),
            ],
        )

        # Check if we are still pre-recording
        if self.streaming_handler.is_streaming():
            prerecording_in_progress = False
        else:
            # Still pre-recording.
            # Discard data that is older than the pre-capture window so
            # only the configured lead-in is kept once streaming starts.
            self.recorded_data = self.recorded_data.filter(
                pl.col("_recorded_time")
                >= pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime)
                - self.pre_capture_window_length,
            )

        return True

    # Run the `stream_data` function in a timed loop.
    # NOTE(review): 150.0 is the loop rate/interval passed to
    # `_timed_loop` — units (Hz vs ms) are defined by that helper.
    OPCAdapter._timed_loop(
        stream_data,
        150.0,
    )

    # Get rid of any duplicates and the `_recorded_time` feature column
    # in the recorded data
    return (
        self.recorded_data.lazy()
        .drop("_recorded_time")
        .unique(
            maintain_order=True,
        )
    )

send_data(data)

Send data to the OPC server.

Send a polars DataFrame or LazyFrame to the OPC server. The data must contain all required output features, otherwise a ValueError is raised.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame

Polars DataFrame or LazyFrame containing the data to send.

required
Source code in src/flowcean/adapter/opc/adapter.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def send_data(self, data: pl.DataFrame | pl.LazyFrame) -> None:
    """Send data to the OPC server.

    Send a polars DataFrame or LazyFrame to the OPC server.
    The data must contain all required output features, otherwise
    a ValueError is raised. Only the first row of the data is sent.

    Args:
        data: Polars DataFrame or LazyFrame containing the data to send.

    Raises:
        ValueError: If the data is missing any of the configured output
            features.
    """
    # Materialize the data into an eager DataFrame; `.lazy()` is a no-op
    # on a LazyFrame, so this handles both input types. (The previous
    # `typing.cast` was redundant — the parameter is already annotated
    # with exactly this union.)
    df = data.lazy().collect()

    # Check once which required output features are absent
    missing_features = [
        feature
        for feature in self.output_features
        if feature not in df.columns
    ]
    if missing_features:
        msg = (
            f"Data is missing required output features: "
            f"{', '.join(missing_features)}"
        )
        raise ValueError(msg)

    # Send the first row of the data to the OPC server
    data_dict = df.row(0, named=True)
    for feature_name, node in self.output_features.items():
        # Get the value for the feature from the DataFrame
        value = data_dict[feature_name]

        # Set the node's value, translating the polars dtype to the
        # matching OPC UA variant type
        node.set_attribute(
            ua.AttributeIds.Value,
            ua.DataValue(
                ua.Variant(
                    value=value,
                    varianttype=_opc_from_polars(df.schema[feature_name]),
                ),
            ),
        )

    # Set the prediction flag to True to indicate that new data has been
    # sent. The flag is intentionally not reset here; the OPC server is
    # responsible for clearing it.
    self.prediction_flag_node.set_attribute(
        ua.AttributeIds.Value,
        ua.DataValue(
            ua.Variant(
                value=True,
                varianttype=ua.VariantType.Boolean,
            ),
        ),
    )