unwrap.debezium
Unwraps a Debezium record from the input OpenCDC record.
Description
In this processor, the wrapped (Debezium) record replaces the wrapping record (being processed) completely, except for the position.
The Debezium record's metadata and the wrapping record's metadata are merged, with the Debezium metadata taking precedence.
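The precedence rule can be pictured as a plain dictionary merge. The following is a minimal sketch in Python, not Conduit's implementation; the function name `merge_metadata` and the sample values are hypothetical and chosen only to show a conflicting key.

```python
def merge_metadata(wrapping: dict, debezium: dict) -> dict:
    """Merge two metadata maps; on a key conflict the Debezium value wins."""
    merged = dict(wrapping)   # start from the wrapping record's metadata
    merged.update(debezium)   # Debezium entries overwrite conflicting keys
    return merged


# Hypothetical sample values, for illustration only.
wrapping_md = {"metadata-key": "metadata-value", "opencdc.version": "v0"}
debezium_md = {"opencdc.version": "v1", "opencdc.readAt": "1674061777225877000"}

merged = merge_metadata(wrapping_md, debezium_md)
```

Here `merged` keeps `metadata-key` from the wrapping record, while `opencdc.version` takes the Debezium value because both maps define it.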
This is useful when Conduit acts as an intermediary between a Debezium source and a Debezium destination. In such cases, the Debezium record is set as the OpenCDC record's payload and needs to be unwrapped before it can be used further.
Configuration parameters
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "unwrap.debezium"
        settings:
          # Field is a reference to the field that contains the Debezium record.
          # For more information about the format, see [Referencing
          # fields](https://conduit.io/docs/using/processors/referencing-fields).
          # Type: string
          field: ".Payload.After"
          # Whether to decode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.decode.key.enabled: "true"
          # Whether to decode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.decode.payload.enabled: "true"
          # Whether to encode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.encode.key.enabled: "true"
          # Whether to encode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.encode.payload.enabled: "true"
Name | Type | Default | Description |
---|---|---|---|
field | string | .Payload.After | Field is a reference to the field that contains the Debezium record. For more information about the format, see Referencing fields. |
sdk.schema.decode.key.enabled | bool | true | Whether to decode the record key using its corresponding schema from the schema registry. |
sdk.schema.decode.payload.enabled | bool | true | Whether to decode the record payload using its corresponding schema from the schema registry. |
sdk.schema.encode.key.enabled | bool | true | Whether to encode the record key using its corresponding schema from the schema registry. |
sdk.schema.encode.payload.enabled | bool | true | Whether to encode the record payload using its corresponding schema from the schema registry. |
Examples
Unwrap a Debezium record
This example shows how to unwrap a Debezium record from a field nested in a record's .Payload.After field. It additionally shows how the key is unwrapped and how the metadata is merged.
Configuration parameters
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "unwrap.debezium"
        settings:
          field: ".Payload.After.nested"
Name | Value |
---|---|
field | .Payload.After.nested |
Record difference
Before:

{
  "position": "dGVzdC1wb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "metadata-key": "metadata-value"
  },
  "key": "{\"payload\":\"27\"}",
  "payload": {
    "before": null,
    "after": {
      "nested": "{\n \"payload\": {\n \"after\": {\n \"description\": \"test1\",\n \"id\": 27\n },\n \"before\": null,\n \"op\": \"c\",\n \"source\": {\n \"opencdc.readAt\": \"1674061777225877000\",\n \"opencdc.version\": \"v1\"\n },\n \"transaction\": null,\n \"ts_ms\": 1674061777225\n },\n \"schema\": {}\n}"
    }
  }
}

After:

{
  "position": "dGVzdC1wb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "metadata-key": "metadata-value",
    "opencdc.readAt": "1674061777225877000",
    "opencdc.version": "v1"
  },
  "key": "27",
  "payload": {
    "before": null,
    "after": {
      "description": "test1",
      "id": 27
    }
  }
}
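The transformation in this example can be approximated in a few lines of Python. This is only a sketch inferred from the record difference shown here, not the processor's actual code: the function `unwrap_debezium`, the `DEBEZIUM_OPS` mapping, and the handling of the `source` block as metadata are assumptions made for illustration, and schema decoding and encoding are ignored entirely.

```python
import json

# Assumed mapping from Debezium "op" codes to OpenCDC operations.
DEBEZIUM_OPS = {"c": "create", "u": "update", "d": "delete", "r": "snapshot"}


def unwrap_debezium(record: dict, field_path: list) -> dict:
    """Return a new record built from the Debezium envelope found at field_path."""
    # Walk to the field that holds the Debezium envelope (a JSON string here).
    value = record
    for name in field_path:
        value = value[name]
    envelope = json.loads(value)["payload"]

    # The key in the example is itself wrapped as {"payload": ...}.
    key = record["key"]
    try:
        parsed = json.loads(key)
        if isinstance(parsed, dict) and "payload" in parsed:
            key = parsed["payload"]
    except (TypeError, ValueError):
        pass  # leave keys that are not JSON strings untouched

    # Assumption: Debezium "source" entries become metadata and take precedence.
    metadata = dict(record.get("metadata", {}))
    metadata.update({k: str(v) for k, v in (envelope.get("source") or {}).items()})

    return {
        "position": record["position"],  # the position is carried over unchanged
        "operation": DEBEZIUM_OPS[envelope["op"]],
        "metadata": metadata,
        "key": key,
        "payload": {"before": envelope.get("before"), "after": envelope.get("after")},
    }


# The "Before" record from this example, rebuilt with json.dumps for readability.
before = {
    "position": "dGVzdC1wb3NpdGlvbg==",
    "operation": "create",
    "metadata": {"metadata-key": "metadata-value"},
    "key": json.dumps({"payload": "27"}),
    "payload": {
        "before": None,
        "after": {
            "nested": json.dumps({
                "payload": {
                    "after": {"description": "test1", "id": 27},
                    "before": None,
                    "op": "c",
                    "source": {
                        "opencdc.readAt": "1674061777225877000",
                        "opencdc.version": "v1",
                    },
                    "transaction": None,
                    "ts_ms": 1674061777225,
                },
                "schema": {},
            })
        },
    },
}

# Lowercase path segments mirror the JSON keys; the processor itself is
# configured with the reference ".Payload.After.nested".
after = unwrap_debezium(before, ["payload", "after", "nested"])
```

With this input, `after` matches the "After" record: the key becomes `"27"`, the payload's `after` holds `description` and `id`, and the metadata gains the two `opencdc.*` entries while the position stays the same.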