
unwrap.kafkaconnect

Unwraps a Kafka Connect record from an OpenCDC record.

Description

This processor unwraps a Kafka Connect record from the input OpenCDC record.

The input record's payload is replaced with the Kafka Connect record.

This is useful when Conduit acts as an intermediary between a Debezium source and a Debezium destination. In such cases, the Debezium record is set as the OpenCDC record's payload and must be unwrapped before it can be used further.
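To make the idea of "unwrapping" concrete, here is a minimal Python sketch. It is an illustration only, not Conduit's implementation. It assumes the Kafka Connect record is a JSON envelope with `payload` and `schema` keys, as in the example further down this page. The function name `extract_connect_payload` is hypothetical, and raising an error on a missing `payload` key is a choice made for this sketch.

```python
import json


def extract_connect_payload(raw: str) -> dict:
    """Parse a JSON-serialized Kafka Connect envelope and return its payload."""
    envelope = json.loads(raw)
    if not isinstance(envelope, dict) or "payload" not in envelope:
        # Assumption: this sketch rejects input that has no "payload" key.
        raise ValueError("not a Kafka Connect envelope: missing 'payload'")
    return envelope["payload"]


# A raw (string) envelope like the one a Debezium source might produce.
raw = '{"payload": {"description": "test2"}, "schema": {}}'
print(extract_connect_payload(raw))
```

The envelope's `schema` is simply dropped in this sketch; only the `payload` part survives the unwrap.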

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "unwrap.kafkaconnect"
        settings:
          # Field is a reference to the field that contains the Kafka Connect
          # record.
          # For more information about the format, see [Referencing
          # fields](https://conduit.io/docs/using/processors/referencing-fields).
          # Type: string
          field: ".Payload.After"
          # Whether to decode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.decode.key.enabled: "true"
          # Whether to decode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.decode.payload.enabled: "true"
          # Whether to encode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.encode.key.enabled: "true"
          # Whether to encode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.encode.payload.enabled: "true"

Examples

Unwrap a Kafka Connect record

This example shows how to unwrap a Kafka Connect record.

The Kafka Connect record is serialized as a JSON string in the .Payload.After field (raw data). The Kafka Connect record's payload will replace the OpenCDC record's payload.

The key is unwrapped as well. In this case, the key arrives as structured data.

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "unwrap.kafkaconnect"
        settings:
          field: ".Payload.After"

Record difference

Before:

{
  "position": "dGVzdCBwb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "metadata-key": "metadata-value"
  },
  "key": {
    "payload": {
      "id": 27
    },
    "schema": {}
  },
  "payload": {
    "before": null,
    "after": "{\n\"payload\": {\n  \"description\": \"test2\"\n},\n\"schema\": {}\n}"
  }
}

After:

{
  "position": "dGVzdCBwb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "metadata-key": "metadata-value"
  },
  "key": {
    "id": 27
  },
  "payload": {
    "before": null,
    "after": {
      "description": "test2"
    }
  }
}
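
The record difference above can be reproduced with a short Python sketch. It only mimics the observable transformation on this one example and is not how Conduit implements the processor. The helper names `unwrap_envelope` and `unwrap_record` are hypothetical, and the sketch assumes both the key and the `.Payload.After` field hold an envelope with a `payload` entry, either as a JSON string (raw data) or as an already-parsed object (structured data).

```python
import copy
import json


def unwrap_envelope(value):
    """Return the payload of an envelope given as raw JSON or as a dict."""
    if isinstance(value, (str, bytes)):
        # Raw data: parse the JSON text first.
        value = json.loads(value)
    return value["payload"]


def unwrap_record(record: dict) -> dict:
    """Return a copy of the record with key and payload.after unwrapped."""
    out = copy.deepcopy(record)  # leave the input record untouched
    out["key"] = unwrap_envelope(record["key"])
    out["payload"]["after"] = unwrap_envelope(record["payload"]["after"])
    return out


before = {
    "position": "dGVzdCBwb3NpdGlvbg==",
    "operation": "create",
    "metadata": {"metadata-key": "metadata-value"},
    # Structured key: already a dict.
    "key": {"payload": {"id": 27}, "schema": {}},
    "payload": {
        "before": None,
        # Raw payload: the envelope serialized as a JSON string.
        "after": "{\n\"payload\": {\n  \"description\": \"test2\"\n},\n\"schema\": {}\n}",
    },
}

after = unwrap_record(before)
print(json.dumps(after, indent=2))
```

Position, operation, and metadata pass through unchanged; only the key and the referenced field are rewritten. Edge cases, such as a key that carries no `payload` entry, are not covered by this sketch.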
