Getting Started with Processors
A processor is a component that operates on a single record that flows through a pipeline. It can either transform the record, or filter it out based on some criteria. Since they are part of pipelines, making yourself familiar with pipeline semantics is highly recommended.
Processors are optional components in a pipeline (i.e. a pipeline can be started without them), and they are always attached to a single parent, which can be either a connector or a pipeline:
- Connector processors:
- Source processors only receive messages originating at a specific source connector. Source processors are created by specifying the corresponding source connector as the parent entity.
- Destination processors only receive messages that are meant to be sent to a specific destination connector. Destination processors are created by specifying the corresponding destination connector as the parent entity.
- Pipeline processors receive all messages that flow through the pipeline, regardless of the source or destination. Pipeline processors are created by specifying the pipeline as the parent entity.
Processor types
When it comes to using a processor, Conduit supports different types:
- Built-in processors will perform the most common operations you could expect such as filtering fields, replacing fields, posting payloads to a HTTP endpoint, etc. These are already coming as part of Conduit, and you can simply start using them with a bit of configuration. Check out this document to see everything that's available.
- Standalone processors are the ones you could write yourself to do anything that's not already covered by the Built-in ones. Here's more information about them.
How to use a processor
In these following examples, we're using the json.decode
, but you could use any other you'd like from our Built-in ones, or even reference your own Standalone processor.
When referencing the name of a processor plugin there are different ways you can make sure you're using the one you'd like. Please, check out the Referencing Processors documentation for more information.
Using a pipeline configuration file
Using a pipeline processor
Creating a pipeline processor through a pipeline configuration file can be done as below:
version: 2.2
pipelines:
- id: example-pipeline
connectors:
# define source and destination connectors
# ...
processors:
- id: extract-name
plugin: json.decode
settings:
field: name
Using a connector processor
Similarly, we can configure a connector processor, i.e. a processor attached to a connector:
version: 2.2
pipelines:
- id: example-pipeline
connectors:
- id: conn1
# other connector configuration
processors:
- id: extract-name
plugin: json.decode
settings:
field: name
# other connectors
The documentation about how to configure processors in pipeline configuration files can be found here.
Using the HTTP API
The processor endpoints live under the /v1/processors
namespace, and to attach a processor to either connector or a pipeline, you could do a POST
request to /v1/processors
specifying parent.type
as TYPE_PIPELINE
or TYPE_CONNECTOR
. Default value is TYPE_UNSPECIFIED
.
Here's how the entire request could look like.
To list all the different API HTTP requests you could perform check out our HTTP API. These are also described in our api.swagger.json.