Connector Configuration
A source or destination connector's configuration is provided through a struct, with typed parameter values, which makes it simple to work with throughout the connector code.
Here's an example:
type SourceConfig struct {
	sdk.DefaultSourceMiddleware
	// Number of records to be generated (0 means infinite).
	RecordCount int
}
type Source struct {
	config SourceConfig
}
func (s *Source) Config() sdk.SourceConfig {
	return &s.config
}
func (s *Source) Read(ctx context.Context) (opencdc.Record, error) {
	// do something with s.config.RecordCount
	return opencdc.Record{}, nil // placeholder return so the example compiles
}
// other source connector methods
The Conduit Connector SDK will automatically parse a user configuration (e.g. the one in a pipeline configuration file) into the instance returned by Config().
As the example shows, every configuration struct should embed either sdk.DefaultSourceMiddleware or sdk.DefaultDestinationMiddleware, which ensures that certain functionality is automatically added to the source or destination connector.
Parameter names
Parameter names are used in configuration maps that are populated by a user. A parameter's name is taken from a field's json tag. If the tag is not present, then the parameter name is the field name written in camel case.
Example: no json tag
type SourceConfig struct {
	sdk.DefaultSourceMiddleware
	// Number of records to be generated (0 means infinite).
	RecordCount int
}
In this case, the parameter name is recordCount. In a pipeline configuration file, the connector settings will be as follows:
connectors:
  - id: source1
    type: source
    plugin: foo-connector
    settings:
      recordCount: 100
Example: with json tag
type SourceConfig struct {
	sdk.DefaultSourceMiddleware
	// Number of records to be generated (0 means infinite).
	RecordCount int `json:"rec_count"`
}
In this case, the parameter name is rec_count. In a pipeline configuration file, the connector settings will be as follows:
connectors:
  - id: source1
    type: source
    plugin: foo-connector
    settings:
      rec_count: 100
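The naming rule above can be sketched with the standard reflect package. This is only an illustration, not the SDK's actual implementation: ExampleConfig, paramName and paramNames are hypothetical names, and the camel-case fallback is simplified to lowercasing the first letter of the field name.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"unicode"
)

// ExampleConfig is a hypothetical configuration struct used only for this sketch.
type ExampleConfig struct {
	RecordCount int
	BatchDelay  string `json:"batch_delay"`
}

// paramName returns the name from the json tag if one is present,
// otherwise the field name with its first letter lowercased
// (a simplified stand-in for "camel case").
func paramName(f reflect.StructField) string {
	if tag, ok := f.Tag.Lookup("json"); ok {
		name, _, _ := strings.Cut(tag, ",") // drop options such as ",omitempty"
		if name != "" {
			return name
		}
	}
	r := []rune(f.Name)
	r[0] = unicode.ToLower(r[0])
	return string(r)
}

// paramNames lists the parameter names of all fields of a struct value.
func paramNames(cfg any) []string {
	t := reflect.TypeOf(cfg)
	names := make([]string, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		names = append(names, paramName(t.Field(i)))
	}
	return names
}

func main() {
	fmt.Println(paramNames(ExampleConfig{})) // prints "[recordCount batch_delay]"
}
```

Note that reflect only recognizes a tag written in the conventional key:"value" form, so the quotes around the tag value are required.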
Parameter validation
The Connector SDK can validate values provided by a user. Such validations are specified through validation tags.
type SourceConfig struct {
	sdk.DefaultSourceMiddleware
	// Number of records to be generated (0 means infinite).
	RecordCount int `json:"recordCount" validate:"gt=-1"`
}
The supported validations are described in the following sections:
required
A boolean tag that indicates whether a field is required. If it is added to the validate tag without a value, the field is treated as required.
NameRequired string `validate:"required"`
NameRequired2 string `validate:"required=true"`
NameNotRequired string `validate:"required=false"`
lt or less-than
Takes an int or a float value, indicating that the parameter should be less than the value provided.
gt or greater-than
Takes an int or a float value, indicating that the parameter should be greater than the value provided.
Age int `validate:"gt=0,lt=200"`
Age2 float64 `validate:"greater-than=0,less-than=200.2"`
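To make the boundary behaviour concrete, here is a minimal sketch of what a gt/lt check amounts to, assuming both bounds are exclusive, as "greater than" and "less than" suggest. checkRange is a hypothetical helper written for this illustration and is not part of the SDK.

```go
package main

import "fmt"

// checkRange is a hypothetical helper mirroring `validate:"gt=0,lt=200"`:
// the value must be strictly greater than gt and strictly less than lt.
func checkRange(name string, value, gt, lt float64) error {
	if value <= gt {
		return fmt.Errorf("%q must be greater than %v, got %v", name, gt, value)
	}
	if value >= lt {
		return fmt.Errorf("%q must be less than %v, got %v", name, lt, value)
	}
	return nil
}

func main() {
	fmt.Println(checkRange("age", 42, 0, 200))  // prints "<nil>"
	fmt.Println(checkRange("age", 0, 0, 200))   // prints an error: the lower bound is excluded
	fmt.Println(checkRange("age", 200, 0, 200)) // prints an error: the upper bound is excluded
}
```

Under this reading, a parameter that should accept zero needs a bound such as gt=-1.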
inclusion
Validates that the parameter value is included in a specified list. The list values are separated by a pipe character (|).
Gender string `validate:"inclusion=male|female|other"`
exclusion
Validates that the parameter value is NOT included in a specified list. The list values are separated by a pipe character (|).
Color string `validate:"exclusion=red|green"`
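The inclusion and exclusion rules are two sides of the same list lookup. The following sketch shows that logic with the standard library only. inList, checkInclusion and checkExclusion are hypothetical helpers for illustration, and exact, case-sensitive matching is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// inList reports whether value equals one of the pipe-separated items in list.
func inList(value, list string) bool {
	for _, item := range strings.Split(list, "|") {
		if item == value {
			return true
		}
	}
	return false
}

// checkInclusion fails when value is not in the list (hypothetical helper).
func checkInclusion(name, value, list string) error {
	if !inList(value, list) {
		return fmt.Errorf("%q must be one of [%s], got %q", name, list, value)
	}
	return nil
}

// checkExclusion fails when value is in the list (hypothetical helper).
func checkExclusion(name, value, list string) error {
	if inList(value, list) {
		return fmt.Errorf("%q must not be one of [%s], got %q", name, list, value)
	}
	return nil
}

func main() {
	fmt.Println(checkInclusion("gender", "other", "male|female|other")) // prints "<nil>"
	fmt.Println(checkExclusion("color", "red", "red|green"))            // prints an error
}
```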
regex
Validates that the parameter value matches the regex pattern.
Email string `validate:"regex=^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$"`
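A pattern can be tried out on its own before it goes into a tag. The sketch below compiles the pattern from the Email field with Go's regexp package, assuming the SDK evaluates patterns with the same RE2 syntax. validEmail is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// emailPattern is the pattern used in the Email field's regex validation.
// MustCompile panics at start-up if the pattern is not valid RE2 syntax.
var emailPattern = regexp.MustCompile(`^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$`)

// validEmail reports whether s matches the whole pattern (hypothetical helper).
func validEmail(s string) bool {
	return emailPattern.MatchString(s)
}

func main() {
	fmt.Println(validEmail("user@example.com")) // prints "true"
	fmt.Println(validEmail("not-an-email"))     // prints "false"
}
```

Because RE2 does not support every construct found in other regex dialects (lookarounds and backreferences, for example), checking a pattern this way is a cheap safeguard.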
Special parameter types
Slices
Comma-separated values in a user configuration map are automatically parsed into a slice. For example, let's say we have the following configuration struct:
type SourceConfig struct {
	MyIntSlice []int
}
and the following configuration provided by a user:
- id: source1
  type: source
  plugin: builtin:your-connector
  name: source1
  settings:
    myIntSlice: "1,2,3"
then the value that the SDK will parse is the following:
SourceConfig{
	MyIntSlice: []int{1, 2, 3},
}
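Conceptually, the conversion is a split followed by a per-item parse. The sketch below is not the SDK's code. parseIntSlice is a hypothetical helper, and trimming whitespace around items is an assumption made for this example.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseIntSlice splits a comma-separated string and converts each item to int.
// Whitespace around items is trimmed (an assumption of this sketch).
func parseIntSlice(raw string) ([]int, error) {
	parts := strings.Split(raw, ",")
	out := make([]int, 0, len(parts))
	for _, p := range parts {
		n, err := strconv.Atoi(strings.TrimSpace(p))
		if err != nil {
			return nil, fmt.Errorf("invalid integer %q: %w", p, err)
		}
		out = append(out, n)
	}
	return out, nil
}

func main() {
	got, err := parseIntSlice("1,2,3")
	fmt.Println(got, err) // prints "[1 2 3] <nil>"
}
```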
Dynamic parameter names (maps of configurations)
The Connector SDK supports structs that are nested in maps. This is especially useful when multiple entities need to be configured, like multiple tables. Here's an example:
type Config struct {
	TableConfigs map[string]TableConfig `json:"tables"`
}
type TableConfig struct {
	// SortingColumn forces the use of a custom column to sort the snapshot.
	SortingColumn string `json:"sortingColumn"`
}
This allows a user to use the following configuration:
- id: source1
  type: source
  plugin: builtin:your-connector
  name: source1
  settings:
    tables.users_table.sortingColumn: updated_at
    tables.admins.sortingColumn: last_updated_at
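The dotted keys follow the shape tables.<name>.<field>, where the middle segment becomes the map key. The following sketch shows how such flat settings could be grouped into a map of structs. It only illustrates the naming scheme, not the SDK's parser. parseTables is a hypothetical helper, and table names are assumed not to contain dots.

```go
package main

import (
	"fmt"
	"strings"
)

// TableConfig mirrors the nested struct from the example above.
type TableConfig struct {
	SortingColumn string
}

// parseTables groups settings of the form "tables.<name>.<field>" by table
// name. Keys outside the "tables." namespace are ignored (hypothetical helper).
func parseTables(settings map[string]string) (map[string]TableConfig, error) {
	tables := make(map[string]TableConfig)
	for key, value := range settings {
		parts := strings.SplitN(key, ".", 3)
		if len(parts) != 3 || parts[0] != "tables" {
			continue // not a table setting
		}
		name, field := parts[1], parts[2]
		cfg := tables[name] // zero value if the table was not seen yet
		switch field {
		case "sortingColumn":
			cfg.SortingColumn = value
		default:
			return nil, fmt.Errorf("unknown table parameter %q", key)
		}
		tables[name] = cfg
	}
	return tables, nil
}

func main() {
	tables, err := parseTables(map[string]string{
		"tables.users_table.sortingColumn": "updated_at",
		"tables.admins.sortingColumn":      "last_updated_at",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(tables["users_table"].SortingColumn) // prints "updated_at"
	fmt.Println(tables["admins"].SortingColumn)      // prints "last_updated_at"
}
```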
Custom parameter validation and parsing
If custom validation and parsing is needed, it can be added through the Validate() method, as in the example below:
func (c *SourceConfig) Validate(ctx context.Context) error {
	// custom validation can be added here
	return nil
}
Middleware
We recommend that configuration structs embed SDK middleware structs, so that certain commonly used functionalities are automatically added to connectors.
If there's a strong reason for not embedding any of the middleware, then a configuration still needs to embed sdk.UnimplementedSourceConfig or sdk.UnimplementedDestinationConfig.
Source configuration middleware
When a source configuration struct embeds sdk.DefaultSourceMiddleware, the following functionalities are automatically added to the source:
- batching (i.e. the source can return data in batches)
- encoding with a schema (i.e. if the data has an attached schema, it will be encoded with it)
- customizing the schema context name
- auto-generating a schema
A developer can choose to add all the mentioned features by embedding sdk.DefaultSourceMiddleware, embedding only some of those, or none.
The default configuration values can be set in NewSource() in the following way:
func NewSource() sdk.Source {
	return sdk.SourceWithMiddleware(&Source{
		config: source.Config{
			DefaultSourceMiddleware: sdk.DefaultSourceMiddleware{
				SourceWithSchemaExtraction: sdk.SourceWithSchemaExtraction{
					PayloadEnabled: lang.Ptr(false),
				},
			},
		},
	})
}
Destination configuration middleware
When a destination configuration struct embeds sdk.DefaultDestinationMiddleware, the following functionalities are automatically added to the destination:
- batching (i.e. the destination can write data in batches)
- rate limiting
- record formatting
- automatically decoding a record using its schema
A developer can choose to add all the mentioned features by embedding sdk.DefaultDestinationMiddleware, embedding only some of those, or none.
The default configuration values can be set in NewDestination() in the following way:
func NewDestination() sdk.Destination {
	return sdk.DestinationWithMiddleware(&Destination{
		config: destination.Config{
			DefaultDestinationMiddleware: sdk.DefaultDestinationMiddleware{
				DestinationWithBatch: sdk.DestinationWithBatch{
					BatchSize: 100,
				},
			},
		},
	})
}