Developing a Destination Connector
A Destination is responsible for writing OpenCDC Records to third-party systems.
You need to provide your own implementations of the functions required by the
Destination interface. Information about the individual functions is listed
below. The destination.go file is the main file where the functionality of your
Destination Connector is implemented.
Destination struct
Every Destination implementation needs to include an UnimplementedDestination to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Destination implementations. This struct can be modified to add additional fields that can be accessed throughout the lifecycle of the Connector.
type Destination struct {
	sdk.UnimplementedDestination
	config DestinationConfig
}
Destination Connector Lifecycle Functions
NewDestination()
A constructor function for your Destination struct. Note that this is the same
function that should be set as the value of Connector.NewDestination. The
constructor should be used to wrap your destination with the SDK destination
middleware. The middleware is configured through the destination's configuration
and is explained in the following section.
func NewDestination() sdk.Destination {
	// Create a Destination and wrap it in the default middleware.
	return sdk.DestinationWithMiddleware(&Destination{})
}
Config()
The Config() function needs to return a pointer to the connector's active
configuration:
func (d *Destination) Config() sdk.DestinationConfig {
	// Return a pointer so the SDK can set the parsed values on d.config.
	return &d.config
}
The SDK will parse the configuration map provided by a user (e.g. through a
pipeline configuration file), validate it, and, if the configuration is valid,
set the values into the value returned by Config().
More information about configuration structs can be found
in Connector Configuration.
d.config needs to be a struct that embeds sdk.UnimplementedDestinationConfig.
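The reason Config() returns a pointer can be sketched without the SDK: the caller writes the parsed values through that pointer into the connector's own struct. In the sketch below, populate is a hypothetical stand-in for the SDK's parse-and-set step, and the Path field and "path" key are assumptions made for illustration only.

```go
package main

// DestinationConfig is a hypothetical configuration struct for this sketch.
type DestinationConfig struct {
	Path string
}

// Destination holds the connector's active configuration.
type Destination struct {
	config DestinationConfig
}

// Config returns a pointer so the caller can fill in the parsed values.
func (d *Destination) Config() *DestinationConfig {
	return &d.config
}

// populate is a stand-in for the SDK's parsing step: it copies a value from
// the raw user-supplied map into the struct behind the pointer.
func populate(cfg *DestinationConfig, raw map[string]string) {
	cfg.Path = raw["path"]
}
```

After populate(d.Config(), raw) returns, d.config.Path holds the user's value, which is why later lifecycle functions can read d.config directly.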
Open()
Prepares the connector to start writing records. If needed, the connector should open connections in this function.
func (d *Destination) Open(context.Context) error {
	// opens or creates a file at the given path
	file, err := d.openOrCreate(d.config.Path)
	if err != nil {
		return err
	}
	d.file = file
	return nil
}
Write()
Writes the len(records) records in a slice of opencdc.Record values received
from the Conduit pipeline to the destination right away, without caching. It
should return the number of records written from the slice and any error
encountered that caused the write to stop early.
func (d *Destination) Write(_ context.Context, recs []opencdc.Record) (int, error) {
	for i, r := range recs {
		_, err := d.file.Write(append(r.Bytes(), '\n'))
		if err != nil {
			// i records were written successfully before this failure.
			return i, err
		}
	}
	return len(recs), nil
}
Teardown()
Teardown signals to the connector that there will be no more calls to any other
function. Any connections that were created in the Open() function should be
closed here.
func (d *Destination) Teardown(context.Context) error {
	if d.file != nil {
		return d.file.Close()
	}
	return nil
}
Schema
By default, the SDK destination middleware uses the record's schema fields to decode its key and payload. This is less problematic than in the SDK source middleware, because the schema should already have been created.
If the destination connector is writing unstructured data, or you want to use your own schema writing logic, you can disable the middleware like this, as mentioned before:
func NewDestination() sdk.Destination {
	return sdk.DestinationWithMiddleware(
		&Destination{},
		sdk.DefaultDestinationMiddleware(
			// Disable schema-based decoding for both payload and key.
			sdk.DestinationWithSchemaExtractionConfig{
				PayloadEnabled: lang.Ptr(false),
				KeyEnabled:     lang.Ptr(false),
			},
		)...,
	)
}
Otherwise, you can use the schema fetched from schema.Get to manually decode records:
var rec opencdc.Record
// Error checks are omitted for brevity; handle err after each call.
sub, err := rec.Metadata.GetPayloadSchemaSubject()
v, err := rec.Metadata.GetPayloadSchemaVersion()
s, err := schema.Get(ctx, sub, v)
// someStruct is a placeholder for the value you decode the payload into.
err = s.Unmarshal(rec.Payload.After.Bytes(), &someStruct)
// ... write the struct in any way