As of November 2023, Arcion has become a part of Databricks.
Confluent Platform

Destination Confluent Platform#

In the following steps, we refer to the extracted replicant-cli directory as the $REPLICANT_HOME directory.

Don’t have a Confluent Platform account? Get one here.

I. Set up connection configuration#

Specify your Confluent Platform connection details to Replicant with a connection configuration file. You can find a sample connection configuration file confluent.yaml in the $REPLICANT_HOME/conf/conn directory.

Specify your connection in the following manner:

```yaml
type: KAFKA

username: 'CLUSTER_API_KEY'
password: 'CLUSTER_API_SECRET'

auth-type: SASL

brokers:
  broker1:
    host: 'BOOTSTRAP_SERVER_NAME'
    port: 'BOOTSTRAP_SERVER_PORT'

is-cloud-instance: true

max-connections: 30
```

Replace the following:

  • CLUSTER_API_KEY: the resource-specific API key to access your Kafka cluster.
  • CLUSTER_API_SECRET: the secret associated with your API key.
  • BOOTSTRAP_SERVER_NAME: the hostname of your bootstrap server.
  • BOOTSTRAP_SERVER_PORT: the port number of your bootstrap server.

In the preceding configuration, max-connections specifies the maximum number of connections Replicant can open in Confluent Platform. You can change its value as needed.

In Arcion Cloud, fill in the connection details in the Connection form tab. The Connection form requires the same set of connection details as Arcion self-hosted CLI:

  • Enter a name for your connection in the Connection name field.
  • Enter the bootstrap hostname and port number in the Bootstrap Host and Port fields respectively.
  • Enter the resource-specific API key to access your Kafka cluster in the Key field.
  • Enter the secret associated with your Key in the Secret field.
  • Specify the maximum number of connections Replicant can open in Confluent Platform in the Max connections field. Defaults to 30.
  • Specify the duration in milliseconds Replicant waits before retrying a failed operation in the Retry wait durations in ms field. Defaults to 1000.
  • Specify the number of times Replicant retries a failed operation in the Max retries field. Defaults to 30.

II. Configure mapper file (optional)#

If you want to define data mapping from your source to Confluent Platform, specify the mapping rules in the mapper file. For more information on how to define the mapping rules and run Replicant CLI with the mapper file, see Mapper Configuration.

When mapping source object names to Kafka topics, you can choose between two delimiters for topic names. For more information, see Delimiter in Kafka topic and Redis stream names.
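As an illustration of what the delimiter choice affects, here is a hypothetical sketch of deriving a topic name from a source object name. The function and the specific delimiter values are assumptions for illustration only, not Arcion's actual implementation; see the linked page for the real behavior.

```python
# Hypothetical sketch: building a Kafka topic name from source object
# name parts with a configurable delimiter. Not Arcion's actual code.
def topic_name(catalog: str, table: str, delimiter: str = "_") -> str:
    """Join source object name parts into a Kafka topic name."""
    return delimiter.join([catalog, table])

# e.g. catalog io_blitzz, table customer:
assert topic_name("io_blitzz", "customer") == "io_blitzz_customer"
assert topic_name("io_blitzz", "customer", ".") == "io_blitzz.customer"
```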

III. Set up Applier configuration#

  1. From $REPLICANT_HOME, navigate to and open the sample Confluent Platform Applier configuration file:

    vi conf/dst/confluent.yaml
  2. The configuration file contains global Applier parameters, followed by parameters specific to snapshot and realtime modes:

    Global configuration parameters#

    Global configuration parameters live at the topmost level of the Applier configuration file, so you must specify them before any other section. They affect both snapshot and real-time replication.

    The following global Applier configuration parameters are available.

    replication-format#

    The structure of the published events.

    The following values are allowed:

    Parameters related to snapshot mode#

    For snapshot mode, the following Confluent Platform-specific parameters are available:

    replication-factor [v21.12.02.6]#

    Replication factor for data topics. For a Kafka cluster setup, this defines how many brokers each Kafka topic partition is replicated across. Replicant passes this value to Kafka, and Kafka drives the partition-level replication.

    num-shards [v21.12.02.6]#

    Number of partitions per data topic. By default, this is set to the number of Applier threads, which gives the best possible scaling by letting each individual Applier thread write to an independent partition of a Kafka topic.

    shard-key [v21.12.02.6]#

    Shard key to use for partitioning data topics.

    shard-function [v21.12.02.6]#

    Sharding function used to deduce the partition allotment based on shard-key for all data topics. Allowed values are MOD and NONE.

    Default: NONE, meaning Kafka uses its own partitioning algorithm.
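For an integer shard key, a MOD shard function amounts to taking the key value modulo the number of partitions, so each row lands deterministically in one partition. A minimal sketch of that idea (hypothetical code for illustration, not Arcion's implementation):

```python
# Hypothetical illustration of a MOD shard function: map a shard-key
# value onto one of num-shards partitions. Not Arcion's internal code.
def mod_partition(shard_key_value: int, num_shards: int) -> int:
    """Deduce the partition for a row from its shard-key value."""
    return shard_key_value % num_shards

# With num-shards: 16, rows spread across partitions 0-15 by key.
assert mod_partition(42, 16) == 10
```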

    kafka-compression-type [v20.05.12.3]#

    Compression type. Allowed values are lz4, snappy, gzip, and none.

    Default: lz4.

    kafka-batch-size-in-bytes [v20.05.12.3]#

    Batch size for the Kafka producer.

    Default: 100000.

    kafka-buffer-memory-size-in-bytes [v20.05.12.3]#

    Memory allocated to the Kafka client to store unsent messages.

    Default: 67108864.

    kafka-linger-ms [v20.05.12.3]#

    The time, in milliseconds, to allow Kafka batches to fill before sending.

    Default: 10.

    kafka-interceptor-classes [v21.09.17.2]#

    Specifies the list of interceptor classes. Corresponds to Kafka’s ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.

    producer-max-block-ms [v22.07.19.7]#

    Corresponds to the max.block.ms parameter of the Kafka producer.

    Default: 60_000.

    create-topic-timeout-ms [v22.07.19.7]#

    Specifies the timeout for topic creation.

    Default: 60_000.
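The kafka-* and producer parameters above pass through to standard Kafka producer settings. The Kafka property names below are the standard producer configuration keys; the mapping itself is inferred from the parameter descriptions and is an assumption, not a documented Arcion table:

```python
# Assumed correspondence between Replicant Applier parameters and the
# standard Kafka producer properties they configure (inferred mapping).
REPLICANT_TO_KAFKA = {
    "kafka-compression-type":            "compression.type",    # lz4, snappy, gzip, none
    "kafka-batch-size-in-bytes":         "batch.size",
    "kafka-buffer-memory-size-in-bytes": "buffer.memory",
    "kafka-linger-ms":                   "linger.ms",
    "kafka-interceptor-classes":         "interceptor.classes",
    "producer-max-block-ms":             "max.block.ms",
}

def to_producer_config(applier_params: dict) -> dict:
    """Translate known Applier parameters into Kafka producer properties."""
    return {REPLICANT_TO_KAFKA[k]: v
            for k, v in applier_params.items()
            if k in REPLICANT_TO_KAFKA}
```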

    per-table-config [v20.12.04.6]#

    This configuration allows you to specify various properties for target tables on a per-table basis:

    replication-factor [v21.12.02.6]
    Replication factor for data topics. For a Kafka cluster setup, this defines how many brokers each Kafka topic partition is replicated across. Replicant passes this value to Kafka, and Kafka drives the partition-level replication.

    num-shards [v21.12.02.6]
    Number of partitions per data topic. By default, this is set to the number of Applier threads, which gives the best possible scaling by letting each individual Applier thread write to an independent partition of a Kafka topic.

    shard-key [v21.12.02.6]
    Shard key to use for partitioning data topics.

    shard-function [v21.12.02.6]
    Sharding function used to deduce the partition allotment based on shard-key for all data topics. Allowed values are MOD and NONE. Default: NONE, meaning Kafka uses its own partitioning algorithm.

    Below is a sample config for snapshot mode:

    ```yaml
    snapshot:
      threads: 16
      txn-size-rows: 10000

      replication-factor: 1
      schema-dictionary: SCHEMA_DUMP  # Allowed values: POJO | SCHEMA_DUMP | NONE
      kafka-compression-type: lz4
      kafka-batch-size-in-bytes: 100000
      kafka-buffer-memory-size-in-bytes: 67108864
      kafka-linger-ms: 10
      skip-tables-on-failures: false
      kafka-interceptor-classes: ["KafkaInterceptors.SampleInterceptor"]
      producer-max-block-ms: 60_000
      create-topic-timeout-ms: 100_000
    ```

    Parameters related to realtime mode#

    If you want to operate in realtime mode, use a realtime section to specify your configuration. The following Kafka-specific parameters are available:

    split-topic#

    true or false.

    If true, Replicant creates separate topics for snapshot and CDC data. If false, a single topic contains both the snapshot and CDC data. split-topic is a global parameter for realtime mode, so you can’t change it on a per-table basis.

    Default: true.

    split-topic is applicable only when replication-format is set to JSON.

    replication-factor [v21.12.02.6]#

    Replication factor for CDC topics. For a Kafka cluster setup, this defines how many brokers each Kafka topic partition is replicated across. Replicant passes this value to Kafka, and Kafka drives the partition-level replication.

    num-shards [v21.12.02.6]#

    Number of partitions to create for all CDC log topics.

    shard-key [v21.12.02.6]#

    Shard key to use for partitioning CDC logs in all target topics.

    shard-function [v21.12.02.6]#

    Sharding function used to deduce the partition allotment based on shard-key for all CDC log topics. Allowed values are MOD and NONE.

    Default: NONE, meaning Kafka uses its own partitioning algorithm.

    kafka-compression-type [v20.05.12.3]#

    Compression type. Allowed values are lz4, snappy, gzip, and none.

    Default: lz4.

    kafka-batch-size-in-bytes [v20.05.12.3]#

    Batch size for the Kafka producer.

    Default: 100000.

    kafka-buffer-memory-size-in-bytes [v20.05.12.3]#

    Memory allocated to the Kafka client to store unsent messages.

    Default: 67108864.

    kafka-linger-ms [v20.05.12.3]#

    The time, in milliseconds, to allow Kafka batches to fill before sending.

    Default: 10.

    kafka-interceptor-classes [v21.09.17.2]#

    Specifies the list of interceptor classes. Corresponds to Kafka’s ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.

    producer-max-block-ms [v22.07.19.7]#

    Corresponds to the max.block.ms parameter of the Kafka producer.

    Default: 60_000.

    create-topic-timeout-ms [v22.07.19.7]#

    Specifies the timeout for topic creation.

    Default: 60_000.

    per-table-config [v20.12.04.6]#

    This configuration allows you to specify various properties for target tables on a per-table basis:

    replication-factor [v21.12.02.6]
    Replication factor for data topics. For a Kafka cluster setup, this defines how many brokers each Kafka topic partition is replicated across. Replicant passes this value to Kafka, and Kafka drives the partition-level replication.

    num-shards [v21.12.02.6]
    Number of partitions per data topic. By default, this is set to the number of Applier threads, which gives the best possible scaling by letting each individual Applier thread write to an independent partition of a Kafka topic.

    shard-key [v21.12.02.6]
    Shard key to use for partitioning data topics.

    shard-function [v21.12.02.6]
    Sharding function used to deduce the partition allotment based on shard-key for all data topics. Allowed values are MOD and NONE. Default: NONE, meaning Kafka uses its own partitioning algorithm.

    Below is a sample config for realtime mode:

    ```yaml
    realtime:
      txn-size-rows: 1000
      before-image-format: ALL  # Allowed values: KEY, ALL
      after-image-format: ALL   # Allowed values: UPDATED, ALL
      kafka-compression-type: lz4
      shard-key: id
      num-shards: 1
      shard-function: MOD  # Allowed values: MOD, NONE. NONE means storage will use its default sharding
      skip-tables-on-failures: false
      producer-max-block-ms: 60_000
      create-topic-timeout-ms: 100_000

      per-table-config:
      - tables:
          io_blitzz_nation:
            shard-key: id
            num-shards: 16  # default: 1
            shard-function: NONE
          io_blitzz_region:
            shard-key: id
          io_blitzz_customer:
            shard-key: custkey
            num-shards: 16
    ```

Attention:

  • During replication, Replicant stores metadata information related to replicated tables in a special topic with the prefix replicate_io_replication_schema. You can configure the replication factor and partitioning for this topic using the replication-factor and num-shards parameters respectively in the snapshot section of the Applier configuration file. You must set these parameters for the metadata topic in the snapshot section even if you’re operating in realtime mode: the metadata topic is common to the snapshot, realtime, and full modes of Replicant, so its settings are included in the snapshot section.

    For more information about how different Replicant modes work, see Running Replicant.

  • Replicant uses Kafka’s transactional API to write data in batches to Kafka. The transactional API ensures exactly-once delivery semantics.

  • Replicant doesn’t address realtime changes for views when replicating from the following databases to Kafka:

For a detailed explanation of configuration parameters in the Applier file, see Applier Reference.
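The exactly-once guarantee from Kafka's transactional API (mentioned in the notes above) comes from batches committing atomically: either every message in a batch becomes visible, or none do, so a retried batch never produces duplicates. A simplified Python model of that semantic, purely for illustration (this is not the Kafka client API):

```python
# Simplified model of transactional batch writes: a committed batch
# becomes visible atomically; an aborted batch leaves no partial writes.
class TransactionalTopic:
    def __init__(self):
        self.committed = []   # messages visible to consumers
        self.pending = []     # messages in the open transaction

    def begin(self):
        self.pending = []

    def produce(self, msg):
        self.pending.append(msg)

    def commit(self):
        # The whole batch becomes visible at once. Real Kafka combines
        # this atomicity with idempotent producers for exactly-once.
        self.committed.extend(self.pending)
        self.pending = []

    def abort(self):
        # A failed batch leaves nothing behind, so a retry is safe.
        self.pending = []

topic = TransactionalTopic()
topic.begin()
topic.produce("row-1")
topic.produce("row-2")
topic.abort()             # failure: nothing is visible
topic.begin()
topic.produce("row-1")
topic.produce("row-2")
topic.commit()            # retry: the batch commits exactly once
assert topic.committed == ["row-1", "row-2"]
```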

