Skip to main content

rabbitmq

Author: Meroxa, Inc.Conduit team logo
0

Latest release

Description

A RabbitMQ source and destination plugin for Conduit, written in Go.

Source Parameters

version: 2.2
pipelines:
- id: example
status: running
connectors:
- id: example-source
type: source
plugin: "rabbitmq"
name: example-source
settings:
# The name of the queue to consume from / publish to
# Type: string
queue.name: ""
# The RabbitMQ server URL
# Type: string
url: ""
# Indicates if the server should consider messages acknowledged once
# delivered.
# Type: bool
consumer.autoAck: "false"
# Indicates if the consumer should be exclusive.
# Type: bool
consumer.exclusive: "false"
# The name of the consumer
# Type: string
consumer.name: ""
# Indicates if the server should not deliver messages published by the
# same connection.
# Type: bool
consumer.noLocal: "false"
# Indicates if the consumer should be declared without waiting for
# server confirmation.
# Type: bool
consumer.noWait: "false"
# Indicates if the queue will be deleted when there are no more
# consumers.
# Type: bool
queue.autoDelete: "false"
# Indicates if the queue will survive broker restarts.
# Type: bool
queue.durable: "true"
# Indicates if the queue can be accessed by other connections.
# Type: bool
queue.exclusive: "false"
# Indicates if the queue should be declared without waiting for server
# confirmation.
# Type: bool
queue.noWait: "false"
# The path to the CA certificate to use for TLS
# Type: string
tls.caCert: ""
# The path to the client certificate to use for TLS
# Type: string
tls.clientCert: ""
# The path to the client key to use for TLS
# Type: string
tls.clientKey: ""
# Indicates if TLS should be used
# Type: bool
tls.enabled: "false"
# Maximum delay before an incomplete batch is read from the source.
# Type: duration
sdk.batch.delay: "0"
# Maximum size of batch before it gets read from the source.
# Type: int
sdk.batch.size: "0"
# Specifies whether to use a schema context name. If set to false, no
# schema context name will be used, and schemas will be saved with the
# subject name specified in the connector (not safe because of name
# conflicts).
# Type: bool
sdk.schema.context.enabled: "true"
# Schema context name to be used. Used as a prefix for all schema
# subject names. If empty, defaults to the connector ID.
# Type: string
sdk.schema.context.name: ""
# Whether to extract and encode the record key with a schema.
# Type: bool
sdk.schema.extract.key.enabled: "true"
# The subject of the key schema. If the record metadata contains the
# field "opencdc.collection" it is prepended to the subject name and
# separated with a dot.
# Type: string
sdk.schema.extract.key.subject: "key"
# Whether to extract and encode the record payload with a schema.
# Type: bool
sdk.schema.extract.payload.enabled: "true"
# The subject of the payload schema. If the record metadata contains
# the field "opencdc.collection" it is prepended to the subject name
# and separated with a dot.
# Type: string
sdk.schema.extract.payload.subject: "payload"
# The type of the payload schema.
# Type: string
sdk.schema.extract.type: "avro"

Destination Parameters

version: 2.2
pipelines:
- id: example
status: running
connectors:
- id: example-destination
type: destination
plugin: "rabbitmq"
name: example-destination
settings:
# The name of the queue to consume from / publish to
# Type: string
queue.name: ""
# The RabbitMQ server URL
# Type: string
url: ""
# The application that created the message.
# Type: string
delivery.appID: ""
# The encoding of the message content.
# Type: string
delivery.contentEncoding: ""
# The MIME type of the message content. Defaults to
# "application/json".
# Type: string
delivery.contentType: "application/json"
# The correlation ID used to correlate RPC responses with requests.
# Type: string
delivery.correlationID: ""
# The message delivery mode. Non-persistent (1) or persistent (2).
# Default is 2 (persistent).
# Type: int
delivery.deliveryMode: "2"
# The message expiration time, if any.
# Type: string
delivery.expiration: ""
# Indicates if the message should be treated as immediate. If true,
# the message is not queued if no consumers are on the matching queue.
# Type: bool
delivery.immediate: "false"
# Indicates if the message is mandatory. If true, tells the server to
# return the message if it cannot be routed to a queue.
# Type: bool
delivery.mandatory: "false"
# The message type name.
# Type: string
delivery.messageTypeName: ""
# The message priority. Ranges from 0 to 9. Default is 0.
# Type: int
delivery.priority: "0"
# The address to reply to.
# Type: string
delivery.replyTo: ""
# The user who created the message. Useful for publishers.
# Type: string
delivery.userID: ""
# Indicates if the exchange will be deleted when the last queue is
# unbound from it.
# Type: bool
exchange.autoDelete: "false"
# Indicates if the exchange will survive broker restarts.
# Type: bool
exchange.durable: "true"
# Indicates if the exchange is used for internal purposes and cannot
# be directly published to by a client.
# Type: bool
exchange.internal: "false"
# The name of the exchange.
# Type: string
exchange.name: ""
# Indicates if the exchange should be declared without waiting for
# server confirmation.
# Type: bool
exchange.noWait: "false"
# The type of the exchange (e.g., direct, fanout, topic, headers).
# Type: string
exchange.type: ""
# Indicates if the queue will be deleted when there are no more
# consumers.
# Type: bool
queue.autoDelete: "false"
# Indicates if the queue will survive broker restarts.
# Type: bool
queue.durable: "true"
# Indicates if the queue can be accessed by other connections.
# Type: bool
queue.exclusive: "false"
# Indicates if the queue should be declared without waiting for server
# confirmation.
# Type: bool
queue.noWait: "false"
# The routing key to use when publishing to an exchange
# Type: string
routingKey: "{{ index .Metadata "rabbitmq.routingKey" }}"
# The path to the CA certificate to use for TLS
# Type: string
tls.caCert: ""
# The path to the client certificate to use for TLS
# Type: string
tls.clientCert: ""
# The path to the client key to use for TLS
# Type: string
tls.clientKey: ""
# Indicates if TLS should be used
# Type: bool
tls.enabled: "false"
# Maximum delay before an incomplete batch is written to the
# destination.
# Type: duration
sdk.batch.delay: "0"
# Maximum size of batch before it gets written to the destination.
# Type: int
sdk.batch.size: "0"
# Allow bursts of at most X records (0 or less means that bursts are
# not limited). Only takes effect if a rate limit per second is set.
# Note that if `sdk.batch.size` is bigger than `sdk.rate.burst`, the
# effective batch size will be equal to `sdk.rate.burst`.
# Type: int
sdk.rate.burst: "0"
# Maximum number of records written per second (0 means no rate
# limit).
# Type: float
sdk.rate.perSecond: "0"
# The format of the output record. See the Conduit documentation for a
# full list of supported formats
# (https://conduit.io/docs/using/connectors/configuration-parameters/output-format).
# Type: string
sdk.record.format: "opencdc/json"
# Options to configure the chosen output record format. Options are
# normally key=value pairs separated with comma (e.g.
# opt1=val2,opt2=val2), except for the `template` record format, where
# options are a Go template.
# Type: string
sdk.record.format.options: ""
# Whether to extract and decode the record key with a schema.
# Type: bool
sdk.schema.extract.key.enabled: "true"
# Whether to extract and decode the record payload with a schema.
# Type: bool
sdk.schema.extract.payload.enabled: "true"