Skip to main content

Developing a Source Connector

A Source is responsible for continuously reading data from a third party system and returning it in the form of an OpenCDC Record.

You need to implement the functions required by the Source interface and provide your own implementations. Information about individual functions are listed below. The source.go file is the main file where the functionality of your source connector is implemented.

Source struct

Every Source implementation needs to include an UnimplementedSource to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Source implementations. This struct can be modified to add additional fields that can be accessed throughout the lifecycle of the Connector.

type Source struct {
sdk.UnimplementedSource

config SourceConfig
tail *tail.Tail
}

Source Connector Lifecycle Functions

NewSource()

A constructor function for your Source struct. Note that this is the same function that should be set as the value of Connector.NewSource. The constructor should be used to wrap your Source in the default DefaultSourceMiddleware.

func NewSource() sdk.Source {
// Create Source and wrap it in the default middleware.
return sdk.SourceWithMiddleware(
&Source{},
sdk.DefaultSourceMiddleware()...,
)
}

Additional options via SourceMiddlewareOption:

In case you need to add additional middleware options, you can do so by passing it to the sdk.SourceWithMiddleware function via sdk.DefaultSourceMiddleware(opts ...SourceMiddlewareOption). Currently, the available source middleware options can be found here.

note

If you're using a source connector that's not generating structured data (i.e. produces raw data), you might want to disable schema extraction by default by overwriting the sdk.SourceWithSchemaExtractionConfig options:

sdk.SourceWithMiddleware(
&Source{},
sdk.DefaultSourceMiddleware(
// disable schema extraction by default, because the source produces raw data
sdk.SourceWithSchemaExtractionConfig{
PayloadEnabled: lang.Ptr(false),
KeyEnabled: lang.Ptr(false),
},
)...,
)

Parameters()

A map of named Parameters that describe how to configure the connector. This map is typically generates using paramgen.

func (s *Source) Parameters() config.Parameters {
return s.config.Parameters()
}

Configure()

Validates and stores configuration data for the connector. Any complex validation logic should be implemented here.

func (s *Source) Configure(ctx context.Context, cfg config.Config) error {
err := sdk.Util.ParseConfig(ctx, cfg, &s.config, NewSource().Parameters())
if err != nil {
return err
}
// add custom validations here
return nil
}

Open()

Prepares the connector to start producing records based on the last known successful position. If needed, the connector should open connections in this function.

Every record read by a source connector has a position attached. The position given to Open() is the position of the record that was the last to be successfully processed end-to-end, before the connector stopped, or nil if no records were read. Hence, a position needs to contain enough information for a source connector to resume reading records from where it exactly stopped.

A position is a slice of bytes that can represent any data structure. In Conduit connectors, it's common to see that a position is actually a struct, that's marshalled into a JSON string. In the example below, the position is an offset within the file being read.

func (s *Source) Open(ctx context.Context, p opencdc.Position) error {
// parse the position
var offset int64
if p != nil {
var err error
offset, err = strconv.ParseInt(string(p), 10, 64)
if err != nil {
return fmt.Errorf("invalid position %v, expected a number", p)
}
}

// seek to the position, i.e. the offset
sdk.Logger(ctx).Info().
Int64("position", offset).
Msgf("seeking...")

t, err := tail.TailFile(
s.config.Path,
tail.Config{
Follow: true,
Location: &tail.SeekInfo{
Offset: offset,
Whence: io.SeekStart,
},
Logger: tail.DiscardingLogger,
},
)
if err != nil {
return fmt.Errorf("could not tail file: %w", err)
}

s.tail = t
return nil
}

Read()

Gathers data from the configured data source and formats it into a opencdc.Record that is returned from the function. The returned opencdc.Record is queued into the pipeline to be consumed by a Destination connector.

func (s *Source) Read(ctx context.Context) (opencdc.Record, error) {
select {
case line, ok := <-s.tail.Lines:
if !ok {
return opencdc.Record{}, s.tail.Err()
}
return sdk.Util.Source.NewRecordCreate(
opencdc.Position(strconv.FormatInt(line.SeekInfo.Offset, 10)),
map[string]string{
MetadataFilePath: s.config.Path,
},
opencdc.RawData(strconv.Itoa(line.Num)), // use line number as key
opencdc.RawData(line.Text), // use line content as payload
), nil
case <-ctx.Done():
return opencdc.Record{}, ctx.Err()
}
}

Ack()

Ack signals to the third party system that the record with the supplied position was successfully processed. It's worth noting that while some source connectors need to implement this functionality (e.g. in the case of messaging brokers), others don't have to (e.g. a file source).

func (s *Source) Ack(ctx context.Context, position opencdc.Position) error {
sdk.Logger(ctx).Trace().Msg("record successfully processed")
return nil // no ack needed
}

Teardown()

Teardown signals to the connector that there will be no more calls to any other function. Any connections that were created in the Open() function should be closed here.

func (s *Source) Teardown(context.Context) error {
if s.tail != nil {
return s.tail.Stop()
}
return nil
}

Schema

By default, the sdk source middleware will dynamically generate a schema for each record. While convenient, this is quite slow, as the middleware will try to create a schema for every single record.

If the source connector is reading unstructured data, or you want to use your own schema extraction logic, you can disable the middleware like this, as mentioned before:

func NewSource() sdk.Source {
return sdk.SourceWithMiddleware(
&Source{},
sdk.DefaultSourceMiddleware(
sdk.SourceWithSchemaExtractionConfig{
PayloadEnabled: lang.Ptr(false),
KeyEnabled: lang.Ptr(false),
},
)...,
)
}

Otherwise you can use schema.Create to manually handle the schema creation:

	import (
"github.com/conduitio/conduit-connector-sdk/schema"
"github.com/conduitio/conduit-commons/schema/avro"
hambaavro "github.com/hamba/avro/v2"
)

// create a user schema with name string and age int

recordSchema, err := avro.NewBuilder("users", "example_namespace").
AddField("name", hambaavro.NewPrimitiveSchema(hambaavro.String, nil)).
AddField("age", hambaavro.NewPrimitiveSchema(hambaavro.Int, nil)).
Build()
if err != nil {
panic(err)
}

s, err := schema.Create(ctx, schema.TypeAvro, "users", []byte(recordSchema.String()))
if err != nil {
panic(err)
}

An example implementation can be found in the postgres connector.

scarf pixel conduit-site-docs-developing-connectors