Getting Started with Processors
A processor is a component that operates on a single record as it flows through a pipeline. It can either change the record (i.e. transform it) or filter it out based on some criteria. Since processors are part of pipelines, familiarizing yourself with pipeline semantics is highly recommended.
Processors are optional components in a pipeline, i.e. a pipeline can be started without them. A processor is always attached to a single parent, which can be either a connector or a pipeline. This gives us the following types of processors:
- Source processors: these processors receive only records originating at a specific source connector. Source processors are created by specifying the corresponding source connector as the parent entity.
- Pipeline processors: these processors receive all records that flow through the pipeline, regardless of the source or destination. Pipeline processors are created by specifying the pipeline as the parent entity.
- Destination processors: these processors receive only records that are meant to be sent to a specific destination connector. Destination processors are created by specifying the corresponding destination connector as the parent entity.
Given that every processor has one (and only one) parent, processors cannot be shared. If the same processing needs to happen for different sources or destinations, you have two options:
- If records from all sources (or destinations) need to be processed in the same way, you can create a single pipeline processor.
- If records from some, but not all, sources (or destinations) need to be processed in the same way, you need to create multiple processors (one for each source or destination) and configure them in the same way (see the example at the end of the next section).
Creating a processor through a pipeline configuration file
A pipeline processor can be created through a pipeline configuration file as shown below:
```yaml
version: 2.0
pipelines:
  - id: example-pipeline
    connectors:
      # define source and destination connectors
      # ...
    processors:
      - id: extract-name
        type: extractfieldpayload
        settings:
          field: name
```
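Once this file is saved, Conduit picks it up at startup; by default, Conduit looks for pipeline configuration files in its pipelines directory.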
Similarly, we can configure a connector processor, i.e. a processor attached to a connector:
```yaml
version: 2.0
pipelines:
  - id: example-pipeline
    connectors:
      - id: conn1
        # other connector configuration
        processors:
          - id: extract-name
            type: extractfieldpayload
            settings:
              field: name
      # other connectors
```
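To illustrate the second option mentioned earlier (the same processing for some, but not all, connectors), here is a minimal sketch in the same configuration format. The connector IDs and the second destination are made up for the example; each destination gets its own, identically configured, processor:

```yaml
version: 2.0
pipelines:
  - id: example-pipeline
    connectors:
      - id: source1
        # source connector configuration
        # ...
      - id: destination1
        # destination connector configuration
        # ...
        processors:
          - id: extract-name-1
            type: extractfieldpayload
            settings:
              field: name
      - id: destination2
        # destination connector configuration
        # ...
        processors:
          - id: extract-name-2
            type: extractfieldpayload
            settings:
              field: name
```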
For more details, see the documentation about pipeline configuration files.
Creating a processor through the HTTP API
Processors are created through the /processors endpoint. Here's an example:
```
POST /v1/processors
{
  // type of the processor in Conduit
  "type": "extractfieldpayload",
  "parent": {
    // type of parent: TYPE_CONNECTOR or TYPE_PIPELINE
    "type": "TYPE_CONNECTOR",
    // parent ID (connector ID in this case)
    "id": "aed07589-44d8-4c68-968c-1f6c5197f13b"
  },
  "config": {
    "settings": {
      // configuration map for this processor
      "field": "name"
    }
  }
}
```
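For comparison, a pipeline processor is created the same way, only with the pipeline as the parent (using TYPE_PIPELINE, as noted in the comments above). The pipeline ID below is a made-up placeholder:

```
POST /v1/processors
{
  "type": "extractfieldpayload",
  "parent": {
    // the processor is attached to the pipeline itself
    "type": "TYPE_PIPELINE",
    // parent ID (pipeline ID in this case)
    "id": "f2601f9a-e046-4cb4-90e9-d93f1e8b7dc2"
  },
  "config": {
    "settings": {
      "field": "name"
    }
  }
}
```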
The request to create a processor is described in api.swagger.json.
Supported processors
Conduit provides a number of built-in processors for tasks such as filtering fields, replacing them, posting payloads to HTTP endpoints, etc. Conduit also provides the ability to write custom processors in JavaScript.
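As a rough illustration of the JavaScript option, the sketch below assumes a built-in processor type named js that receives the code through a script setting and expects the script to define a process(record) function, where returning the record keeps it and returning null filters it out; consult the processor reference for the exact type, setting, and record field names:

```yaml
version: 2.0
pipelines:
  - id: example-pipeline
    connectors:
      # define source and destination connectors
      # ...
    processors:
      - id: pass-through
        type: js
        settings:
          script: |
            // called once for every record flowing through the pipeline
            function process(record) {
              // the record returned here continues down the pipeline;
              // returning null instead would filter the record out
              return record;
            }
```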