A Kafka connector for Cloudant

IBM/cloudant-kafka-connector

This project includes Apache Kafka Connect source and sink connectors for IBM Cloudant.

These connectors can stream events:

  • from Cloudant (source connector) to Kafka topic(s)
  • to Cloudant (sink connector) from Kafka topic(s)

Note: the connectors are also compatible with Apache CouchDB.

Release Status

Experimental

Usage

Note: The instructions below assume an installation of Kafka at $KAFKA_HOME.

Quick Start

  1. Download the zip from the releases page. The zip file contains the plugin jar and the non-Kafka dependencies needed to run. The zip file is signed, and the signature can be verified by running the jarsigner -verify cloudant-kafka-connector-x.y.z.zip command.
  2. Configure the Kafka Connect plugin path for your Kafka distribution, for example: plugin.path=/kafka/connect.
    • This is configured in either connect-standalone.properties or connect-distributed.properties in the config directory of your Kafka installation.
    • If you're not sure which to use, edit connect-standalone.properties and follow the standalone execution instructions below.
  3. Unzip the archive and move the extracted directory to the plugin path configured earlier, for example: unzip cloudant-kafka-connector-x.y.z.zip; mv cloudant-kafka-connector-x.y.z /kafka/connect.
  4. Edit the source or sink example properties files and save them to the config directory of your Kafka installation.
  5. Start Kafka.
  6. Start the connector (see below).
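
Step 2 above can be scripted. The following is a minimal sketch, not part of the connector distribution: the helper name set_plugin_path is hypothetical, and it assumes the worker properties file is a plain key=value file. It replaces an existing uncommented plugin.path line, or appends one if none is present.

```shell
# Hypothetical helper: set plugin.path in a Kafka Connect worker properties file.
# $1 = path to the worker properties file, $2 = plugin directory
set_plugin_path() {
  file="$1"
  dir="$2"
  if grep -q '^plugin\.path=' "$file"; then
    # Replace the existing uncommented entry; a .bak copy of the file is kept.
    sed -i.bak "s|^plugin\.path=.*|plugin.path=$dir|" "$file"
  else
    # No uncommented entry yet (a "#plugin.path=" comment does not count): append one.
    printf 'plugin.path=%s\n' "$dir" >> "$file"
  fi
}

# Example usage (paths taken from the steps above):
#   set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /kafka/connect
```

The sketch writes a single directory. Kafka Connect also accepts a comma-separated list of directories for plugin.path, which this helper would overwrite.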

Connector execution in Kafka is available through scripts in the Kafka install path:

$KAFKA_HOME/bin/connect-standalone.sh or $KAFKA_HOME/bin/connect-distributed.sh

Use the appropriate configuration files for standalone or distributed execution with Cloudant as source, as sink, or both.

For example:

  • standalone execution with Cloudant changes feed as source:

    $KAFKA_HOME/bin/connect-standalone.sh \
      $KAFKA_HOME/config/connect-standalone.properties \
      $KAFKA_HOME/config/connect-cloudant-source.properties
  • standalone execution with Cloudant as sink:

    $KAFKA_HOME/bin/connect-standalone.sh \
      $KAFKA_HOME/config/connect-standalone.properties \
      $KAFKA_HOME/config/connect-cloudant-sink.properties
  • standalone execution with multiple configurations, one using Cloudant as source and one using Cloudant as sink:

    $KAFKA_HOME/bin/connect-standalone.sh \
      $KAFKA_HOME/config/connect-standalone.properties \
      $KAFKA_HOME/config/connect-cloudant-source.properties \
      $KAFKA_HOME/config/connect-cloudant-sink.properties

Any number of connector configurations can be passed to the executing script.
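
As an illustration of passing a variable number of connector configurations, the sketch below assembles the standalone command line from a list of file names. The function name build_standalone_cmd is hypothetical, and the sketch assumes every connector file lives in the config directory of the Kafka installation. It only prints the command so it can be reviewed before running.

```shell
# Hypothetical helper: print the standalone command for any number of
# connector properties files.
# $1 = Kafka install path, remaining arguments = connector file names
build_standalone_cmd() {
  kafka_home="$1"
  shift
  cmd="$kafka_home/bin/connect-standalone.sh $kafka_home/config/connect-standalone.properties"
  for cfg in "$@"; do
    # Each connector configuration is appended after the worker configuration.
    cmd="$cmd $kafka_home/config/$cfg"
  done
  printf '%s\n' "$cmd"
}

# Example usage:
#   build_standalone_cmd "$KAFKA_HOME" connect-cloudant-source.properties connect-cloudant-sink.properties
```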

Configuration

As outlined above, the Cloudant Kafka connector can be configured in standalone or distributed mode according to the Kafka Connector documentation.

The connect-standalone or connect-distributed configuration files contain default values which are necessary for all connectors, such as:

  1. bootstrap.servers
  2. offset.storage.file.filename (if using a standalone worker)
  3. offset.flush.interval.ms
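
Before starting a standalone worker, it can help to confirm that these keys are present. The following is a minimal sketch, assuming a plain key=value properties file. The function name check_standalone_worker_props is hypothetical and is not provided by Kafka or by this connector.

```shell
# Hypothetical helper: report which of the worker settings listed above are
# absent from a standalone worker properties file.
# $1 = path to the worker properties file; returns non-zero if any key is missing
check_standalone_worker_props() {
  file="$1"
  missing=0
  for key in bootstrap.servers offset.storage.file.filename offset.flush.interval.ms; do
    # Look for an uncommented "key=" at the start of a line. The dots in the
    # key act as regex wildcards, which is loose but adequate for this check.
    if ! grep -q "^${key}=" "$file"; then
      echo "missing: $key"
      missing=1
    fi
  done
  return "$missing"
}

# Example usage:
#   check_standalone_worker_props "$KAFKA_HOME/config/connect-standalone.properties"
```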

Connector configuration

The cloudant-changes-source-example and cloudant-sink-example properties files contain the minimum required to get started. For a full reference explaining all the connector options, see here (source) and here (sink).

Authentication

In order to read from or write to Cloudant, some authentication properties need to be configured. These properties are common to both the source and sink connector, and are detailed in the configuration reference, linked above.

A number of different authentication methods are supported. IBM Cloud IAM-based authentication methods are recommended and the default is to use an IAM API key. See locating your service credentials for details on how to find your IAM API key.

Converter Configuration

Also present in the connect-standalone or connect-distributed configuration files are defaults for key and value conversion, which are as follows:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Depending on your needs, you may need to change these converter settings. For instance, in the sample configuration files, value schemas are disabled on the assumption that users will read and write events which are "raw" JSON and do not have inline schemas.
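
To make the difference concrete: with schemas enabled, JsonConverter expects each message to be wrapped in an envelope with top-level "schema" and "payload" fields, whereas a "raw" JSON event is just the document itself. The sketch below is a rough heuristic, not part of the connector. The function name suggest_schemas_enable is hypothetical, and a raw document that happens to contain both field names would be misclassified.

```shell
# Hypothetical helper: inspect one sample message and print the matching
# value.converter.schemas.enable setting.
# $1 = a sample message value as a JSON string
suggest_schemas_enable() {
  msg="$1"
  # Heuristic: treat the message as an envelope if both keys appear anywhere in it.
  if printf '%s' "$msg" | grep -q '"schema"[[:space:]]*:' &&
     printf '%s' "$msg" | grep -q '"payload"[[:space:]]*:'; then
    echo "value.converter.schemas.enable=true"
  else
    echo "value.converter.schemas.enable=false"
  fi
}

# Example usage:
#   suggest_schemas_enable '{"_id":"doc1","name":"example"}'
#   suggest_schemas_enable '{"schema":{"type":"struct"},"payload":{"_id":"doc1"}}'
```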

Converter Configuration: source connector

For the source connector:

  • Keys are produced as an org.apache.kafka.connect.data.Struct containing:
    • _id: the original Cloudant document ID
    • cloudant.db: the name of the Cloudant database the event originated from
    • cloudant.url: the URL of the Cloudant instance the event originated from.
  • Values are produced as a (schemaless) java.util.Map<String, Object>.
  • These types are compatible with the default org.apache.kafka.connect.json.JsonConverter and should be compatible with any other converter that can accept a Struct or Map.
  • The schemas.enable option may be safely used with a key.converter if desired.
  • The source connector does not generate schemas for the event values by default. To use schemas.enable with the value.converter, consider using a schema registry or the MapToStruct SMT.

Converter Configuration: sink connector

For the sink connector:

  • Kafka keys are currently ignored; therefore the key converter settings are not relevant.
  • We assume that the values in Kafka are serialized JSON objects, and therefore JsonConverter is supported. If your values contain a schema ({"schema": {...}, "payload": {...}}), then set value.converter.schemas.enable=true, otherwise set value.converter.schemas.enable=false. Any other converter that converts the message values into org.apache.kafka.connect.data.Struct or java.util.Map types should also work. However, the subsequent serialization of Map or Struct values to JSON documents in the sink may not match expectations if a schema has not been provided.
  • Only inserting a single revision of any _id is currently supported. This means the sink connector cannot update or delete documents.
  • The _rev field in event values is preserved. To remove _rev during data flow, use the ReplaceField SMT.
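
The _rev removal mentioned above could be configured along the following lines. This is a sketch under assumptions: the helper name append_drop_rev_transform and the transform alias dropRev are hypothetical, the exclude setting is the one used by recent Kafka releases for the built-in ReplaceField transform (older releases named it blacklist), and the target file is assumed not to define a transforms key already.

```shell
# Hypothetical helper: append a ReplaceField transform that drops _rev from
# event values to a sink connector properties file.
# $1 = path to the sink connector properties file
append_drop_rev_transform() {
  file="$1"
  # The quoted 'EOF' stops the shell from expanding $Value in the class name.
  cat >> "$file" <<'EOF'
transforms=dropRev
transforms.dropRev.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropRev.exclude=_rev
EOF
}

# Example usage:
#   append_drop_rev_transform "$KAFKA_HOME/config/connect-cloudant-sink.properties"
```

If the file already lists transforms, add dropRev to the existing comma-separated transforms value instead of appending a second transforms line.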

Note: The ID of each document written to Cloudant by the sink connector can be configured as follows:

  • From the value of the cloudant_doc_id header on the event, which will overwrite the _id field if it already exists. The Mapping Document IDs section of the SMT reference shows an example of how to use this header to set the ID based on the event key.
  • From the value of the _id field in the JSON.
  • If no other non-null or non-empty value is available, the document will be created with a new UUID.
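
Read as an order of precedence, the list above amounts to "header, then _id field, then a new UUID". The sketch below illustrates that reading only. It is not the connector's implementation, the function name resolve_doc_id is hypothetical, and treating the list as strictly ordered is an assumption.

```shell
# Hypothetical illustration of the document ID selection described above.
# $1 = value of the cloudant_doc_id header (may be empty)
# $2 = value of the _id field in the event value (may be empty)
resolve_doc_id() {
  header_id="$1"
  field_id="$2"
  if [ -n "$header_id" ]; then
    # The header wins and replaces any existing _id.
    printf '%s\n' "$header_id"
  elif [ -n "$field_id" ]; then
    printf '%s\n' "$field_id"
  else
    # Neither value is usable: fall back to a freshly generated UUID.
    if command -v uuidgen >/dev/null 2>&1; then
      uuidgen
    else
      cat /proc/sys/kernel/random/uuid
    fi
  fi
}

# Example usage:
#   resolve_doc_id "from-header" "from-field"
#   resolve_doc_id "" "from-field"
#   resolve_doc_id "" ""
```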

Single Message Transforms

A number of SMTs (Single Message Transforms) have been provided as part of the library to customize fields or values of events during data flow.

See the SMT reference for an overview of how to use these and Kafka built-in SMTs for common use cases.

Logging

INFO level logging to the console is configured by default. To change log levels or settings, edit

$KAFKA_HOME/config/connect-log4j.properties

and add log settings such as

log4j.logger.com.ibm.cloud.cloudant.kafka=DEBUG, stdout

