unwrap.debezium

Unwraps a Debezium record from the input OpenCDC record.

Description

In this processor, the wrapped (Debezium) record replaces the wrapping record (being processed) completely, except for the position.

The Debezium record's metadata and the wrapping record's metadata are merged, with the Debezium metadata taking precedence.

This is useful in cases where Conduit acts as an intermediary between a Debezium source and a Debezium destination. In such cases, the Debezium record is set as the OpenCDC record's payload, and needs to be unwrapped before it can be used further.
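The behavior described above can be sketched in Python. This is an illustrative sketch, not Conduit's implementation: the function name `unwrap_debezium` and the dict-based record shape are hypothetical, and taking the Debezium metadata from the envelope's `source` field is an assumption inferred from the example on this page. Key handling is left out of this sketch.

```python
import json

# Debezium "op" codes mapped to OpenCDC operation names.
OP_TO_OPERATION = {"c": "create", "u": "update", "d": "delete", "r": "snapshot"}


def unwrap_debezium(record: dict, debezium_json: str) -> dict:
    """Build a new record from the Debezium envelope in debezium_json (hypothetical helper)."""
    envelope = json.loads(debezium_json)["payload"]

    # Merge metadata: start with the wrapping record's entries, then let the
    # Debezium entries overwrite any key present in both.
    # Assumption: the Debezium metadata comes from the envelope's "source" field.
    metadata = dict(record.get("metadata", {}))
    metadata.update({k: str(v) for k, v in (envelope.get("source") or {}).items()})

    return {
        "position": record["position"],  # the only part kept from the wrapper
        "operation": OP_TO_OPERATION[envelope["op"]],
        "metadata": metadata,
        "key": record.get("key"),  # passed through; key unwrapping is not modeled here
        "payload": {"before": envelope.get("before"), "after": envelope.get("after")},
    }
```

When a metadata key exists in both records, the value from the Debezium record is the one that survives, which matches the precedence rule stated above.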

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "unwrap.debezium"
        settings:
          # Field is a reference to the field that contains the Debezium record.
          # For more information about the format, see [Referencing
          # fields](https://conduit.io/docs/using/processors/referencing-fields).
          # Type: string
          field: ".Payload.After"
          # Whether to decode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.decode.key.enabled: "true"
          # Whether to decode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.decode.payload.enabled: "true"
          # Whether to encode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.encode.key.enabled: "true"
          # Whether to encode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.encode.payload.enabled: "true"

Examples

Unwrap a Debezium record

This example shows how to unwrap a Debezium record from a field nested in a record's .Payload.After field. It also shows how the key is unwrapped and how the metadata is merged.

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "unwrap.debezium"
        settings:
          field: ".Payload.After.nested"

Record difference

--- Before
+++ After
 {
   "position": "dGVzdC1wb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
-    "metadata-key": "metadata-value"
+    "metadata-key": "metadata-value",
+    "opencdc.readAt": "1674061777225877000",
+    "opencdc.version": "v1"
   },
-  "key": "{\"payload\":\"27\"}",
+  "key": "27",
   "payload": {
     "before": null,
     "after": {
-      "nested": "{\n  \"payload\": {\n    \"after\": {\n      \"description\": \"test1\",\n      \"id\": 27\n    },\n    \"before\": null,\n    \"op\": \"c\",\n    \"source\": {\n      \"opencdc.readAt\": \"1674061777225877000\",\n      \"opencdc.version\": \"v1\"\n    },\n    \"transaction\": null,\n    \"ts_ms\": 1674061777225\n  },\n  \"schema\": {}\n}"
+      "description": "test1",
+      "id": 27
     }
   }
 }
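
In the record difference above, the key changes from a JSON string with a payload field to the bare value "27". A minimal Python sketch of that step follows. The helper name `unwrap_key` is hypothetical, and returning the key unchanged when it is not a JSON object with a `payload` field is an assumption, not documented behavior.

```python
import json


def unwrap_key(raw_key: str):
    """Return the "payload" field of a Debezium-style key, else the key unchanged."""
    try:
        parsed = json.loads(raw_key)
    except json.JSONDecodeError:
        return raw_key  # not JSON: left as-is (assumption)
    if isinstance(parsed, dict) and "payload" in parsed:
        return parsed["payload"]
    return raw_key  # JSON without a "payload" field: left as-is (assumption)


print(unwrap_key('{"payload":"27"}'))  # prints 27
```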