Latest release
- conduit-connector-mongo_0.2.1_Darwin_arm64.tar.gz
- conduit-connector-mongo_0.2.1_Darwin_x86_64.tar.gz
- conduit-connector-mongo_0.2.1_Linux_arm64.tar.gz
- conduit-connector-mongo_0.2.1_Linux_i386.tar.gz
- conduit-connector-mongo_0.2.1_Linux_x86_64.tar.gz
- conduit-connector-mongo_0.2.1_Windows_arm64.tar.gz
- conduit-connector-mongo_0.2.1_Windows_i386.tar.gz
- conduit-connector-mongo_0.2.1_Windows_x86_64.tar.gz
The MongoDB Source Connector connects to a MongoDB with the provided uri
, db
and collection
and starts creating records for each change detected in a
Upon starting, the Source takes a snapshot of a given collection in the database, then switches into CDC mode. In CDC mode, the plugin reads events from a Change Stream. In order for this to work correctly, your MongoDB instance must meet the criteria specified on the official website.
Snapshot Capture
When the connector first starts, snapshot mode is enabled. The connector reads
all rows of a collection in batches using
a cursor-based
limiting the rows by batchSize
. The connector stores the last processed
element value of an orderingColumn
in a position, so the snapshot process can
be paused and resumed without losing data. Once all rows in that initial
snapshot are read the connector switches into CDC mode.
This behavior is enabled by default, but can be turned off by adding
"snapshot": false
to the Source configuration.
Change Data Capture
The connector implements CDC features for MongoDB by using a Change Stream that
listens to changes in the configured collection. Every detected change is
converted into a record and returned in the call to Read
. If there is no
available record when Read
is called, the connector returns
The connector stores a resumeToken
of every Change Stream event in a position,
so the CDC process is resumble.
Azure CosmosDB for MongoDB has very limited support for Change Streams, so they cannot be used for CDC. If CDC is not possible, like in the case with CosmosDB, the connector only supports detecting insert operations by polling for new documents.
Key handling
The connector always uses the _id
field as a key.
If the _id
field is bson.ObjectID
the connector converts it to a string when
transferring a record to a destination, otherwise, it leaves it unchanged.
The MongoDB Destination takes a opencdc.Record
and parses it into a valid
MongoDB query. The Destination is designed to handle different payloads and
keys. Because of this, each record is individually parsed and written.
Collection name
If a record contains an opencdc.collection
property in its metadata it will be
written in that collection, otherwise it will fall back to use the collection
configured in the connector. Thus, a Destination can support multiple
collections in the same connector, as long as the user has proper access to
those collections.
Key handling
The connector uses all keys from an opencdc.Record
when updating and deleting
If the _id
field can be converted to a bson.ObjectID
, the connector converts
it, otherwise, it uses it as it is.
Source Parameters
version: 2.2
- id: example
status: running
- id: example-source
type: source
plugin: "mongo"
name: example-source
# Collection is the name of a collection the connector must write to
# (destination) or read from (source).
# Type: string
collection: ""
# DB is the name of a database the connector must work with.
# Type: string
db: ""
# AWSSessionToken is an AWS session token.
# Type: string
auth.awsSessionToken: ""
# DB is the name of a database that contains the user's authentication
# data.
# Type: string
auth.db: ""
# Mechanism is the authentication mechanism.
# Type: string
auth.mechanism: ""
# Password is the user's password.
# Type: string
auth.password: ""
# TLSCAFile is the path to either a single or a bundle of certificate
# authorities to trust when making a TLS connection.
# Type: string
auth.tls.caFile: ""
# TLSCertificateKeyFile is the path to the client certificate file or
# the client private key file.
# Type: string
auth.tls.certificateKeyFile: ""
# Username is the username.
# Type: string
auth.username: ""
# BatchSize is the size of a document batch.
# Type: int
batchSize: "1000"
# OrderingField is the name of a field that is used for ordering
# collection documents when capturing a snapshot.
# Type: string
orderingField: "_id"
# Snapshot determines whether the connector will take a snapshot of
# the entire collection before starting CDC mode.
# Type: bool
snapshot: "true"
# URI is the connection string. The URI can contain host names,
# IPv4/IPv6 literals, or an SRV record.
# Type: string
uri: "mongodb://localhost:27017"
# Maximum delay before an incomplete batch is read from the source.
# Type: duration
sdk.batch.delay: "0"
# Maximum size of batch before it gets read from the source.
# Type: int
sdk.batch.size: "0"
# Specifies whether to use a schema context name. If set to false, no
# schema context name will be used, and schemas will be saved with the
# subject name specified in the connector (not safe because of name
# conflicts).
# Type: bool
sdk.schema.context.enabled: "true"
# Schema context name to be used. Used as a prefix for all schema
# subject names. If empty, defaults to the connector ID.
# Type: string ""
# Whether to extract and encode the record key with a schema.
# Type: bool
sdk.schema.extract.key.enabled: "true"
# The subject of the key schema. If the record metadata contains the
# field "opencdc.collection" it is prepended to the subject name and
# separated with a dot.
# Type: string
sdk.schema.extract.key.subject: "key"
# Whether to extract and encode the record payload with a schema.
# Type: bool
sdk.schema.extract.payload.enabled: "true"
# The subject of the payload schema. If the record metadata contains
# the field "opencdc.collection" it is prepended to the subject name
# and separated with a dot.
# Type: string
sdk.schema.extract.payload.subject: "payload"
# The type of the payload schema.
# Type: string
sdk.schema.extract.type: "avro"
Destination Parameters
version: 2.2
- id: example
status: running
- id: example-destination
type: destination
plugin: "mongo"
name: example-destination
# Collection is the name of a collection the connector must write to
# (destination) or read from (source).
# Type: string
collection: ""
# DB is the name of a database the connector must work with.
# Type: string
db: ""
# AWSSessionToken is an AWS session token.
# Type: string
auth.awsSessionToken: ""
# DB is the name of a database that contains the user's authentication
# data.
# Type: string
auth.db: ""
# Mechanism is the authentication mechanism.
# Type: string
auth.mechanism: ""
# Password is the user's password.
# Type: string
auth.password: ""
# TLSCAFile is the path to either a single or a bundle of certificate
# authorities to trust when making a TLS connection.
# Type: string
auth.tls.caFile: ""
# TLSCertificateKeyFile is the path to the client certificate file or
# the client private key file.
# Type: string
auth.tls.certificateKeyFile: ""
# Username is the username.
# Type: string
auth.username: ""
# URI is the connection string. The URI can contain host names,
# IPv4/IPv6 literals, or an SRV record.
# Type: string
uri: "mongodb://localhost:27017"
# Maximum delay before an incomplete batch is written to the
# destination.
# Type: duration
sdk.batch.delay: "0"
# Maximum size of batch before it gets written to the destination.
# Type: int
sdk.batch.size: "0"
# Allow bursts of at most X records (0 or less means that bursts are
# not limited). Only takes effect if a rate limit per second is set.
# Note that if `sdk.batch.size` is bigger than `sdk.rate.burst`, the
# effective batch size will be equal to `sdk.rate.burst`.
# Type: int
sdk.rate.burst: "0"
# Maximum number of records written per second (0 means no rate
# limit).
# Type: float
sdk.rate.perSecond: "0"
# The format of the output record. See the Conduit documentation for a
# full list of supported formats
# (
# Type: string
sdk.record.format: "opencdc/json"
# Options to configure the chosen output record format. Options are
# normally key=value pairs separated with comma (e.g.
# opt1=val2,opt2=val2), except for the `template` record format, where
# options are a Go template.
# Type: string
sdk.record.format.options: ""
# Whether to extract and decode the record key with a schema.
# Type: bool
sdk.schema.extract.key.enabled: "true"
# Whether to extract and decode the record payload with a schema.
# Type: bool
sdk.schema.extract.payload.enabled: "true"