avro.encode
Encodes a record's field into the Avro format.
Description
The processor takes a record's field and encodes it using a schema into the Avro format. It provides two strategies for determining the schema:
- preRegistered (recommended): This strategy downloads an existing schema from the schema registry and uses it to encode the record. It requires the schema to already be registered in the schema registry. The schema is downloaded only once and cached locally.
- autoRegister (for development purposes): This strategy infers the schema by inspecting the structured data and registers it in the schema registry. If the record schema is known in advance, use the preRegistered strategy and register the schema manually, because autoRegister comes with limitations.

The autoRegister strategy uses reflection to traverse the structured data of each record and determine the type of each field. If a field is set to nil, the processor does not have enough information to determine its type and defaults to a nullable string. Because of this, two records with the same structure are not guaranteed to produce the same schema, or even a backwards-compatible one. The processor registers every inferred schema under the same subject, so schema compatibility checks need to be disabled for that subject to prevent failures. If the subject does not exist before the processor runs, the processor automatically sets the correct compatibility settings in the schema registry.
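To make the nil limitation concrete, here is a minimal sketch of type-based schema inference. It is not the processor's implementation: the function `infer_schema`, its type mapping, and the sample records are assumptions chosen only to show why a missing value changes the inferred schema.

```python
def infer_schema(value, name="record"):
    """Infer an Avro-style schema from a Python value (illustrative only)."""
    if value is None:
        # No type information is available, so fall back to a nullable string.
        return ["null", "string"]
    if isinstance(value, bool):  # bool must be checked before int
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        return {
            "type": "record",
            "name": name,
            "fields": [
                {"name": key, "type": infer_schema(val, name=f"{name}_{key}")}
                for key, val in sorted(value.items())
            ],
        }
    raise TypeError(f"unsupported type: {type(value).__name__}")


# Two records with the same structure; the second has a nil field.
with_value = infer_schema({"id": 1, "score": 2.5})
with_nil = infer_schema({"id": 1, "score": None})

print(with_value["fields"][1]["type"])  # double
print(with_nil["fields"][1]["type"])    # ['null', 'string']
print(with_value == with_nil)           # False
```

A field inferred as `double` in one record and as a nullable string in the next is not a compatible change, which is why compatibility checks have to be disabled for the subject.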
This processor is the counterpart to avro.decode.
Configuration parameters
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "avro.encode"
        settings:
          # The field that will be encoded.
          # For more information about the format, see [Referencing
          # fields](https://conduit.io/docs/using/processors/referencing-fields).
          # Type: string
          field: ".Payload.After"
          # The subject name under which the inferred schema will be registered
          # in the schema registry.
          # Type: string
          schema.autoRegister.subject: ""
          # The subject of the schema in the schema registry used to encode the
          # record.
          # Type: string
          schema.preRegistered.subject: ""
          # The version of the schema in the schema registry used to encode the
          # record.
          # Type: int
          schema.preRegistered.version: ""
          # Strategy to use to determine the schema for the record. Available
          # strategies are:
          # * `preRegistered` (recommended) - Download an existing schema from
          #   the schema registry. This strategy is further configured with
          #   options starting with `schema.preRegistered.*`.
          # * `autoRegister` (for development purposes) - Infer the schema from
          #   the record and register it in the schema registry. This strategy
          #   is further configured with options starting with
          #   `schema.autoRegister.*`.
          # For more information about the behavior of each strategy read the
          # main processor description.
          # Type: string
          schema.strategy: ""
          # Whether to decode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.decode.key.enabled: "true"
          # Whether to decode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.decode.payload.enabled: "true"
          # Whether to encode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.encode.key.enabled: "true"
          # Whether to encode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.encode.payload.enabled: "true"
Name | Type | Default | Description |
---|---|---|---|
field | string | .Payload.After | The field that will be encoded. For more information about the format, see Referencing fields. |
schema.autoRegister.subject | string | null | The subject name under which the inferred schema will be registered in the schema registry. |
schema.preRegistered.subject | string | null | The subject of the schema in the schema registry used to encode the record. |
schema.preRegistered.version | int | null | The version of the schema in the schema registry used to encode the record. |
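schema.strategy | string | null | Strategy to use to determine the schema for the record. Available strategies are: preRegistered (recommended), which downloads an existing schema from the schema registry and is further configured with options starting with schema.preRegistered.*; and autoRegister (for development purposes), which infers the schema from the record, registers it in the schema registry, and is further configured with options starting with schema.autoRegister.*. For more information about the behavior of each strategy read the main processor description. |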
sdk.schema.decode.key.enabled | bool | true | Whether to decode the record key using its corresponding schema from the schema registry. |
sdk.schema.decode.payload.enabled | bool | true | Whether to decode the record payload using its corresponding schema from the schema registry. |
sdk.schema.encode.key.enabled | bool | true | Whether to encode the record key using its corresponding schema from the schema registry. |
sdk.schema.encode.payload.enabled | bool | true | Whether to encode the record payload using its corresponding schema from the schema registry. |
Examples
Auto-register schema
This example shows the usage of the avro.encode processor with the autoRegister schema strategy. The processor encodes the record's .Payload.After field using the schema that is extracted from the data and registered on the fly under the subject example-autoRegister.
Configuration parameters
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "avro.encode"
        settings:
          field: ".Payload.After"
          schema.autoRegister.subject: "example-autoRegister"
          schema.strategy: "autoRegister"
Name | Value |
---|---|
field | .Payload.After |
schema.autoRegister.subject | example-autoRegister |
schema.strategy | autoRegister |
Record difference
Before:
{
  "position": "dGVzdC1wb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "key1": "val1"
  },
  "key": null,
  "payload": {
    "before": null,
    "after": {
      "myFloat": 2.3,
      "myInt": 1,
      "myMap": {
        "bar": 2.2,
        "foo": true
      },
      "myString": "bar",
      "myStruct": {
        "bar": false,
        "foo": 1
      }
    }
  }
}

After:
{
  "position": "dGVzdC1wb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "key1": "val1"
  },
  "key": null,
  "payload": {
    "before": null,
    "after": "\u0000\u0000\u0000\u0000\u0001ffffff\u0002@\u0002������\u0001@\u0001\u0006bar\u0000\u0002"
  }
}
Pre-register schema
This example shows the usage of the avro.encode processor with the preRegistered schema strategy. When using this strategy, the schema has to be manually pre-registered. In this example we use the following schema:
{
  "type": "record",
  "name": "record",
  "fields": [
    {"name": "myString", "type": "string"},
    {"name": "myInt", "type": "int"}
  ]
}
The processor encodes the record's .Key field using the above schema.
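Pre-registering the schema is a separate step that happens before the pipeline starts. The sketch below builds the registration request, assuming a Confluent-compatible schema registry REST API (`POST /subjects/{subject}/versions`). The registry URL and the helper `build_register_request` are hypothetical; adjust them to your setup.

```python
import json
import urllib.request

# Assumption: address of a Confluent-compatible schema registry.
REGISTRY_URL = "http://localhost:8081"

SCHEMA = {
    "type": "record",
    "name": "record",
    "fields": [
        {"name": "myString", "type": "string"},
        {"name": "myInt", "type": "int"},
    ],
}


def build_register_request(registry_url, subject, schema):
    """Build (but do not send) the HTTP request that registers a schema."""
    # The registry expects the schema as a JSON-encoded string in "schema".
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


request = build_register_request(REGISTRY_URL, "example-preRegistered", SCHEMA)
print(request.get_method(), request.full_url)
# Sending it requires a running registry:
#   urllib.request.urlopen(request)
```

The first schema registered under a new subject normally receives version 1, which would correspond to the schema.preRegistered.version value used in the configuration.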
Configuration parameters
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "avro.encode"
        settings:
          field: ".Key"
          schema.preRegistered.subject: "example-preRegistered"
          schema.preRegistered.version: "1"
          schema.strategy: "preRegistered"
Name | Value |
---|---|
field | .Key |
schema.preRegistered.subject | example-preRegistered |
schema.preRegistered.version | 1 |
schema.strategy | preRegistered |
Record difference
Before:
{
  "position": "dGVzdC1wb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "key1": "val1"
  },
  "key": {
    "myInt": 1,
    "myString": "bar"
  },
  "payload": {
    "before": null,
    "after": null
  }
}

After:
{
  "position": "dGVzdC1wb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "key1": "val1"
  },
  "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002",
  "payload": {
    "before": null,
    "after": null
  }
}
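The encoded key can be unpacked by hand to see what the processor produced. The sketch below assumes the output follows the Confluent wire format (one zero magic byte, a 4-byte big-endian schema ID, then the Avro binary body); the bytes in this example are consistent with that layout, but treat it as an assumption. The helper names are hypothetical.

```python
import struct


def read_zigzag_varint(buf, pos):
    """Read one Avro int/long (zigzag varint); return (value, next_pos)."""
    raw, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def decode_key(data):
    """Decode the example key: header, then myString and myInt in schema order."""
    magic, schema_id = struct.unpack(">BI", data[:5])
    if magic != 0:
        raise ValueError("unexpected magic byte")
    # Avro string: zigzag-encoded length followed by UTF-8 bytes.
    length, pos = read_zigzag_varint(data, 5)
    my_string = data[pos:pos + length].decode("utf-8")
    # Avro int: zigzag varint.
    my_int, _ = read_zigzag_varint(data, pos + length)
    return {"schema_id": schema_id, "myString": my_string, "myInt": my_int}


# The "key" value from the record after processing, as raw bytes.
encoded = b"\x00\x00\x00\x00\x01\x06bar\x02"
print(decode_key(encoded))
# {'schema_id': 1, 'myString': 'bar', 'myInt': 1}
```

The field order in the decoded body follows the schema (myString, then myInt), not the alphabetical order shown in the JSON rendering of the original key.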