Developing a Source Connector
A Source is responsible for continuously reading data from a third party system and returning it in the form of an SDK Record.
You need to implement the functions required by the Source interface and provide your own implementations. Information about individual functions are listed below. The source.go
file is the main file where the functionality of your Source Connector is implemented.
Source struct
Every Source implementation needs to include an UnimplementedSource to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Source implementations. This struct can be modified to add additional fields that can be accessed throughout the lifecycle of the Connector
type Source struct {
sdk.UnimplementedSource
config SourceConfig
lastPositionRead sdk.Position //nolint:unused // this is just an example
watcher *fsnotify.Watcher
recentlyCreated sync.Map // To keep track of recently created files
createCooldown time.Duration // Cooldown period after a create event
}
Source Connector Lifecycle Functions
NewSource()
A constructor function for your Source struct. Note that this is the same function that should be set as the value of Connector.NewSource
. The constructor should be used to wrap your Source in the default DefaultSourceMiddleware
.
func NewSource() sdk.Source {
// Create Source and wrap it in the default middleware.
return sdk.SourceWithMiddleware(
&Source{},
sdk.DefaultSourceMiddleware()...,
)
}
Additional options via SourceMiddlewareOption
:
In case you need to add additional middleware options, you can do so by passing it to the sdk.SourceWithMiddleware
function via sdk.DefaultSourceMiddleware(opts ...SourceMiddlewareOption)
. Currently, the available source middleware options can be cound here.
If you're using a source connector that's not generating structured data (i.e. produces raw data), you might want to disable schema extraction by default by overwriting the sdk.SourceWithSchemaExtractionConfig
options:
sdk.SourceWithMiddleware(
&Source{},
sdk.DefaultSourceMiddleware(
// disable schema extraction by default, because the source produces raw data
sdk.SourceWithSchemaExtractionConfig{
PayloadEnabled: lang.Ptr(false),
KeyEnabled: lang.Ptr(false),
},
)...,
)
Parameters()
A map of named Parameters that describe how to configure the connector. This map is typically generates using paramgen
.
func (s *Source) Parameters() map[string]sdk.Parameter {
return s.config.Parameters()
}
Configure()
Validates and stores configuration data for the connector. Any complex validation logic should be implemented here.
func (s *Source) Configure(ctx context.Context, cfg map[string]string) error {
sdk.Logger(ctx).Info().Msg("Configuring Source...")
err := sdk.Util.ParseConfig(cfg, &s.config)
if err != nil {
return fmt.Errorf("invalid config: %w", err)
}
// add custom validations here
return nil
}
Open()
Prepares the connector to start producing records based on the last known successful position. If needed, the connector should open connections in this function.
func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
configDirPath := s.config.Directory
files, err := ioutil.ReadDir(configDirPath)
if err != nil {
return fmt.Errorf("error reading directory '%s': %w", configDirPath, err)
}
for _, f := range files {
sdk.Logger(ctx).Info().Msgf(" - %s\n", f.Name())
}
w, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("error creating fsnotify watcher: %w", err)
}
s.watcher = w
s.createCooldown = 2 * time.Second
return s.watcher.Add(s.config.Directory)
}
Read()
Gathers data from the configured data source and formats it into a sdk.Record
that is returned from the function. The returned sdk.Record
is queued into the pipeline to be consumed by a Destination connector.
func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
for {
select {
case event, ok := <-s.watcher.Events:
if !ok {
return sdk.Record{}, fmt.Errorf("events channel was closed")
}
if event.Op&fsnotify.Create == fsnotify.Create {
sdk.Logger(ctx).Info().Msgf("Detected new file: %s", event.Name)
// Mark the file as recently created to avoid processing if it is modified shortly after being created
s.markAsRecentlyCreated(event.Name)
// Read the newly created file
fileContent, err := ioutil.ReadFile(event.Name)
if err != nil {
return sdk.Record{}, err
}
recordKey := sdk.RawData(filepath.Base(event.Name))
recordValue := sdk.RawData(fileContent)
// Return a Record reflecting that a new file has been created
return sdk.Util.Source.NewRecordCreate(
sdk.Position(recordKey),
map[string]string{
MetadataFilePath: event.Name,
},
recordKey,
recordValue,
), nil
}
// If the event is not a Create event, continue listening without doing anything
continue
case err, ok := <-s.watcher.Errors:
if !ok {
return sdk.Record{}, fmt.Errorf("errors channel was closed")
}
return sdk.Record{}, fmt.Errorf("error on watcher: %w", err)
case <-ctx.Done():
return sdk.Record{}, ctx.Err()
}
}
}
Ack()
Ack signals to the implementation that the record with the supplied position was successfully processed.
func (s *Source) Ack(ctx context.Context, position sdk.Position) error {
sdk.Logger(ctx).Debug().Msg("Record successfully processed")
return nil
}
Teardown()
Teardown signals to the connector that there will be no more calls to any other function. Any connections that were created in the Open()
function should be closed here.
func (s *Source) Teardown(ctx context.Context) error {
if s.watcher != nil {
err := s.watcher.Close()
if err != nil {
// Log the error or handle it as needed
sdk.Logger(ctx).Error().Msgf("Failed to close fsnotify watcher: %v", err)
return fmt.Errorf("failed to close fsnotify watcher: %w", err)
}
}
return nil
}