Batching
Destination connectors can be configured to process records in batches. This is especially useful when the destination resource can receive multiple records in a single round-trip. By default, Conduit processes records one by one.
Enabling batch processing can improve the performance of the connector, as it reduces the number of round-trips to the destination resource. However, it can also increase the memory usage of the connector, as it needs to store multiple records in memory before flushing the batch. It can also increase the latency of the connector, as it needs to wait for the batch to be full.
Configuration parameters
There are two connector configuration parameters which control the batch size:
sdk.batch.size
: used to configure the number of records to be sent in a single batch. Default value is1
.sdk.batch.delay
: used to configure the maximum time to wait for a batch to be full before sending it to the destination resource. Default value is0
.
Examples
Example 1: Batch size
The following pipeline is configured to process batches of 100 records when writing to the destination resource. Note that the source connector is generating records at a rate of 10 records per second, meaning that records will be flushed approximately every 10 seconds.
version: 2.2
pipelines:
- id: pipeline1
status: running
name: pipeline1
description: 'A pipeline batching 100 records at a time.'
connectors:
- id: source1
type: source
plugin: builtin:generator
name: source1
settings:
rate: 10
operations: "create"
format.type: "structured"
format.options.name: "string"
format.options.company: "string"
- id: destination1
type: destination
plugin: "builtin:file"
name: destination1
settings:
sdk.batch.size: 100
path: /tmp/file-destination.txt
Example 2: Batch delay
The following pipeline is configured to collect records for 5 seconds before flushing the batch to the destination resource. Note that the source connector is generating records at a rate of 10 records per second, meaning that a batch will contain approximately 50 records.
version: 2.2
pipelines:
- id: pipeline1
status: running
name: pipeline1
description: 'A pipeline batching 100 records at a time.'
connectors:
- id: source1
type: source
plugin: builtin:generator
name: source1
settings:
rate: 10
operations: "create"
format.type: "structured"
format.options.name: "string"
format.options.company: "string"
- id: destination1
type: destination
plugin: "builtin:file"
name: destination1
settings:
sdk.batch.delay: "5s"
path: /tmp/file-destination.txt
Example 3: Batch size and delay
The following pipeline is configured to collect batches of 100 records for up to 5 seconds before flushing them to the destination resource. This means that records will be flushed at most every 5 seconds, or sooner if the batch collects 100 records.
version: 2.2
pipelines:
- id: pipeline1
status: running
name: pipeline1
description: 'A pipeline batching 100 records at a time.'
connectors:
- id: source1
type: source
plugin: builtin:generator
name: source1
settings:
rate: 10
operations: "create"
format.type: "structured"
format.options.name: "string"
format.options.company: "string"
- id: destination1
type: destination
plugin: "builtin:file"
name: destination1
settings:
sdk.batch.size: 100
sdk.batch.delay: "5s"
path: /tmp/file-destination.txt