
custom.javascript

Run custom JavaScript code.

Description

A processor that makes it possible to process Conduit records using JavaScript.

The following helper functions and variables are available:

  • logger: a logger that writes to Conduit's logs. See Zerolog's API for how to use it.
  • Record(): constructs a new record which represents a successful processing result. It's analogous to sdk.SingleRecord from Conduit's Go processor SDK.
  • RawData(): creates a raw data object. It's analogous to opencdc.RawData. Optionally, it accepts a string argument, which will be cast into a byte array, for example: record.Key = RawData("new key").
  • StructuredData(): creates a structured data (map-like) object.

To find out what's possible with the JS processor, also refer to the documentation for goja, which is the JavaScript engine we use.
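
As a rough illustration of the helpers listed above, the following sketch shows a process() function that logs a message, replaces the record key with raw data, and replaces the payload with a structured object. It assumes that Zerolog's chained calls (logger.Info().Msg(...)) are usable from JavaScript as-is, and the field name "greeting" is made up for this example. logger, RawData and StructuredData are provided by Conduit at runtime, so the function only works as written when it runs inside the processor.

```javascript
// Sketch of a process() function using the helpers described above.
// `logger`, `RawData` and `StructuredData` are supplied by Conduit at
// runtime; this snippet does not define them.
function process(rec) {
  // Zerolog-style chained call (assumed to be callable from JS unchanged).
  logger.Info().Msg("processing record");

  // Replace the key with raw bytes built from a string.
  rec.Key = RawData("new key");

  // Replace the payload with a map-like structured object.
  // "greeting" is a hypothetical field name used only for illustration.
  let after = StructuredData();
  after["greeting"] = "hello";
  rec.Payload.After = after;

  return rec;
}
```

To try the function outside Conduit, the three helpers would need simple stand-ins, for example a logger object whose methods do nothing.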

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "custom.javascript"
        settings:
          # JavaScript code for this processor. It needs to have a function
          # `process()` that accepts a record and returns a record. The
          # `process()` function can either modify the input record and return
          # it, or create a new record. If a record needs to be filtered
          # (dropped from the pipeline), then the `process()` function should
          # return `null`.
          # Type: string
          script: ""
          # The path to a .js file containing the processor code.
          # Type: string
          script.path: ""
          # Whether to decode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.decode.key.enabled: "true"
          # Whether to decode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.decode.payload.enabled: "true"
          # Whether to encode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.encode.key.enabled: "true"
          # Whether to encode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.encode.payload.enabled: "true"
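
The script setting's description says that returning null from process() drops the record from the pipeline. A minimal sketch of such a filter could look like the following. The metadata key "skip" is a hypothetical marker chosen for this example, not something Conduit sets.

```javascript
// Sketch of a filtering processor: records carrying a (hypothetical)
// "skip" metadata marker are dropped, all others pass through unchanged.
function process(rec) {
  if (rec.Metadata["skip"] === "true") {
    // Returning null tells the processor to filter the record out.
    return null;
  }
  return rec;
}
```

The same function could be saved to a .js file and referenced through script.path instead of being inlined in script.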

Examples

Modify a record's metadata and payload using JavaScript

This example uses the custom.javascript processor to add a metadata key to the input record and to prepend "hello, " to .Payload.After.

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "custom.javascript"
        settings:
          script: |
            function process(rec) {
              rec.Metadata["processed"] = "true";
              let existing = String.fromCharCode.apply(String, rec.Payload.After);
              rec.Payload.After = RawData("hello, " + existing);
              return rec;
            }
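
The script above turns the payload bytes into a string with String.fromCharCode.apply, which maps every byte to exactly one character. That works for ASCII content such as "world", but multi-byte UTF-8 sequences would not be decoded correctly this way. The sketch below isolates that step so it can be tried outside Conduit. The helper name bytesToString and the hard-coded byte array are illustrative stand-ins for rec.Payload.After.

```javascript
// Convert an array of byte values into a string, one character per byte.
// Suitable for ASCII payloads; multi-byte UTF-8 sequences would come out
// garbled with this approach.
function bytesToString(bytes) {
  return String.fromCharCode.apply(String, bytes);
}

// Stand-in for rec.Payload.After holding the bytes of "world".
const payload = [119, 111, 114, 108, 100];
const greeting = "hello, " + bytesToString(payload);
```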

Record difference

Before

  {
    "position": null,
    "operation": "Operation(0)",
    "metadata": {
      "existing-key": "existing-value"
    },
    "key": null,
    "payload": {
      "before": null,
      "after": "world"
    }
  }

After

  {
    "position": null,
    "operation": "Operation(0)",
    "metadata": {
      "existing-key": "existing-value",
      "processed": "true"
    },
    "key": null,
    "payload": {
      "before": null,
      "after": "hello, world"
    }
  }
