
kafka

Author: Meroxa, Inc.

Description

Source

A Kafka source connector is represented by a single consumer in a Kafka consumer group. Consequently, a source's logical position is that consumer's offset in Kafka. Internally, however, we don't save the offset as the position: we save the consumer group ID instead, since that is all Kafka needs to look up the offsets for our consumer.

A source is associated with a consumer group ID the first time its Read() method is called.
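
The consumer group can also be set explicitly through the groupID setting documented under Source Parameters below. A minimal sketch with placeholder server, topic, and group names:

settings:
  servers: "localhost:9092"
  topics: "orders"
  groupID: "conduit-example-group"

If groupID is left empty, the source is associated with a consumer group ID the first time Read() is called, as described above.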

Destination

The destination connector writes records to Kafka.

Output format

The output format can be adjusted using configuration options provided by the connector SDK:

  • sdk.record.format: used to choose the format
  • sdk.record.format.options: used to configure the specifics of the chosen format

See the Conduit documentation on output format configuration (https://conduit.io/docs/using/connectors/configuration-parameters/output-format) for more information.
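
For example, the destination could be configured to output only each record's raw payload using the template format mentioned in the parameter reference below. This is a sketch; the template itself is only an illustrative assumption:

settings:
  sdk.record.format: "template"
  sdk.record.format.options: '{{ printf "%s" .Payload.After }}'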

Batching

Batching can also be configured using options provided by the connector SDK:

  • sdk.batch.size: maximum number of records in a batch before it is written to the destination (defaults to 0, i.e. no batching)
  • sdk.batch.delay: maximum delay before an incomplete batch is written to the destination (defaults to 0, i.e. no limit)
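
As a sketch, the settings below would flush a batch once it reaches 1000 records or after 5 seconds, whichever comes first (the numbers are arbitrary examples):

settings:
  sdk.batch.size: "1000"
  sdk.batch.delay: "5s"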

Source Parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      - id: example-source
        type: source
        plugin: "kafka"
        name: example-source
        settings:
          # Servers is a list of Kafka bootstrap servers, which will be used to
          # discover all the servers in a cluster.
          # Type: string
          servers: ""
          # Topics is a comma-separated list of Kafka topics to read from.
          # Type: string
          topics: ""
          # CACert is the Kafka broker's certificate.
          # Type: string
          caCert: ""
          # ClientCert is the Kafka client's certificate.
          # Type: string
          clientCert: ""
          # ClientID is a unique identifier for client connections established
          # by this connector.
          # Type: string
          clientID: "conduit-connector-kafka"
          # ClientKey is the Kafka client's private key.
          # Type: string
          clientKey: ""
          # GroupID defines the consumer group ID.
          # Type: string
          groupID: ""
          # InsecureSkipVerify defines whether to validate the broker's
          # certificate chain and host name. If 'true', accepts any certificate
          # presented by the server and any host name in that certificate.
          # Type: bool
          insecureSkipVerify: ""
          # ReadFromBeginning determines where the consumer group should begin
          # consuming when it finds a partition without a committed offset. If
          # this option is set to true, it will start with the first message in
          # that partition.
          # Type: bool
          readFromBeginning: ""
          # RetryGroupJoinErrors determines whether the connector will
          # continually retry on group join errors.
          # Type: bool
          retryGroupJoinErrors: "true"
          # Mechanism configures the connector to use SASL authentication. If
          # empty, no authentication will be performed.
          # Type: string
          saslMechanism: ""
          # Password sets up the password used with SASL authentication.
          # Type: string
          saslPassword: ""
          # Username sets up the username used with SASL authentication.
          # Type: string
          saslUsername: ""
          # TLSEnabled defines whether TLS is needed to communicate with the
          # Kafka cluster.
          # Type: bool
          tls.enabled: ""
          # Maximum delay before an incomplete batch is read from the source.
          # Type: duration
          sdk.batch.delay: "0"
          # Maximum size of batch before it gets read from the source.
          # Type: int
          sdk.batch.size: "0"
          # Specifies whether to use a schema context name. If set to false, no
          # schema context name will be used, and schemas will be saved with the
          # subject name specified in the connector (not safe because of name
          # conflicts).
          # Type: bool
          sdk.schema.context.enabled: "true"
          # Schema context name to be used. Used as a prefix for all schema
          # subject names. If empty, defaults to the connector ID.
          # Type: string
          sdk.schema.context.name: ""
          # Whether to extract and encode the record key with a schema.
          # Type: bool
          sdk.schema.extract.key.enabled: "false"
          # The subject of the key schema. If the record metadata contains the
          # field "opencdc.collection" it is prepended to the subject name and
          # separated with a dot.
          # Type: string
          sdk.schema.extract.key.subject: "key"
          # Whether to extract and encode the record payload with a schema.
          # Type: bool
          sdk.schema.extract.payload.enabled: "false"
          # The subject of the payload schema. If the record metadata contains
          # the field "opencdc.collection" it is prepended to the subject name
          # and separated with a dot.
          # Type: string
          sdk.schema.extract.payload.subject: "payload"
          # The type of the payload schema.
          # Type: string
          sdk.schema.extract.type: "avro"
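
For clusters that require encryption and authentication, the TLS and SASL settings above can be combined. A sketch with placeholder values; the certificate content, credentials, and SASL mechanism name are assumptions that depend on the broker setup:

settings:
  servers: "broker-1:9093,broker-2:9093"
  topics: "orders"
  tls.enabled: "true"
  caCert: "<PEM-encoded broker CA certificate>"
  saslMechanism: "PLAIN"
  saslUsername: "conduit"
  saslPassword: "<password>"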

Destination Parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      - id: example-destination
        type: destination
        plugin: "kafka"
        name: example-destination
        settings:
          # Servers is a list of Kafka bootstrap servers, which will be used to
          # discover all the servers in a cluster.
          # Type: string
          servers: ""
          # Acks defines the number of acknowledgements from partition replicas
          # required before receiving a response to a produce request. None =
          # fire and forget, one = wait for the leader to acknowledge the
          # writes, all = wait for the full ISR to acknowledge the writes.
          # Type: string
          acks: "all"
          # BatchBytes limits the maximum size of a request in bytes before
          # being sent to a partition. This mirrors Kafka's max.message.bytes.
          # Type: int
          batchBytes: "1000012"
          # CACert is the Kafka broker's certificate.
          # Type: string
          caCert: ""
          # ClientCert is the Kafka client's certificate.
          # Type: string
          clientCert: ""
          # ClientID is a unique identifier for client connections established
          # by this connector.
          # Type: string
          clientID: "conduit-connector-kafka"
          # ClientKey is the Kafka client's private key.
          # Type: string
          clientKey: ""
          # Compression sets the compression codec to be used to compress
          # messages.
          # Type: string
          compression: "snappy"
          # DeliveryTimeout for write operations performed by the Writer.
          # Type: duration
          deliveryTimeout: ""
          # InsecureSkipVerify defines whether to validate the broker's
          # certificate chain and host name. If 'true', accepts any certificate
          # presented by the server and any host name in that certificate.
          # Type: bool
          insecureSkipVerify: ""
          # Mechanism configures the connector to use SASL authentication. If
          # empty, no authentication will be performed.
          # Type: string
          saslMechanism: ""
          # Password sets up the password used with SASL authentication.
          # Type: string
          saslPassword: ""
          # Username sets up the username used with SASL authentication.
          # Type: string
          saslUsername: ""
          # TLSEnabled defines whether TLS is needed to communicate with the
          # Kafka cluster.
          # Type: bool
          tls.enabled: ""
          # Topic is the Kafka topic. It can contain a [Go
          # template](https://pkg.go.dev/text/template) that will be executed
          # for each record to determine the topic. By default, the topic is the
          # value of the `opencdc.collection` metadata field.
          # Type: string
          topic: '{{ index .Metadata "opencdc.collection" }}'
          # Maximum delay before an incomplete batch is written to the
          # destination.
          # Type: duration
          sdk.batch.delay: "0"
          # Maximum size of batch before it gets written to the destination.
          # Type: int
          sdk.batch.size: "0"
          # Allow bursts of at most X records (0 or less means that bursts are
          # not limited). Only takes effect if a rate limit per second is set.
          # Note that if `sdk.batch.size` is bigger than `sdk.rate.burst`, the
          # effective batch size will be equal to `sdk.rate.burst`.
          # Type: int
          sdk.rate.burst: "0"
          # Maximum number of records written per second (0 means no rate
          # limit).
          # Type: float
          sdk.rate.perSecond: "0"
          # The format of the output record. See the Conduit documentation for a
          # full list of supported formats
          # (https://conduit.io/docs/using/connectors/configuration-parameters/output-format).
          # Type: string
          sdk.record.format: "opencdc/json"
          # Options to configure the chosen output record format. Options are
          # normally key=value pairs separated with commas (e.g.
          # opt1=val1,opt2=val2), except for the `template` record format, where
          # options are a Go template.
          # Type: string
          sdk.record.format.options: ""
          # Whether to extract and decode the record key with a schema.
          # Type: bool
          sdk.schema.extract.key.enabled: "true"
          # Whether to extract and decode the record payload with a schema.
          # Type: bool
          sdk.schema.extract.payload.enabled: "true"
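
Because the topic setting is a Go template evaluated per record, a destination can route records to different topics based on their metadata. A minimal sketch that prefixes the collection name; the "prod." prefix is purely illustrative:

settings:
  servers: "localhost:9092"
  topic: 'prod.{{ index .Metadata "opencdc.collection" }}'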