Skip to main content

unwrap.opencdc

Unwraps an OpenCDC record saved in one of the record's fields.

Description

The unwrap.opencdc processor is useful in situations where a record goes through intermediate systems before being written to a final destination. In these cases, the original OpenCDC record is part of the payload read from the intermediate system and needs to be unwrapped before being written.

Note: if the wrapped OpenCDC record is not in a structured data field, then it's assumed that it's stored in JSON format.

Configuration parameters

version: 2.2
pipelines:
- id: example
status: running
connectors:
# define source and destination ...
processors:
- id: example
plugin: "unwrap.opencdc"
settings:
# Field is a reference to the field that contains the OpenCDC record.
# For more information about the format, see [Referencing
# fields](https://conduit.io/docs/using/processors/referencing-fields).
# Type: string
field: ".Payload.After"
# Whether to decode the record key using its corresponding schema from
# the schema registry.
# Type: bool
sdk.schema.decode.key.enabled: "true"
# Whether to decode the record payload using its corresponding schema
# from the schema registry.
# Type: bool
sdk.schema.decode.payload.enabled: "true"
# Whether to encode the record key using its corresponding schema from
# the schema registry.
# Type: bool
sdk.schema.encode.key.enabled: "true"
# Whether to encode the record payload using its corresponding schema
# from the schema registry.
# Type: bool
sdk.schema.encode.payload.enabled: "true"

Examples

Unwrap an OpenCDC record

In this example we use the unwrap.opencdc processor to unwrap the OpenCDC record found in the record's .Payload.After field.

Configuration parameters

version: 2.2
pipelines:
- id: example
status: running
connectors:
# define source and destination ...
processors:
- id: example
plugin: "unwrap.opencdc"
settings:
field: ".Payload.After"

Record difference

Before
After
1
{
1
{
2
  "position": "d3JhcHBpbmcgcG9zaXRpb24=",
2
  "position": "d3JhcHBpbmcgcG9zaXRpb24=",
3
-
  "operation": "create",
3
+
  "operation": "update",
4
  "metadata": {},
4
  "metadata": {},
5
-
  "key": "wrapping key",
5
+
  "key": {
6
+
    "id": "test-key"
7
+
  },
6
  "payload": {
8
  "payload": {
7
    "before": null,
9
    "before": null,
8
    "after": {
10
    "after": {
9
-
      "key": {
11
+
      "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
10
-
        "id": "test-key"
12
+
      "sensor_id": 1250383582,
11
-
      },
13
+
      "triggered": false
12
-
      "metadata": {},
13
-
      "operation": "update",
14
-
      "payload": {
15
-
        "after": {
16
-
          "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
17
-
          "sensor_id": 1250383582,
18
-
          "triggered": false
19
-
        },
20
-
        "before": null
21
-
      },
22
-
      "position": "dGVzdC1wb3NpdGlvbg=="
23
    }
14
    }
24
  }
15
  }
25
}
16
}

scarf pixel conduit-site-docs-using-processors