Skip to main content

Connector Configuration

A source or destination connector's configuration is provided through a struct, with typed parameter values, which makes it simple to work with throughout the connector code.

Here's an example:

type SourceConfig struct {
sdk.DefaultSourceMiddleware

// Number of records to be generated (0 means infinite).
RecordCount int
}

type Source struct {
config SourceConfig
}

func (s *Source) Config() sdk.SourceConfig {
return &s.config
}

func (s *Source) Read(ctx context.Context) (opencdc.Record, error) {
// do something with s.config.RecordCount
}

// other source connector methods

The Conduit Connector SDK will automatically parse a user configuration (e.g. the one in a pipeline configuration file) into the instance returned by Config().

As it can be seen from the example, every configuration struct should embed either sdk.DefaultSourceMiddleware or sdk.DefaultDestinationMiddleware, which ensures that certain functionality is automatically added to the source or destination connector.

Parameter names

Parameter names are used in configuration maps that are populated by a user. A parameter's name is taken from a field's json tag. If the tag is not present, then the parameter name is the field name written in camel case.

Example: no json tag

type SourceConfig struct {
sdk.DefaultSourceMiddleware

// Number of records to be generated (0 means infinite).
RecordCount int
}

In this case, the parameter name is recordCount. In a pipeline configuration file, the connector settings will be as follows:

    connectors:
- id: source1
type: source
plugin: foo-connector
settings:
recordCount: 100

Example: with json tag

type SourceConfig struct {
sdk.DefaultSourceMiddleware

// Number of records to be generated (0 means infinite).
RecordCount int `json:rec_count`
}

In this case, the parameter name is rec_count. In a pipeline configuration file, the connector settings will be as follows:

    connectors:
- id: source1
type: source
plugin: foo-connector
settings:
rec_count: 100

Parameter validation

The Connector SDK can validate input user values. Such validations are specified through validation tags.

type SourceConfig struct {
sdk.DefaultSourceMiddleware

// Number of records to be generated (0 means infinite).
RecordCount int `json:"recordCount" validate:"gt=-1"`
}

The supported validations are described in the following sections:

required

A boolean tag to indicate if a field is required or not. If it is added to the validate tag without a value, then we assume the field is required.

  NameRequired    string `validate:"required"`
NameRequired2 string `validate:"required=true"`
NameNotRequired string `validate:"required=false"`

lt or less-than

Takes an int or a float value, indicated that the parameter should be less than the value provided.

gt or greater-than

Takes an int or a float value, indicated that the parameter should be greater than the value provided.

  Age int `validate:"gt=0,lt=200"`
Age2 float `validate:"greater-than=0,less-than=200.2"`

inclusion

Validates that the parameter value is included in a specified list, this list values are separated using a pipe character |.

    Gender string `validate:"inclusion=male|female|other"`

exclusion

Validates that the parameter value is NOT included in a specified list, this list values are separated using a pipe character |.

   Color string `validate:"exclusion=red|green"`

regex

Validates that the parameter value matches the regex pattern.

   Email string `validate:"regex=^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$"`

Special parameter types

Slices

Comma-separated values in a user configuration map is automatically parsed into a slice. For example, let's say we have the following configuration struct:

type SourceConfig struct {
MyIntSlice []int
}

and the following configuration provided by a user:

        - id: source1
type: source
plugin: builtin:your-connector
name: source1
settings:
myIntSlice: "1,2,3"

then the value that the SDK will parse is the following:

SourceConfig{
MyIntSlice: []int{1, 2, 3},
}

Dynamic parameter names (maps of configurations)

The Connector SDK supports structs that are nested in maps. This is especially useful when multiple entities need to be configured, like multiple tables. Here's an example:

type Config struct {
TableConfigs map[string]TableConfig `json:"tables"`
}

type TableConfig struct {
// SortingColumn allows to force using a custom column to sort the snapshot.
SortingColumn string `json:"sortingColumn"`
}

This allows a user to use the following configuration:

        - id: source1
type: source
plugin: builtin:your-connector
name: source1
settings:
tables.users_table.sortingColumn: updated_at
tables.admins.sortingColumn: last_updated_at

Custom parameter validation and parsing

If custom validation and parsing is needed, it can be added through the Validate() method, as in the example below:

func (c *SourceConfig) Validate(ctx context.Context) error {
// custom validation can be added here
return nil
}

Middleware

We recommend that configuration structs embed SDK middleware structs, so that certain commonly used functionalities are automatically added to connectors.

If there's a strong reason for not embedding any of the middleware, then a configuration still needs to embed sdk.UnimplementedSourceConfig/ sdk.UnimplementedDestinationConfig.

Source configuration middleware

When a source configuration struct embeds sdk.DefaultSourceMiddleware, the following functionality is automatically added to the source:

  • batching (i.e. the source can return data in batches)
  • encoding with a schema (i.e. if the data has an attached schema, it will be encoded with it)
  • customizing the schema context name
  • auto-generating a schema

A developer can choose to add all the mentioned features by embedding sdk.DefaultSourceMiddleware, embedding only some of those, or none.

The default configuration values can be set in NewSource() in the following way:

func NewSource() sdk.Source {
return sdk.SourceWithMiddleware(&Source{
config: source.Config{
DefaultSourceMiddleware: sdk.DefaultSourceMiddleware{
SourceWithSchemaExtraction: sdk.SourceWithSchemaExtraction{
PayloadEnabled: lang.Ptr(false),
},
},
},
})
}

Destination configuration middleware

When a destination configuration struct embeds sdk.DefaultDestinationMiddleware, the following functionalities are automatically added to the destination:

A developer can choose to add all the mentioned features by embedding sdk.DefaultDestinationMiddleware, embedding only some of those, or none.

The default configuration values can be set in NewDestination() in the following way:

func NewDestination() sdk.Destination {
return sdk.DestinationWithMiddleware(&Destination{
config: destination.Config{
DefaultDestinationMiddleware: sdk.DefaultDestinationMiddleware{
DestinationWithBatch: sdk.DestinationWithBatch{
BatchSize: 100,
},
},
},
})
}