Developing a Source Connector
A Source is responsible for continuously reading data from a third party system and returning it in the form of an OpenCDC Record.
You need to implement the functions required by the Source interface and provide your own implementations. Information about the individual functions is listed below. The source.go file is the main file where the functionality of your source connector is implemented.
Source struct
Every Source implementation needs to embed an UnimplementedSource to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Source implementations. This struct can be extended with additional fields that can be accessed throughout the lifecycle of the Connector.
type Source struct {
    sdk.UnimplementedSource

    config SourceConfig
    tail   *tail.Tail
}
Source Connector Lifecycle Functions
NewSource()
A constructor function for your Source struct. Note that this is the same function that should be set as the value of Connector.NewSource. The constructor should be used to wrap your Source in the DefaultSourceMiddleware.
func NewSource() sdk.Source {
    // Create Source and wrap it in the default middleware.
    return sdk.SourceWithMiddleware(
        &Source{},
        sdk.DefaultSourceMiddleware()...,
    )
}
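For context, the NewSource constructor is typically referenced from the connector's sdk.Connector definition, usually kept in connector.go. A minimal sketch, assuming the conventional layout where Specification and NewDestination are defined elsewhere in the connector:

// connector.go (sketch)
var Connector = sdk.Connector{
    NewSpecification: Specification,
    NewSource:        NewSource,
    NewDestination:   NewDestination,
}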
Additional options via SourceMiddlewareOption:
In case you need to add middleware options, you can do so by passing them to the sdk.SourceWithMiddleware function via sdk.DefaultSourceMiddleware(opts ...SourceMiddlewareOption). The currently available source middleware options can be found here.
If your source connector doesn't generate structured data (i.e. it produces raw data), you might want to disable schema extraction by default by overriding the sdk.SourceWithSchemaExtractionConfig options:
sdk.SourceWithMiddleware(
    &Source{},
    sdk.DefaultSourceMiddleware(
        // disable schema extraction by default, because the source produces raw data
        sdk.SourceWithSchemaExtractionConfig{
            PayloadEnabled: lang.Ptr(false),
            KeyEnabled:     lang.Ptr(false),
        },
    )...,
)
Parameters()
A map of named Parameters that describe how to configure the connector. This map is typically generated using paramgen.
func (s *Source) Parameters() config.Parameters {
    return s.config.Parameters()
}
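The SourceConfig struct itself is typically annotated with struct tags that paramgen reads to generate the Parameters() method. A minimal sketch, assuming a connector with a single path option; the field, tags, and output file name are illustrative:

//go:generate paramgen -output=paramgen_src.go SourceConfig

type SourceConfig struct {
    // Path is the path of the file the connector reads from.
    Path string `json:"path" validate:"required"`
}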
Configure()
Validates and stores configuration data for the connector. Any complex validation logic should be implemented here.
func (s *Source) Configure(ctx context.Context, cfg config.Config) error {
    err := sdk.Util.ParseConfig(ctx, cfg, &s.config, NewSource().Parameters())
    if err != nil {
        return err
    }
    // add custom validations here
    return nil
}
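As an example of a custom validation that can't be expressed through configuration parameters alone, the sketch below checks that the configured file actually exists. It assumes the SourceConfig has a Path field, as in the examples in this guide, and requires the os package:

func (s *Source) Configure(ctx context.Context, cfg config.Config) error {
    err := sdk.Util.ParseConfig(ctx, cfg, &s.config, NewSource().Parameters())
    if err != nil {
        return err
    }

    // custom validation: verify that the configured file exists
    if _, err := os.Stat(s.config.Path); err != nil {
        return fmt.Errorf("invalid path %q: %w", s.config.Path, err)
    }
    return nil
}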
Open()
Prepares the connector to start producing records based on the last known successful position. If needed, the connector should open connections in this function.
Every record read by a source connector has a position attached. The position given to Open() is the position of the last record that was successfully processed end-to-end before the connector stopped, or nil if no records were read yet. Hence, a position needs to contain enough information for a source connector to resume reading records from exactly where it stopped.
A position is a slice of bytes that can represent any data structure. In Conduit connectors, it's common for a position to be a struct that is marshaled into a JSON string. In the example below, the position is an offset within the file being read.
func (s *Source) Open(ctx context.Context, p opencdc.Position) error {
    // parse the position
    var offset int64
    if p != nil {
        var err error
        offset, err = strconv.ParseInt(string(p), 10, 64)
        if err != nil {
            return fmt.Errorf("invalid position %v, expected a number", p)
        }
    }

    // seek to the position, i.e. the offset
    sdk.Logger(ctx).Info().
        Int64("position", offset).
        Msgf("seeking...")

    t, err := tail.TailFile(
        s.config.Path,
        tail.Config{
            Follow: true,
            Location: &tail.SeekInfo{
                Offset: offset,
                Whence: io.SeekStart,
            },
            Logger: tail.DiscardingLogger,
        },
    )
    if err != nil {
        return fmt.Errorf("could not tail file: %w", err)
    }

    s.tail = t
    return nil
}
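If a single number isn't enough to resume (for example, when a source tracks both a snapshot cursor and a change stream offset), the position is often a small struct marshaled to JSON, as mentioned above. The following sketch is illustrative only; the position type and its fields are not part of the SDK, and the code requires the encoding/json package:

// position is a hypothetical struct-based position.
type position struct {
    SnapshotDone bool  `json:"snapshot_done"`
    Offset       int64 `json:"offset"`
}

// toSDKPosition marshals the position into an opencdc.Position.
func (p position) toSDKPosition() (opencdc.Position, error) {
    b, err := json.Marshal(p)
    if err != nil {
        return nil, fmt.Errorf("could not marshal position: %w", err)
    }
    return opencdc.Position(b), nil
}

// parsePosition unmarshals an opencdc.Position back into the struct.
func parsePosition(raw opencdc.Position) (position, error) {
    var p position
    if raw == nil {
        return p, nil // no position yet, start from the beginning
    }
    if err := json.Unmarshal(raw, &p); err != nil {
        return p, fmt.Errorf("invalid position: %w", err)
    }
    return p, nil
}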
Read()
Gathers data from the configured data source and formats it into an opencdc.Record that is returned from the function. The returned opencdc.Record is queued into the pipeline to be consumed by a Destination connector.
func (s *Source) Read(ctx context.Context) (opencdc.Record, error) {
    select {
    case line, ok := <-s.tail.Lines:
        if !ok {
            return opencdc.Record{}, s.tail.Err()
        }
        return sdk.Util.Source.NewRecordCreate(
            opencdc.Position(strconv.FormatInt(line.SeekInfo.Offset, 10)),
            map[string]string{
                MetadataFilePath: s.config.Path,
            },
            opencdc.RawData(strconv.Itoa(line.Num)), // use line number as key
            opencdc.RawData(line.Text),              // use line content as payload
        ), nil
    case <-ctx.Done():
        return opencdc.Record{}, ctx.Err()
    }
}
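Besides NewRecordCreate, sdk.Util.Source also provides helpers for the other record operations, such as snapshot, update, and delete records. For instance, a source that emits already-existing data during an initial snapshot phase might build records like this (the key and payload values are illustrative):

rec := sdk.Util.Source.NewRecordSnapshot(
    opencdc.Position("0"),                               // position of the snapshot record
    map[string]string{MetadataFilePath: s.config.Path},  // record metadata
    opencdc.RawData("1"),                                 // key
    opencdc.RawData("first line"),                        // payload
)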
Ack()
Ack signals to the third party system that the record with the supplied position was successfully processed. It's worth noting that while some source connectors need to implement this functionality (e.g. in the case of messaging brokers), others don't have to (e.g. a file source).
func (s *Source) Ack(ctx context.Context, position opencdc.Position) error {
    sdk.Logger(ctx).Trace().Msg("record successfully processed")
    return nil // no ack needed
}
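For a source backed by a messaging system, Ack is where the message would actually be acknowledged to the broker. A sketch, assuming a hypothetical client with an Ack method and a messages map on the Source struct that tracks in-flight message IDs by position; neither is part of the SDK:

func (s *Source) Ack(ctx context.Context, position opencdc.Position) error {
    // look up the in-flight message that corresponds to this position
    msgID, ok := s.messages[string(position)]
    if !ok {
        return fmt.Errorf("unknown position %q", position)
    }
    // acknowledge the message in the third party system
    if err := s.client.Ack(ctx, msgID); err != nil {
        return fmt.Errorf("could not ack message %q: %w", msgID, err)
    }
    delete(s.messages, string(position))
    return nil
}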
Teardown()
Teardown signals to the connector that there will be no more calls to any other function. Any connections that were created in the Open() function should be closed here.
func (s *Source) Teardown(context.Context) error {
    if s.tail != nil {
        return s.tail.Stop()
    }
    return nil
}
Schema
By default, the SDK source middleware dynamically generates a schema for each record. While convenient, this is quite slow, as the middleware will try to create a schema for every single record.
If the source connector is reading unstructured data, or you want to use your own schema extraction logic, you can disable the middleware as mentioned before:
func NewSource() sdk.Source {
    return sdk.SourceWithMiddleware(
        &Source{},
        sdk.DefaultSourceMiddleware(
            sdk.SourceWithSchemaExtractionConfig{
                PayloadEnabled: lang.Ptr(false),
                KeyEnabled:     lang.Ptr(false),
            },
        )...,
    )
}
Otherwise, you can use schema.Create to handle schema creation manually:
import (
    "github.com/conduitio/conduit-connector-sdk/schema"
    "github.com/conduitio/conduit-commons/schema/avro"

    hambaavro "github.com/hamba/avro/v2"
)

// create a user schema with a name (string) and an age (int) field
recordSchema, err := avro.NewBuilder("users", "example_namespace").
    AddField("name", hambaavro.NewPrimitiveSchema(hambaavro.String, nil)).
    AddField("age", hambaavro.NewPrimitiveSchema(hambaavro.Int, nil)).
    Build()
if err != nil {
    panic(err)
}

s, err := schema.Create(ctx, schema.TypeAvro, "users", []byte(recordSchema.String()))
if err != nil {
    panic(err)
}
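The created schema identifies the registered subject and version, and the hamba avro schema built above can be used to encode structured values into a record payload. A sketch, assuming the returned schema exposes Subject and Version fields and that the value is represented by a tagged Go struct:

// a Go struct matching the Avro schema built above
type user struct {
    Name string `avro:"name"`
    Age  int    `avro:"age"`
}

// encode a structured value with the Avro schema
encoded, err := hambaavro.Marshal(recordSchema, user{Name: "Alice", Age: 30})
if err != nil {
    panic(err)
}

// the record metadata should reference the registered schema's subject and version
sdk.Logger(ctx).Debug().
    Str("subject", s.Subject).
    Int("version", s.Version).
    Msg("created schema")

// use the encoded bytes as the record payload, e.g. opencdc.RawData(encoded)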
An example implementation can be found in the postgres connector.