avro.decode
Decodes a field's raw data in the Avro format.
Description
The processor takes raw data (bytes or a string) in the specified field and decodes it from the Avro format into structured data. It extracts the schema ID from the data, downloads the associated schema from the schema registry and decodes the payload. The schema is cached locally after it's first downloaded.
If the processor encounters structured data or the data can't be decoded it returns an error.
This processor is the counterpart to avro.encode
.
Configuration parameters
- YAML
- Table
version: 2.2
pipelines:
- id: example
status: running
connectors:
# define source and destination ...
processors:
- id: example
plugin: "avro.decode"
settings:
# The field that will be decoded.
# For more information about the format, see [Referencing
# fields](https://conduit.io/docs/using/processors/referencing-fields).
# Type: string
field: ".Payload.After"
# Whether to decode the record key using its corresponding schema from
# the schema registry.
# Type: bool
sdk.schema.decode.key.enabled: "true"
# Whether to decode the record payload using its corresponding schema
# from the schema registry.
# Type: bool
sdk.schema.decode.payload.enabled: "true"
# Whether to encode the record key using its corresponding schema from
# the schema registry.
# Type: bool
sdk.schema.encode.key.enabled: "true"
# Whether to encode the record payload using its corresponding schema
# from the schema registry.
# Type: bool
sdk.schema.encode.payload.enabled: "true"
Name | Type | Default | Description |
---|---|---|---|
field | string | .Payload.After | The field that will be decoded. For more information about the format, see Referencing fields. |
sdk.schema.decode.key.enabled | bool | true | Whether to decode the record key using its corresponding schema from the schema registry. |
sdk.schema.decode.payload.enabled | bool | true | Whether to decode the record payload using its corresponding schema from the schema registry. |
sdk.schema.encode.key.enabled | bool | true | Whether to encode the record key using its corresponding schema from the schema registry. |
sdk.schema.encode.payload.enabled | bool | true | Whether to encode the record payload using its corresponding schema from the schema registry. |
Examples
Decode a record field in Avro format
This example shows the usage of the avro.decode
processor.
The processor decodes the record's.Key
field using the schema that is
downloaded from the schema registry and needs to exist under the subjectexample-decode
.
In this example we use the following schema:
{
"type":"record",
"name":"record",
"fields":[
{"name":"myString","type":"string"},
{"name":"myInt","type":"int"}
]
}
Configuration parameters
- YAML
- Table
version: 2.2
pipelines:
- id: example
status: running
connectors:
# define source and destination ...
processors:
- id: example
plugin: "avro.decode"
settings:
field: ".Key"
Name | Value |
---|---|
field | .Key |
Record difference
Before | After | ||||
1 | { | 1 | { | ||
2 | "position": "dGVzdC1wb3NpdGlvbg==", | 2 | "position": "dGVzdC1wb3NpdGlvbg==", | ||
3 | "operation": "create", | 3 | "operation": "create", | ||
4 | "metadata": { | 4 | "metadata": { | ||
5 | "key1": "val1" | 5 | "key1": "val1" | ||
6 | }, | 6 | }, | ||
7 | - | "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002", | 7 | + | "key": { |
8 | + | "myInt": 1, | |||
9 | + | "myString": "bar" | |||
10 | + | }, | |||
8 | "payload": { | 11 | "payload": { | ||
9 | "before": null, | 12 | "before": null, | ||
10 | "after": null | 13 | "after": null | ||
11 | } | 14 | } | ||
12 | } | 15 | } |