How to build a generator-to-log pipeline

In this guide, we'll learn how to build a basic Conduit pipeline. It will use two built-in connectors:

  • A generator source (a source connector that generates random data). Our pipeline will generate imaginary employee data.
  • A logging destination (a destination that simply logs all incoming records).

As you'll see, all it takes to accomplish this is to write a pipeline configuration file!

The steps below will guide you through installing Conduit and gradually building the pipeline. If you're in a hurry, you can skip ahead and run the pipeline immediately.

Step 1: Install Conduit

Download the latest Conduit release to a directory of your choice. In this guide, the directory will be ~/conduit-playground.

Step 2: Create a pipeline configuration file

Pipelines can be defined in YAML configuration files, which Conduit reads by default from a directory called pipelines.

With the following command, run from ~/conduit-playground, we'll create the pipelines directory and a file that will contain our pipeline's configuration:

mkdir pipelines && touch pipelines/generator-to-log.yaml

The directory layout should be as below:

~/conduit-playground
├── conduit
└── pipelines
    └── generator-to-log.yaml
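
To double-check the layout before moving on, a small shell function along the following lines may help. This is only a sketch: check_layout is a hypothetical helper written for this guide, not a Conduit command, and it does nothing more than test that the two files from the tree above exist.

```shell
# check_layout is a hypothetical helper, not part of Conduit.
# It takes the playground directory as its only argument (default: current directory)
# and reports the first expected file that is missing.
check_layout() {
  dir="${1:-.}"
  if [ ! -f "$dir/conduit" ]; then
    echo "missing: $dir/conduit" >&2
    return 1
  fi
  if [ ! -f "$dir/pipelines/generator-to-log.yaml" ]; then
    echo "missing: $dir/pipelines/generator-to-log.yaml" >&2
    return 1
  fi
  echo "layout ok"
}
```

For example, check_layout ~/conduit-playground prints layout ok once both files are in place.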

Step 3: The pipeline's basics

Next, write the following to generator-to-log.yaml:

version: "2.2"
pipelines:
  - id: pipeline1
    # Tells Conduit to run the pipeline automatically
    status: running
    name: pipeline1
    description: A generator-to-log pipeline

Step 4: Add the generator source

Now we can start adding connectors to the pipeline. Add a source connector under connectors in the configuration file:

    connectors:
      - id: source1
        type: source
        plugin: builtin:generator
        settings:
          format.type: structured
          format.options.id: int
          format.options.name: string
          rate: "2s"

The fields id, type, plugin and settings are available for every connector. The parameters that go under settings are specific to each connector; the generator's parameters are listed in its documentation.

The configuration above instructs the generator to produce a record every 2 seconds, each with a structured payload. The payload will contain two fields: id (an integer) and name (a string).

Step 5: Add the logging destination

The last piece in the configuration is the destination connector:

      - id: destination1
        type: destination
        plugin: builtin:log

We're fine with the log connector's default configuration, so we're leaving out the settings field.

Step 6: Run the pipeline

To summarize the steps from above, we have the following directory structure:

~/conduit-playground
├── conduit
└── pipelines
    └── generator-to-log.yaml

The file generator-to-log.yaml has the following content:

version: "2.2"
pipelines:
  - id: pipeline1
    status: running
    name: pipeline1
    description: A generator-to-log pipeline
    connectors:
      - id: source1
        type: source
        plugin: builtin:generator
        settings:
          format.type: structured
          format.options.id: int
          format.options.name: string
          rate: "2s"
      - id: destination1
        type: destination
        plugin: builtin:log

Now you can run Conduit:

./conduit
            ....
        .::::::::::.
      .:::::‘‘‘‘:::::.
     .::::        ::::.
.::::::::          ::::::::.
`::::::::          ::::::::‘
     `::::        ::::‘
      `:::::....:::::‘
        `::::::::::‘        Conduit v0.12.0 linux/amd64
            ‘‘‘‘
2024-09-19T10:34:54+00:00 INF All 1 tables opened in 0s component=badger.DB
2024-09-19T10:34:54+00:00 INF Discard stats nextEmptySlot: 0 component=badger.DB
2024-09-19T10:34:54+00:00 INF Set nextTxnTs to 5 component=badger.DB
2024-09-19T10:34:54+00:00 INF loading processor plugins from directory ~/conduit-playground/processors ... component=plugin.processor.standalone.Registry

You'll also notice the following line:

2024-09-19T10:34:54+00:00 INF pipeline configs provisioned component=provisioning.Service created=["pipeline1"] deleted=[] pipelines_path=./pipelines

that confirms our pipeline was loaded.
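
If the log is long, it can be easier to search for that line than to scroll for it. The sketch below assumes Conduit's output was saved to a file first, for example with ./conduit > conduit.log 2>&1 (conduit.log is an arbitrary name chosen here). created_pipelines is a hypothetical helper, not a Conduit command; it prints the created list from every provisioning line in the file.

```shell
# created_pipelines is a hypothetical helper, not part of Conduit.
# Argument: path to a file holding saved Conduit output.
# Prints the value of created=[...] from each "pipeline configs provisioned" line.
created_pipelines() {
  grep 'pipeline configs provisioned' "$1" |
    sed -n 's/.*created=\(\[[^]]*\]\).*/\1/p'
}
```

For the log line shown above, created_pipelines conduit.log prints ["pipeline1"].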

So, where do we see the results? We're using the log connector, so our test data will be in the logs:

2024-09-19T10:34:54+00:00 INF component=plugin connector_id=pipeline1:destination1 plugin_name=builtin:log plugin_type=destination record={"key":"Y29iYWx0aWM=","metadata":{"conduit.source.connector.id":"pipeline1:source1","opencdc.createdAt":"1726734894822378094","opencdc.payload.schema.subject":"pipeline1:source1:payload","opencdc.payload.schema.version":"1"},"operation":"create","payload":{"after":{"id":8433756117589358088,"name":"suspicious"},"before":null},"position":"MQ=="}
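
The key and position values in the record above look like Base64 strings, which is a common way to render raw bytes in JSON. A minimal sketch for decoding them follows. decode_field is a hypothetical helper, not a Conduit command, and it assumes your base64 tool accepts the --decode option (GNU coreutils and macOS both do).

```shell
# decode_field is a hypothetical helper, not part of Conduit.
# It decodes a single Base64 string passed as its argument.
decode_field() {
  printf '%s' "$1" | base64 --decode
}

# Values copied from the example record above; yours will differ,
# because the generator produces random data.
decode_field 'Y29iYWx0aWM='; echo   # the record's key
decode_field 'MQ=='; echo           # the record's position
```

For the example record, the key decodes to cobaltic and the position to 1.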

Congratulations on your first pipeline! You may want to learn more about pipeline configuration files or other features of Conduit.