✨ As of November 2023, Arcion has become a part of Databricks.

Destination Redis Streams#

This page describes how to load data in real time into Redis Streams, an append-only log data structure.

The following steps refer to the extracted Arcion self-hosted CLI download as the $REPLICANT_HOME directory.

I. Set up connection configuration#

Specify your Redis connection details to Replicant with a connection configuration file. You can find a sample connection configuration file redis_stream.yaml in the $REPLICANT_HOME/conf/conn directory.

For connecting to Redis, you can choose between two methods for an authenticated connection:

Connect with username and password#

To connect to Redis with basic username and password authentication, specify your credentials in the connection configuration file. Follow these instructions based on whether or not you have SSL encryption enabled for the Redis connection.

Without SSL encryption for the Redis connection, specify your configuration in the following manner:

type: REDIS_STREAM
host: HOSTNAME
port: PORT_NUMBER
username: 'USERNAME'
password: 'PASSWORD'
max-connections: 30
max-retries: 10

Replace the following:

  • HOSTNAME: the Redis server hostname.
  • PORT_NUMBER: the port number of the Redis host
  • USERNAME: the username to connect to the Redis server
  • PASSWORD: the password associated with USERNAME

In the preceding sample:

  • max-connections specifies the maximum number of connections Replicant can open in Redis.
  • max-retries specifies the number of times any failed operation on the system will be re-attempted.

Feel free to change these two values as you need.
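If you generate connection files from a script, the layout in the preceding sample can be produced with a few lines of Python. This is a minimal sketch and not part of Replicant: the function name render_connection_config is hypothetical, and the sketch assumes only the key names and default values shown in the sample.

```python
def render_connection_config(host, port, username, password,
                             max_connections=30, max_retries=10):
    """Return the text of a username/password connection file (no SSL)."""

    def quote(value):
        # A single-quoted YAML scalar escapes a single quote by doubling it.
        return "'" + str(value).replace("'", "''") + "'"

    lines = [
        "type: REDIS_STREAM",
        f"host: {host}",
        f"port: {int(port)}",
        f"username: {quote(username)}",
        f"password: {quote(password)}",
        f"max-connections: {int(max_connections)}",
        f"max-retries: {int(max_retries)}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Placeholder values only; substitute your own connection details.
    print(render_connection_config("localhost", 6379, "replicant", "s3cret"), end="")
```

Writing the returned text to a file in $REPLICANT_HOME/conf/conn is left to the caller, so the credentials never have to be hard-coded in the script.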

You can enable data encryption for the Redis connection using SSL. In that case, you need to specify the TrustStore holding the CA certificate along with the username and password. For example:

type: REDIS_STREAM
host: HOSTNAME
port: PORT_NUMBER
username: 'USERNAME'
password: 'PASSWORD'
ssl:
  enable: true
  trust-store:
    path: "PATH_TO_TRUSTSTORE"
    password: "TRUSTSTORE_PASSWORD"
    ssl-store-type: 'TRUSTSTORE_TYPE'
max-connections: 30
max-retries: 10

Replace the following:

  • HOSTNAME: the Redis server hostname.
  • PORT_NUMBER: the port number of the Redis host
  • USERNAME: the username to connect to the Redis server
  • PASSWORD: the password associated with USERNAME
  • PATH_TO_TRUSTSTORE: path to the TrustStore
  • TRUSTSTORE_PASSWORD: the TrustStore password
  • TRUSTSTORE_TYPE: the TrustStore type, for example PKCS12

In the preceding sample:

  • max-connections specifies the maximum number of connections Replicant can open in Redis.
  • max-retries specifies the number of times any failed operation on the system will be re-attempted.

Feel free to change these two values as you need.

Connect using SSL#

If you prefer both client authentication and data encryption using SSL, specify both TrustStore and KeyStore details in the connection configuration file. For example:

type: REDIS_STREAM
host: HOSTNAME
port: PORT_NUMBER
ssl:
  enable: true
  trust-store:
    path: "PATH_TO_TRUSTSTORE"
    password: "TRUSTSTORE_PASSWORD"
    ssl-store-type: 'TRUSTSTORE_TYPE'
  key-store:
    path: "PATH_TO_KEYSTORE"
    password: "KEYSTORE_PASSWORD"
    ssl-store-type: 'KEYSTORE_TYPE'
max-connections: 30
max-retries: 10

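Replace the following:

  • HOSTNAME: the Redis server hostname.
  • PORT_NUMBER: the port number of the Redis host
  • PATH_TO_TRUSTSTORE: path to the TrustStore
  • TRUSTSTORE_PASSWORD: the TrustStore password
  • TRUSTSTORE_TYPE: the TrustStore type, for example PKCS12
  • PATH_TO_KEYSTORE: path to the KeyStore
  • KEYSTORE_PASSWORD: the KeyStore password
  • KEYSTORE_TYPE: the KeyStore type, for example PKCS12

In the preceding sample:

  • max-connections specifies the maximum number of connections Replicant can open in Redis.
  • max-retries specifies the number of times any failed operation on the system will be re-attempted.

Feel free to change these two values as you need.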
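The three connection layouts in this section differ only in which keys are present. As a rough illustration, the following sketch classifies an already-parsed configuration (a plain Python dict) by those keys. The function name describe_connection and the returned labels are hypothetical, and Replicant's own validation may behave differently.

```python
def describe_connection(config):
    """Classify a parsed connection configuration given as a plain dict."""
    ssl = config.get("ssl") or {}
    ssl_enabled = ssl.get("enable") is True
    has_credentials = "username" in config and "password" in config

    # Both SSL samples in this section include a TrustStore.
    if ssl_enabled and "trust-store" not in ssl:
        raise ValueError("ssl.enable is true but ssl.trust-store is missing")
    # A KeyStore next to the TrustStore means client authentication over SSL.
    if ssl_enabled and "key-store" in ssl:
        return "SSL client authentication with encryption"
    if has_credentials:
        if ssl_enabled:
            return "username and password over SSL"
        return "username and password without SSL"
    raise ValueError("no username/password and no key-store configured")


if __name__ == "__main__":
    sample = {
        "type": "REDIS_STREAM",
        "host": "localhost",
        "port": 6379,
        "ssl": {
            "enable": True,
            "trust-store": {"path": "ts.p12", "password": "x", "ssl-store-type": "PKCS12"},
            "key-store": {"path": "ks.p12", "password": "y", "ssl-store-type": "PKCS12"},
        },
    }
    print(describe_connection(sample))
```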

II. Configure mapper file (optional)#

If you want to define data mapping from your source to Redis Streams, specify the mapping rules in the mapper file. For more information on how to define the mapping rules and run Replicant CLI with the mapper file, see Mapper Configuration.

When mapping source object names to Redis streams, you can choose between two delimiters for stream names. For more information, see Delimiter in Kafka topic and Redis stream names.

III. Set up Applier configuration#

To configure replication mode according to your requirements, specify your configuration in the Applier configuration file. You can find a sample Applier configuration file redis_stream.yaml in the $REPLICANT_HOME/conf/dst directory. For example:

snapshot:
  threads: 16
realtime:
  threads: 16
  split-stream: false

For more information on running Replicant in different modes, see Running Replicant.

You can configure Redis Streams to operate in either snapshot or realtime mode.

Configure snapshot mode#

To operate in snapshot mode, specify your configuration under the snapshot section of the configuration file. For example:

snapshot:
  threads: 32
  batch-size-rows: 10_000
  txn-size-rows: 10_000

Additional snapshot parameters#

log-row-level-errors#

true or false.

During snapshot replication, if a given batch fails, Replicant retries the failed rows. Set this parameter to true if you want to log the failed rows in the trace.log file.

For more information about the Applier parameters for snapshot mode, see Snapshot Mode.

Configure realtime mode#

To operate in realtime mode, specify your configuration under the realtime section of the configuration file. For example:

realtime:
  threads: 16
  replay-consistency: EVENTUAL
  txn-size-rows: 10_000
  batch-size-rows: 10_000

Additional realtime parameters#

split-stream#

true or false.

If true, Replicant creates separate streams for snapshot and CDC data. If false, a single stream contains the data for both snapshot and CDC. split-stream is a global parameter for realtime mode, so you can’t change it on a per-table basis.

Default: true.

For more information about the configuration parameters for realtime mode, see Realtime Mode.

Design considerations#

Supported platforms#

Arcion Replicant supports the following sources for Redis Streams as target:

For MySQL, you can also enable Global Transaction ID (GTID) based logging and enforce GTID consistency if Redis messages require them. To do so, add the following to your MySQL option file my.cnf:

gtid_mode=ON
enforce-gtid-consistency=ON

Failures and rollbacks#

A Redis stream acts like an append-only log in which each entry has an ID, and Redis allows deleting an entry by its ID. However, Redis transactions do not support rollback, so if some rows in a batch fail, the entire transaction is not rolled back. Due to this behavior, Replicant follows this strategy:

Replicant’s behavior after reaching max-retries#

After reaching the maximum number of re-attempts specified in max-retries, Replicant’s behavior depends on the replication mode and the type of transactional consistency.

During snapshot phase

The skip-tables-on-failures Applier configuration parameter defaults to true. Therefore, Replicant excludes the table from the replication rather than stopping the Replicant process by throwing an exception. This behavior prevents the rest of the tables from going into an inconsistent state. Using the dynamic reinitialization feature, you can add back the tables Replicant excluded from replication.

You can also disable skip-tables-on-failures. In that case, Replicant throws an exception and performs snapshot recovery when you resume replication with the --resume option.

During realtime phase with eventual replay consistency

The skip-tables-on-failures Applier configuration parameter defaults to true. Therefore, Replicant excludes the table from the replication rather than stopping the Replicant process by throwing an exception. This behavior prevents the rest of the tables from going into an inconsistent state. Using the dynamic reinitialization feature, you can add back the tables Replicant excluded from replication.

You can also disable skip-tables-on-failures. In that case, Replicant throws an exception and performs real-time recovery when you resume replication with the --resume option.

During realtime phase with global replay consistency

Replicant dumps the Stream entry IDs for the messages it couldn’t delete programmatically in the following file:

$REPLICANT_HOME/data/<replication_id>/bad_rows/replicate_io_indoubt_txn_log

You need to clean up those entries manually and resume the replication run. You can use the following command to clean up the entries:

redis-cli XDEL STREAM_NAME STREAM_ENTRY_ID_FROM_FILE [STREAM_ENTRY_ID_FROM_FILE ...]

Replace STREAM_NAME and STREAM_ENTRY_ID_FROM_FILE with the corresponding stream name and entry IDs. XDEL accepts several entry IDs in one call, separated by spaces.
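When many entries need cleaning up, assembling the XDEL command by hand is error-prone. The following is a minimal sketch that builds the argument list for one stream. The layout of the replicate_io_indoubt_txn_log file is not described on this page, so the sketch assumes you have already extracted the entry IDs into a list. The function name build_xdel_command and the stream name in the demo are hypothetical.

```python
import re
import shlex

# Redis stream entry IDs have the form <milliseconds>-<sequence>.
ENTRY_ID = re.compile(r"\d+-\d+")


def build_xdel_command(stream_name, entry_ids):
    """Return the redis-cli argument list that deletes the given entries."""
    ids = [entry_id.strip() for entry_id in entry_ids if entry_id.strip()]
    if not ids:
        raise ValueError("no entry IDs given")
    for entry_id in ids:
        if not ENTRY_ID.fullmatch(entry_id):
            raise ValueError(f"not a stream entry ID: {entry_id!r}")
    # XDEL takes the stream key followed by one or more space-separated IDs.
    return ["redis-cli", "XDEL", stream_name, *ids]


if __name__ == "__main__":
    command = build_xdel_command("tpch.region", ["1681129889000-0", "1681129889000-1"])
    # Print the command for review instead of running it.
    print(shlex.join(command))
```

Printing the command rather than executing it keeps the cleanup a deliberate manual step; pass the list to subprocess.run only after you have checked it.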

Transactional consistency in realtime mode#

Realtime mode supports the following two consistency modes.

GLOBAL
Replicant carries out real-time replication with global transactional consistency. A single stream holds CDC logs in transaction order.
EVENTUAL
Replicant carries out real-time replication with eventual consistency. Replicant also carries out replay per table and a stream object exists for each table.

Set the replay-consistency parameter to the mode you want under the realtime section of the Applier configuration file.
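The combination of replication phase, consistency mode, and skip-tables-on-failures determines what happens once max-retries is exhausted, as described under "Replicant’s behavior after reaching max-retries". The following sketch restates those cases as a lookup function. It is only a reading aid: the function name after_max_retries and the wording of the returned strings are hypothetical and do not correspond to any Replicant API.

```python
def after_max_retries(phase, replay_consistency=None, skip_tables_on_failures=True):
    """Summarize the documented outcome once max-retries is exhausted."""
    if phase not in ("snapshot", "realtime"):
        raise ValueError(f"unknown phase: {phase!r}")
    if phase == "realtime":
        if replay_consistency not in ("EVENTUAL", "GLOBAL"):
            raise ValueError("realtime needs replay_consistency EVENTUAL or GLOBAL")
        if replay_consistency == "GLOBAL":
            # Entry IDs that could not be deleted go to the in-doubt log file.
            return "log undeleted entry IDs; clean up manually, then resume"
    # Snapshot phase and realtime phase with EVENTUAL consistency behave alike.
    if skip_tables_on_failures:
        return "exclude the failing table; re-add it with dynamic reinitialization"
    recovery = "snapshot" if phase == "snapshot" else "real-time"
    return f"throw an exception; {recovery} recovery runs on --resume"


if __name__ == "__main__":
    print(after_max_retries("snapshot"))
    print(after_max_retries("realtime", "EVENTUAL", skip_tables_on_failures=False))
    print(after_max_retries("realtime", "GLOBAL"))
```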

DML message structure#

  1. Each message contains a key and a value. The key uniquely identifies the change.
  2. Each message contains a schema and a payload. The payload follows the schema definition.
  3. Replicant uses the primary key, unique key, or row identifier key column to form the key structure. In the absence of a primary key, unique key, or row identifier key column, Replicant uses the "default" string for the key.
  4. Whenever a column that uniquely identifies a record is updated, Replicant generates delete and insert events instead of an update event. The delete event deletes the existing record and the insert event inserts a new record.
  5. For each delete operation, Replicant generates a tombstone event. Replicant assigns the event the same key as the previous delete operation and sets the value to "default".
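Rules 3 to 5 in the preceding list can be sketched as a small function that expands one source row change into the events it produces. This is an illustration under stated assumptions, not Replicant's implementation: the name expand_change and the (operation, key, value) tuples are hypothetical, values are shown as plain rows rather than the full schema and payload envelope, and the sketch emits a tombstone only for source deletes because the list does not say whether the delete generated by a key update also gets one.

```python
def expand_change(op, key_columns, before=None, after=None):
    """Return the (operation, key, value) events for one source row change."""

    def key_of(row):
        # Rule 3: without any identifying column, the key is "default".
        if not key_columns:
            return "default"
        return {column: row[column] for column in key_columns}

    if op == "insert":
        return [("insert", key_of(after), after)]
    if op == "delete":
        old_key = key_of(before)
        # Rule 5: a tombstone with the same key and the value "default".
        return [("delete", old_key, before), ("tombstone", old_key, "default")]
    if op == "update":
        old_key, new_key = key_of(before), key_of(after)
        if old_key != new_key:
            # Rule 4: a key change becomes a delete followed by an insert.
            return [("delete", old_key, before), ("insert", new_key, after)]
        return [("update", new_key, after)]
    raise ValueError(f"unknown operation: {op!r}")


if __name__ == "__main__":
    events = expand_change(
        "update",
        ["r_regionkey"],
        before={"r_regionkey": 10, "r_name": "Test_nation"},
        after={"r_regionkey": 11, "r_name": "Test_nation"},
    )
    for event in events:
        print(event)
```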

The following sample shows the key and value structure.

Key structure#

{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "REDIS_STREAM_Connector.tpch.region.Key",
    "fields": [
      {"type": "int32", "optional": false, "field": "r_regionkey"}
    ]
  },
  "payload": {
    "r_regionkey": 10
  }
}

Value structure#

{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "REDIS_STREAM_Connector.tpch.region.Envelope",
    "fields": [
      {
        "type": "struct",
        "optional": true,
        "field": "before",
        "name": "REDIS_STREAM_Connector.tpch.region.Value",
        "fields": [
          {"type": "int32", "optional": false, "field": "r_regionkey"},
          {"type": "string", "optional": false, "field": "r_name"},
          {"type": "string", "optional": true, "field": "r_comment"}
        ]
      },
      {
        "type": "struct",
        "optional": true,
        "field": "after",
        "name": "REDIS_STREAM_Connector.tpch.region.Value",
        "fields": [
          {"type": "int32", "optional": false, "field": "r_regionkey"},
          {"type": "string", "optional": false, "field": "r_name"},
          {"type": "string", "optional": true, "field": "r_comment"}
        ]
      },
      {
        "type": "struct",
        "optional": false,
        "field": "source",
        "name": "REDIS_STREAM_Connector",
        "fields": [
          {"type": "string", "optional": false, "field": "version"},
          {"type": "string", "optional": false, "field": "connector"},
          {"type": "string", "optional": false, "field": "name"},
          {"type": "int64", "optional": true, "field": "ts_ms"},
          {"type": "string", "optional": true, "field": "db"},
          {"type": "string", "optional": true, "field": "schema"},
          {"type": "string", "optional": false, "field": "table"},
          {"type": "string", "optional": true, "field": "query"},
          {"type": "string", "optional": true, "field": "snapshot"},
          {"type": "int64", "optional": true, "field": "server_id"},
          {"type": "string", "optional": true, "field": "gtid"},
          {"type": "string", "optional": true, "field": "file"},
          {"type": "int64", "optional": true, "field": "pos"},
          {"type": "int32", "optional": true, "field": "row"}
        ]
      },
      {"type": "string", "optional": false, "field": "op"},
      {"type": "int64", "optional": true, "field": "ts_ms"}
    ]
  },
  "payload": {
    "before": {
      "r_regionkey": 10,
      "r_name": "Test_nation",
      "r_comment": "ReplicationWorks"
    },
    "after": {
      "r_regionkey": 10,
      "r_name": "Test_nation",
      "r_comment": "TestReplication"
    },
    "source": {
      "version": "5.7.41",
      "connector": "MYSQL",
      "name": "REDIS_STREAM_Connector",
      "ts_ms": 1681129889000,
      "db": "tpch",
      "schema": null,
      "table": "region",
      "snapshot": "false",
      "server_id": "1",
      "gtid": null,
      "file": "mysql-log.000019",
      "pos": 4436,
      "row": 5,
      "thread": 69,
      "query": "UPDATE tpch.region SET r_regionkey=10 AND r_name=Test_nation AND r_comment=TestReplication WHERE r_regionkey=10 AND r_name=Test_nation AND r_comment=ReplicationWorks"
    },
    "op": "u",
    "ts_ms": 1681110090233
  }
}
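A consumer usually only needs the payload part of a value like the sample above. The following minimal sketch compares the before and after images and reports the columns that changed. It assumes the value is available as JSON text or as an already-parsed dict; how the value is laid out inside a Redis stream entry is not covered on this page, and the function name changed_columns is hypothetical.

```python
import json


def changed_columns(message_value):
    """Map each changed column to its (before, after) pair."""
    if isinstance(message_value, (str, bytes)):
        envelope = json.loads(message_value)
    else:
        envelope = message_value
    payload = envelope["payload"]
    # Either image may be null, for example on inserts or deletes.
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    changes = {}
    for column in sorted(set(before) | set(after)):
        if before.get(column) != after.get(column):
            changes[column] = (before.get(column), after.get(column))
    return changes


if __name__ == "__main__":
    # Trimmed copy of the sample payload; the schema part is omitted.
    value = json.dumps({
        "payload": {
            "before": {"r_regionkey": 10, "r_name": "Test_nation", "r_comment": "ReplicationWorks"},
            "after": {"r_regionkey": 10, "r_name": "Test_nation", "r_comment": "TestReplication"},
            "op": "u",
        }
    })
    print(changed_columns(value))
    # prints {'r_comment': ('ReplicationWorks', 'TestReplication')}
```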
