Skip to main content

Build your own

You can build your own Conduit standalone processors in Go using the Processor SDK.

We currently only provide a Go SDK for processors. However, if you'd like to use another language for writing a processor, feel free to open an issue and request a specific language SDK. You can also read how standalone processors work under the hood and build an SDK yourself.

The Processor SDK exposes two ways of building processors, one for simple processors without configuration parameters, and another that gives you full control over the processor.

Using sdk.NewProcesorFunc

If the processor is very simple and can be reduced to a single function (e.g. no configuration needed), then we can use sdk.NewProcessorFunc() to create a processor as below:

//go:build wasm

package main

import (
sdk "github.com/conduitio/conduit-processor-sdk"
)

func main() {
sdk.Run(&sdk.NewProcessorFunc(
sdk.Specification{Name: "simple-processor"}),
func(ctx context.Context, rec opencdc.Record) (opencdc.Record, error) {
// do something with the record
return rec
},
)
}

However, if the processor needs configuration, or is more complicated than only one function, then we should use the full processor approach.

Using sdk.Processor

To build the full-blown processor, the SDK contains an interface called sdk.Processor that contains some methods to be implemented. These methods are:

Specification

Specification contains the metadata for the processor, which can be used to define how to reference the processor, describe what the processor does and the configuration parameters it expects. Here's a list of the fields in the sdk.Specification struct and their descriptions:

  • Name: the name of the processor. Note that the name should be unique across all processors, as it's used to reference the processor in a pipeline (see referencing processors).
  • Version: the version of the processor. This should be a valid semver version and needs to be updated whenever the processor's behavior changes.
  • Summary: a short description of what the processor does (ideally a one-liner).
  • Description: a more detailed description of what the processor does. This field can contain markdown.
  • Author: the author of the processor.
  • Parameters: a map of the processor's configuration parameters. Each parameter should have a name, a type, a description, and a list of validations. Note that the validations defined on a parameter are automatically executed in sdk.ParseConfig (see Configure).

Conduit also provides paramgen, a helpful tool that generates the Parameters map from a Go struct. This allows us to create a configuration struct that contains the processor's parameters, define default values and validations using struct tags, and generate the Parameters map. Check out the ParamGen readme for more details.

info

Note that a processor's name and version need to be unique across all processors, as they are used to reference the processor in a pipeline. If two processors have the same name and version, Conduit will refuse to load them.

Example without ParamGen

You can define the Specification method as below and manually define the parameters map:

package example

import (
"context"

"github.com/conduitio/conduit-commons/config"
sdk "github.com/conduitio/conduit-processor-sdk"
)

func (p *AddFieldProcessor) Specification(context.Context) (sdk.Specification, error) {
return sdk.Specification{
Name: "myAddFieldProcessor",
Summary: "Add a field to the record.",
Description: `This processor lets you configure a field that will
be added to the record into field. If the payload is not
structured data, this processor will panic.`,
Version: "v1.0.0",
Author: "John Doe",
Parameters: map[string]config.Parameter{
"field": {
Type: config.ParameterTypeString,
Description: "Field is the target field that will be set.",
Validations: []config.Validation{
config.ValidationRequired{},
},
},
"name": {
Type: config.ParameterTypeString,
Description: "Name is the value of the field to add.",
Validations: []config.Validation{
config.ValidationRequired{},
},
},
},
}, nil
}

Example with ParamGen

  1. Add a struct that contains the needed parameters:
//go:generate paramgen -output=addField_paramgen.go addFieldConfig

type addFieldConfig struct {
// Field is the target field that will be set.
Field string `json:"field" validate:"required"`
// Name is the value of the field to add.
Name string `json:"value" validate:"required"`
}
  1. Generate the parameters by running:
paramgen -output=addField_paramgen.go addFieldConfig

This will generate a file called addField_paramgen.go that contains the generated parameters map, which in turn can be used under specification to make it simpler and shorter, example:

//go:generate paramgen -output=addField_paramgen.go addFieldConfig

type addFieldConfig struct {
// Field is the target field that will be set.
Field string `json:"field" validate:"required"`
// Name is the value of the field to add.
Name string `json:"value" validate:"required"`
}

func (p *AddFieldProcessor) Specification(context.Context) (sdk.Specification, error) {
return sdk.Specification{
Name: "myAddFieldProcessor",
Summary: "Add a field to the record.",
Description: `This processor lets you configure a field that will
be added to the record into field. If the payload is not
structured data, this processor will panic.`,
Version: "v1.0.0",
Author: "John Doe",
Parameters: addFieldConfig{}.Parameters(), // generated by paramgen
}, nil
}

Configure

Configure is the first function to be called in a processor. It provides the processor with the configuration that needs to be validated and stored to be used in other methods. This method should not open connections or any other resources. It should solely focus on parsing and validating the configuration itself.

To add custom validations, simply validate the parameters manually under this method, and return an error if the config map is not valid. On the other hand, using the utility function below would apply the builtin validations to the configuration.

The Processor SDK provides some useful utility functions to help implementing this method:

  • sdk.ParseConfig: used to sanitize the configuration, apply defaults, validate it using builtin validations, and copy the values into the target object.
  • sdk.NewReferenceResolver: creates a new reference resolver from the input string. The input string is a reference to a field in a record, check [Referencing record fields](/docs/using/processors/referencing-fields for more details. The method will return a resolver that can be used to resolve a reference to the specified field in a record and manipulate that field (get, set and delete the value, or rename the referenced field).

Using these utility functions, most of the Configure method implementations would look something like:

func (p *AddFieldProcessor) Configure(ctx context.Context, m map[string]string) error {
err := sdk.ParseConfig(ctx, m, &p.config, addFieldConfig{}.Parameters())
if err != nil {
return fmt.Errorf("failed to parse configuration: %w", err)
}

resolver, err := sdk.NewReferenceResolver(p.config.Field)
if err != nil {
return fmt.Errorf("failed to parse the %q param: %w", "field", err)
}
p.referenceResolver = resolver
return nil
}

Open

This function is used to open connections, start background jobs, or initialize resources that are needed for the processor.

Note that implementing this function is optional.

Process

Process is the main show of the processor, here we would manipulate the records received and return the processed ones.

After processing the slice of records that the function got, and if no errors occurred, it should return a slice of sdk.ProcessedRecord that matches the length of the input slice. However, if an error occurred while processing a specific record, then it should be reflected in the ProcessedRecord with the same index as the input record, and should return the slice at that index length.

For the interface sdk.ProcessedRecord, there are three main processed records types:

  1. sdk.SingleRecord: is a single processed record that will continue down the pipeline.
  2. sdk.FilterRecord: is a record that will be acked and filtered out of the pipeline.
  3. sdk.ErrorRecord: is a record that failed to be processed and will be nacked.

Example:

func (p *AddFieldProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
out := make([]sdk.ProcessedRecord, 0, len(records))
for _, record := range records {
resolver, err := p.referenceResolver.Resolve(&record)
if err != nil {
return append(out, sdk.ErrorRecord{Error: err})
}
err = resolver.Set(p.config.Name)
if err != nil {
return append(out, sdk.ErrorRecord{Error: err})
}
out = append(out, sdk.SingleRecord(record))
}
return out
}

Note that Process should be idempotent, as it may be called multiple times with the same records (e.g. after a restart when records were not flushed).

Teardown

This function acts like a counterpart to Open, use this function to close any open connections or resources that were initialized under Open.

Note that implementing this function is also optional.

Entrypoint

Since the processor will be run as a standalone WASM plugin, we need to add an entrypoint to it. Also, we should add a go:build tag to ensure that this file is only included in the build when targeting WebAssembly.

the entrypoint will have to be in a separate package (i.e. folder), by Go convention it's normally under cmd/my-binary-name, so it would look something like:

.
├── my-processor.go # actual processor implementation
└── cmd
└── processor
└── main.go # entrypoint

Entry point example:

//go:build wasm

package main

import (
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/my-processor/example"
)

func main() {
sdk.Run(example.NewProcessor())
}

Check Compiling the processor for what to do next, and how to compile the processor.

Schemas

Processors have access to the schemas in the used Schema Registry. By default, if the pipeline uses a schema registry and the processor gets a record with the schema info in the Metadata, then the processor will have a middleware enabled. The middleware will decode the records before they are passed to the processor using their corresponding schema from the schema registry, and encode them again after the processing is done. To change this default behaviour, you can change these processor's configurations accordingly:

  • sdk.schema.decode.key.enabled: Whether to decode the record key using its corresponding schema from the schema registry.
  • sdk.schema.decode.payload.enabled: Whether to decode the record payload using its corresponding schema from the schema registry.
  • sdk.schema.encode.key.enabled: Whether to encode the record key using its corresponding schema from the schema registry.
  • sdk.schema.encode.payload.enabled: Whether to encode the record payload using its corresponding schema from the schema registry.

Example of a pipeline configuration file with these parameters:

version: 2.2
pipelines:
- id: test
status: running
connectors:
- id: employees-source
type: source
plugin: standalone:generator
settings:
rate: 1
collections.str.format.type: structured
collections.str.format.options.id: int
collections.str.format.options.name: string
collections.str.format.options.admin: bool
collections.str.operations: create
- id: logger-dest
type: destination
plugin: standalone:log
processors:
- id: access-schema
plugin: standalone:processor-simple
sdk.schema.decode.key.enabled: false # disabling the default behaviour
sdk.schema.encode.key.enabled: false # disabling the default behaviour

Processors can access the Schema Registry and the schemas using two utility functions:

  1. schema.Create : You can use this utility function to create a new schema and add it to the Schema Registry. This function can be called in any of the main processor methods.

Example:

func (p *exampleProcessor) Open(ctx context.Context) error {
// Add a new schema to the schema registry before starting to process the records.
schemaBytes := []byte(`{
"name": "record",
"type": "record",
"fields": [
{
"name": "admin",
"type": "boolean"
},
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
]
}`)
_, err := schema.Create(ctx, schema.TypeAvro, "subject1", schemaBytes)
return err
}
  1. schema.Get: You can use this utility function to get a schema from the Schema Registry using its version and subject. This function can be called in any of the main processor methods.

Example:

func (p *exampleProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
out := make([]sdk.ProcessedRecord, 0, len(records))
for _, record := range records {
// get the schema subject name from the metadata
subject, err := rec.Metadata.GetPayloadSchemaSubject()
if err != nil {
return append(out, sdk.ErrorRecord{Error: err})
}
// get the schema version from the metadata
version, err := rec.Metadata.GetPayloadSchemaVersion()
if err != nil {
return append(out, sdk.ErrorRecord{Error: err})
}
// get the schema using the subject and the version
get, err := schema.Get(ctx, subject, version)
// print the schema
fmt.Println(string(get.Bytes))
}
return out
}

Example output (the printed schema):

{
"name": "record",
"type": "record",
"fields": [
{
"name": "admin",
"type": "boolean"
},
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
]
}

Logging

You can get a zerolog.logger instance from the context using the sdk.Logger function. This logger is pre-configured to append logs in the format expected by Conduit.

Keep in mind that logging in the hot path (i.e. in the Process method) can have a significant impact on the performance of the processor, therefore we recommend using the Trace level for logs that are not essential for the operation of the processor.

Example:

func (p *AddFieldProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
logger := sdk.Logger(ctx)
logger.Trace().Msg("Processing records")
// ...
}

Compiling the processor

Conduit uses WebAssembly to run standalone processors. This means that we need to build the processor as a WebAssembly module. You can do this by setting the environment variables GOARCH=wasm and GOOS=wasip1 when running go build. This will produce a WebAssembly module that can be used as a processor in Conduit.

So, to compile the processor, run:

GOARCH=wasm GOOS=wasip1 go build -o processor.wasm cmd/processor/main.go

Congratulations! Now you have a new standalone processor. Check Standalone processors for details on how to use your standalone processor in a Conduit pipeline.

note

To see more standalone processor examples, check out our example processor repository.

scarf pixel conduit-site-docs-developing-processors