unwrap.opencdc
Unwraps an OpenCDC record saved in one of the record's fields.
Description
The unwrap.opencdc
processor is useful in situations where a record goes through intermediate
systems before being written to a final destination. In these cases, the original OpenCDC record is part of the payload
read from the intermediate system and needs to be unwrapped before being written.
Note: if the wrapped OpenCDC record is not in a structured data field, then it's assumed that it's stored in JSON format.
Configuration parameters
- YAML
- Table
version: 2.2
pipelines:
- id: example
status: running
connectors:
# define source and destination ...
processors:
- id: example
plugin: "unwrap.opencdc"
settings:
# Field is a reference to the field that contains the OpenCDC record.
# For more information about the format, see [Referencing
# fields](https://conduit.io/docs/using/processors/referencing-fields).
# Type: string
field: ".Payload.After"
# Whether to decode the record key using its corresponding schema from
# the schema registry.
# Type: bool
sdk.schema.decode.key.enabled: "true"
# Whether to decode the record payload using its corresponding schema
# from the schema registry.
# Type: bool
sdk.schema.decode.payload.enabled: "true"
# Whether to encode the record key using its corresponding schema from
# the schema registry.
# Type: bool
sdk.schema.encode.key.enabled: "true"
# Whether to encode the record payload using its corresponding schema
# from the schema registry.
# Type: bool
sdk.schema.encode.payload.enabled: "true"
Name | Type | Default | Description |
---|---|---|---|
field | string | .Payload.After | Field is a reference to the field that contains the OpenCDC record. For more information about the format, see Referencing fields. |
sdk.schema.decode.key.enabled | bool | true | Whether to decode the record key using its corresponding schema from the schema registry. |
sdk.schema.decode.payload.enabled | bool | true | Whether to decode the record payload using its corresponding schema from the schema registry. |
sdk.schema.encode.key.enabled | bool | true | Whether to encode the record key using its corresponding schema from the schema registry. |
sdk.schema.encode.payload.enabled | bool | true | Whether to encode the record payload using its corresponding schema from the schema registry. |
Examples
Unwrap an OpenCDC record
In this example we use the unwrap.opencdc
processor to unwrap the OpenCDC record found in the record's .Payload.After
field.
Configuration parameters
- YAML
- Table
version: 2.2
pipelines:
- id: example
status: running
connectors:
# define source and destination ...
processors:
- id: example
plugin: "unwrap.opencdc"
settings:
field: ".Payload.After"
Name | Value |
---|---|
field | .Payload.After |
Record difference
Before | After | ||||
1 | { | 1 | { | ||
2 | "position": "d3JhcHBpbmcgcG9zaXRpb24=", | 2 | "position": "d3JhcHBpbmcgcG9zaXRpb24=", | ||
3 | - | "operation": "create", | 3 | + | "operation": "update", |
4 | "metadata": {}, | 4 | "metadata": {}, | ||
5 | - | "key": "wrapping key", | 5 | + | "key": { |
6 | + | "id": "test-key" | |||
7 | + | }, | |||
6 | "payload": { | 8 | "payload": { | ||
7 | "before": null, | 9 | "before": null, | ||
8 | "after": { | 10 | "after": { | ||
9 | - | "key": { | 11 | + | "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", |
10 | - | "id": "test-key" | 12 | + | "sensor_id": 1250383582, |
11 | - | }, | 13 | + | "triggered": false |
12 | - | "metadata": {}, | |||
13 | - | "operation": "update", | |||
14 | - | "payload": { | |||
15 | - | "after": { | |||
16 | - | "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", | |||
17 | - | "sensor_id": 1250383582, | |||
18 | - | "triggered": false | |||
19 | - | }, | |||
20 | - | "before": null | |||
21 | - | }, | |||
22 | - | "position": "dGVzdC1wb3NpdGlvbg==" | |||
23 | } | 14 | } | ||
24 | } | 15 | } | ||
25 | } | 16 | } |