Schema Extraction

Source and destination connectors can be configured to automatically extract the schema from the key and payload of a record. This is especially useful when the data is structured and the schema is known in advance. By default, Conduit extracts a schema from both the key and the payload of a record and encodes each of them using its extracted schema.

Configuration parameters

The following configuration parameters control schema extraction on a source connector. Of these, sdk.schema.extract.payload.enabled and sdk.schema.extract.key.enabled are also available on destination connectors:

  • sdk.schema.extract.type: The type of schema extraction to perform. Supported value is avro.
  • sdk.schema.extract.payload.enabled: A boolean value that indicates whether the schema of the payload should be extracted.
  • sdk.schema.extract.payload.subject: The subject of the payload schema.
  • sdk.schema.extract.key.enabled: A boolean value that indicates whether the schema of the key should be extracted.
  • sdk.schema.extract.key.subject: The subject of the key schema.
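
As a minimal sketch, the parameters listed above could be set together in the settings block of a source connector as shown here. The parameter names come from the list above; the subject values users.payload and users.key are hypothetical names chosen only for illustration.

```yaml
settings:
  # avro is the only supported extraction type listed above.
  sdk.schema.extract.type: avro
  # Extract a schema from the record payload under a custom subject.
  sdk.schema.extract.payload.enabled: true
  sdk.schema.extract.payload.subject: users.payload   # hypothetical subject name
  # Extract a schema from the record key under a custom subject.
  sdk.schema.extract.key.enabled: true
  sdk.schema.extract.key.subject: users.key           # hypothetical subject name
```

On a destination connector, only the two enabled flags from this fragment would apply.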

Example

The pipeline below generates a single record and writes it to a file. The generator source is configured so that it neither extracts a schema nor encodes the data.

version: "2.2"
pipelines:
  - id: generator-to-file
    status: running
    name: generator-to-file
    description: Generates a single record, no schema generated, writes to file
    connectors:
      - id: file-src
        type: source
        plugin: builtin:generator
        name: file-src
        settings:
          recordCount: "1"
          collections.users.format.type: structured
          collections.users.format.options.id: int
          collections.users.format.options.name: string

          sdk.schema.extract.payload.enabled: false
          sdk.schema.extract.key.enabled: false

      - id: file-dest
        type: destination
        plugin: builtin:file
        name: file-dest
        settings:
          path: /tmp/file-destination.txt

When the pipeline is run, /tmp/file-destination.txt will contain output similar to this:

{
  "position": "MQ==",
  "operation": "create",
  "metadata": {
    "conduit.source.connector.id": "generator-to-file:file-src",
    "opencdc.collection": "users",
    "opencdc.createdAt": "1723046776830339829"
  },
  "key": "c2F1cm9wc2lkYW4=",
  "payload": {
    "before": null,
    "after": {
      "id": 7819649577989235000,
      "name": "Iambe"
    }
  }
}

Notice that the written record doesn't contain any schema information in its metadata. However, if you leave schema extraction enabled, the record's metadata will include entries similar to the following:

"opencdc.payload.schema.subject": "generator-to-file:file-src:users.payload",
"opencdc.payload.schema.version": "1"