A Redpanda Connect stream pipeline is configured with a single config file. You can generate a fresh one with:
rpk connect create > connect.yaml
This command may take a few seconds to run. If this is the first rpk connect command you have run, the rpk connect plugin is automatically installed.
For Docker installations:
docker run --rm docker.redpanda.com/redpandadata/connect create > ./connect.yaml
The main sections that make up a config are input, pipeline and output. When you generate a fresh config, it'll simply pipe stdin to stdout, like this:
input:
  stdin: {}

pipeline:
  processors: []

output:
  stdout: {}
Eventually we'll want to configure a more useful input and output, but for now this setup is handy for quickly testing processors. You can execute this config with:
rpk connect run connect.yaml
For Docker installations:
docker run --rm -it -v $(pwd)/connect.yaml:/connect.yaml docker.redpanda.com/redpandadata/connect run
Anything you write to stdin is written unchanged to stdout. Cool! Resist the temptation to play with this for hours; there's more to try out.
Next, let's add some processing steps to mutate messages. The most powerful one is the mapping processor, which transforms each message using a Bloblang mapping. Let's add a mapping that uppercases our messages:
input:
  stdin: {}

pipeline:
  processors:
    - mapping: root = content().uppercase()

output:
  stdout: {}
Now your messages should come out in all caps.
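Processors run in the order they are listed, and each one receives the output of the previous step. The sketch below appends a second, purely illustrative mapping to the config above; the "!" suffix is an arbitrary choice for demonstration, not something the tutorial requires. With this config, typing hello should come back as HELLO!.

```yaml
input:
  stdin: {}

pipeline:
  processors:
    # Step 1: uppercase the raw message bytes.
    - mapping: root = content().uppercase()
    # Step 2 (illustrative): receives the uppercased message from step 1
    # and appends "!" via string concatenation.
    - mapping: root = content().string() + "!"

output:
  stdout: {}
```

Swapping the two processors would make no visible difference in this case, but for most transformations the order matters, so it is worth keeping in mind as pipelines grow.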
You can add as many processing steps as you like, and since processors are what make Redpanda Connect powerful, they are worth experimenting with. Let's create a more advanced pipeline that works with JSON documents:
input:
  stdin: {}

pipeline:
  processors:
    - sleep:
        duration: 500ms
    - mapping: |
        root.doc = this
        root.first_name = this.names.index(0).uppercase()
        root.last_name = this.names.index(-1).hash("sha256").encode("base64")

output:
  stdout: {}
First, we sleep for 500 milliseconds just to keep the suspense going. Next, we restructure our input JSON document by nesting it within a field doc, and map the upper-cased first element of names to a new field first_name. Finally, we map the SHA-256 hashed and base64-encoded value of the last element of names to a new field last_name.
Try running that config with some sample documents:
echo '{"id":"1","names":["celine","dion"]}
{"id":"2","names":["chad","robert","kroeger"]}' | rpk connect run connect.yaml
For Docker installations:
echo '{"id":"1","names":["celine","dion"]}
{"id":"2","names":["chad","robert","kroeger"]}' | docker run --rm -i -v $(pwd)/connect.yaml:/connect.yaml docker.redpanda.com/redpandadata/connect run
You should see this in stdout (amongst some logs):
{"doc":{"id":"1","names":["celine","dion"]},"first_name":"CELINE","last_name":"1VvPgCW9sityz5XAMGdI2BTA7/44Wb3cANKxqhiCo50="}
{"doc":{"id":"2","names":["chad","robert","kroeger"]},"first_name":"CHAD","last_name":"uXXg5wCKPjpyj/qbivPbD9H9CZ5DH/F0Q1Twytnt2hQ="}
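When a mapping doesn't produce what you expect, it can help to see the message as it was before that step. One option is to add a log processor to the chain. The sketch below is the advanced pipeline with a log step inserted ahead of the mapping; the log message text is an arbitrary choice, and ${! content() } is an interpolation that inserts the current message contents.

```yaml
input:
  stdin: {}

pipeline:
  processors:
    - sleep:
        duration: 500ms
    # Log the raw JSON document before it is restructured.
    # The log processor leaves the message itself unchanged.
    - log:
        level: INFO
        message: 'before mapping: ${! content() }'
    - mapping: |
        root.doc = this
        root.first_name = this.names.index(0).uppercase()
        root.last_name = this.names.index(-1).hash("sha256").encode("base64")

output:
  stdout: {}
```

Because the log step passes messages through untouched, the documents written to stdout should match the output above, with an extra log line per input document.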