Source Amazon RDS for PostgreSQL#
This page describes how to replicate data in real time from Amazon RDS for PostgreSQL, a managed service for PostgreSQL relational databases.
The following steps refer to the extracted Arcion self-hosted CLI download as the $REPLICANT_HOME directory.
Prerequisites#
I. Set up parameter group#
Create a database parameter group if you haven’t already specified a parameter group for your database instance.
Set the rds.logical_replication parameter to 1 and attach the parameter group to your database instance. You must reboot your database instance for this change to take effect. After rebooting your database instance, the system automatically sets the wal_level parameter to logical.
You can verify the values for wal_level and rds.logical_replication with the following command from the psql client:
    postgres=> SELECT name, setting FROM pg_settings WHERE name IN ('wal_level', 'rds.logical_replication');
The output is similar to the following:
              name           | setting
    -------------------------+---------
     rds.logical_replication | on
     wal_level               | logical
    (2 rows)
In the parameter group, make sure max_replication_slots is at least 1 and is equal to or greater than the number of replication jobs that you need to run from this RDS for PostgreSQL instance.
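The checks above can be sketched as a small Python helper. This is only an illustration: the function name and the dictionary input are assumptions, not part of Replicant. It expects the name and setting pairs returned by a pg_settings query, with max_replication_slots added to the list of names queried.

```python
def check_replication_settings(settings, replication_jobs):
    """Return a list of problems found in pg_settings values (empty if none).

    `settings` maps parameter names to the string values reported by
    pg_settings. This helper is hypothetical and only mirrors the
    prerequisites described in the text.
    """
    problems = []
    if settings.get("rds.logical_replication") != "on":
        problems.append("rds.logical_replication is not on")
    if settings.get("wal_level") != "logical":
        problems.append("wal_level is not logical")
    slots = int(settings.get("max_replication_slots", "0"))
    required = max(1, replication_jobs)  # at least 1, and one per job
    if slots < required:
        problems.append(
            f"max_replication_slots is {slots}, need at least {required}"
        )
    return problems


if __name__ == "__main__":
    current = {
        "rds.logical_replication": "on",
        "wal_level": "logical",
        "max_replication_slots": "10",
    }
    print(check_replication_settings(current, replication_jobs=2))
```

An empty list means the instance meets the prerequisites listed in this section.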
II. Create user#
Create a user for replication in the source RDS for PostgreSQL database instance. For example, the following creates a user alex:
    postgres=> CREATE ROLE alex LOGIN PASSWORD 'alex12345';
For more information about creating users, see Understanding PostgreSQL roles and permissions.
Grant the necessary permissions:
    postgres=> GRANT USAGE ON SCHEMA "arcion" TO alex;
    postgres=> GRANT SELECT ON ALL TABLES IN SCHEMA "arcion" TO alex;
    postgres=> ALTER ROLE alex WITH REPLICATION;
The preceding commands grant the necessary permissions to user alex for the schema arcion.
III. Create logical replication slot#
Log in to the PostgreSQL catalog or database with a privileged account that you want to perform replication with.
Create a logical replication slot in this catalog or database using the wal2json decoding plugin:
    SELECT 'init' FROM pg_create_logical_replication_slot('arcion_test', 'wal2json');
The preceding command creates a replication slot with the name arcion_test. The wal2json plugin is available as an extension in RDS for PostgreSQL.
Verify that you’ve successfully created a replication slot:
    postgres=> SELECT * FROM pg_replication_slots;
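If you create slots for several replication jobs, it can help to generate the statement from a validated name. The following is a minimal sketch, not part of Replicant: create_slot_sql is a hypothetical helper. The slot name pattern reflects PostgreSQL's rule that slot names may contain only lowercase letters, digits, and underscores (up to 63 characters). The plugin name check is a conservative assumption of this sketch.

```python
import re

# PostgreSQL replication slot names: lowercase letters, digits, underscore.
SLOT_NAME = re.compile(r"[a-z0-9_]{1,63}")
# Conservative check chosen for this sketch, not a PostgreSQL rule.
PLUGIN_NAME = re.compile(r"[A-Za-z0-9_]+")


def create_slot_sql(slot_name, plugin="wal2json"):
    """Build the slot creation statement shown above for a validated name."""
    if not SLOT_NAME.fullmatch(slot_name):
        raise ValueError(f"invalid replication slot name: {slot_name!r}")
    if not PLUGIN_NAME.fullmatch(plugin):
        raise ValueError(f"invalid plugin name: {plugin!r}")
    return (
        "SELECT 'init' FROM "
        f"pg_create_logical_replication_slot('{slot_name}', '{plugin}');"
    )


if __name__ == "__main__":
    print(create_slot_sql("arcion_test"))
```

Validating before interpolating keeps quotes and other unexpected characters out of the generated SQL.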
Set up connection configuration#
To connect to your RDS for PostgreSQL instance using basic username and password authentication, you have the following two options:
Specify your credentials in a plain text YAML connection configuration file:
    type: POSTGRESQL
    host: HOSTNAME
    port: PORT_NUMBER
    database: "DATABASE_NAME"
    username: "USERNAME"
    password: "PASSWORD"
    max-connections: 30
    socket-timeout-s: 60
    max-retries: 10
    retry-wait-duration-ms: 1000
    #Add your replication slot (slot which holds the real-time changes of the source database) as follows:
    replication-slots:
      arcion_test:
        - wal2json
    log-reader-type: {STREAM|SQL}
Replace the following:
- HOSTNAME: hostname of the RDS for PostgreSQL instance
- PORT_NUMBER: port number of the RDS for PostgreSQL host
- DATABASE_NAME: the database name
- USERNAME: the username credential to log into your RDS for PostgreSQL instance
- PASSWORD: the password associated with USERNAME
Feel free to change the following parameter values as you need:
- max-connections: the maximum number of connections Replicant opens in the RDS for PostgreSQL database.
- max-retries: number of times Replicant retries a failed operation.
- retry-wait-duration-ms: duration in milliseconds Replicant waits between each retry of a failed operation.
- socket-timeout-s: the timeout value in seconds for socket read operations. A value of 0 disables the timeout. This parameter is only available from version 22.02.12.16.
Important: Make sure that the value of max_connections in your RDS for PostgreSQL instance exceeds the value of max-connections in the preceding connection configuration file. For more information, see Maximum number of database connections in Amazon RDS.
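The comparison between the two connection limits can be scripted. The sketch below is an assumption-laden illustration: both function names are hypothetical, the parser only understands plain top-level key: value lines without trailing comments, and you supply the server-side max_connections value yourself (for example from SHOW max_connections in psql).

```python
def top_level_value(config_text, key):
    """Return the raw value of a top-level `key: value` line, or None."""
    for line in config_text.splitlines():
        # Skip nested (indented) lines and comment lines.
        if line.startswith((" ", "\t", "#")):
            continue
        name, sep, value = line.partition(":")
        if sep and name.strip() == key:
            return value.strip()
    return None


def server_limit_exceeds_config(config_text, server_max_connections):
    """True if the server's max_connections exceeds Replicant's max-connections."""
    value = top_level_value(config_text, "max-connections")
    if value is None:
        raise KeyError("max-connections not found in connection configuration")
    return server_max_connections > int(value)


if __name__ == "__main__":
    sample = "type: POSTGRESQL\nmax-connections: 30\nmax-retries: 10\n"
    print(server_limit_exceeds_config(sample, server_max_connections=100))
```

For real configuration files, a full YAML parser is a more robust choice than this line-based reader.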
Replication slot#
The replication slot holds the real-time changes of the source database. The preceding sample specifies a replication slot in the following format:
    replication-slots:
      SLOT_NAME:
        - PLUGIN_NAME
Replace the following:
- SLOT_NAME: the replication slot name
- PLUGIN_NAME: the plugin you’ve used to create the replication slot. In this case, it’s wal2json.
Currently only one slot can be specified.
Log reader type#
Caution: From version 23.03.31 onward, log-reader-type is deprecated. Avoid specifying this parameter.
From version 23.03.01.12 onward, the value of log-reader-type defaults to STREAM. If you choose STREAM, Replicant captures CDC data through PgReplicationStream. If you choose SQL, the RDS for PostgreSQL server periodically receives SQL statements from Replicant for CDC data extraction.
Enable connection by username for STREAM log reader#
If you use STREAM as the log-reader-type, you must allow an authenticated replication connection as the USERNAME who performs the replication. To do so, modify the pg_hba.conf file with the following entries, depending on the use case:
Locate and open the pg_hba.conf file. You can find the default pg_hba.conf file inside the data directory initialized by initdb.
Make the following changes:
    # TYPE  DATABASE     USER      ADDRESS        METHOD
    # allow local replication connection to USERNAME (IPv4 + IPv6)
    local   replication  USERNAME                 trust
    host    replication  USERNAME  127.0.0.1/32   <auth-method>
    host    replication  USERNAME  ::1/128        <auth-method>
    # allow remote replication connection from any client machine to USERNAME (IPv4 + IPv6)
    host    replication  USERNAME  0.0.0.0/0      trust
    host    replication  USERNAME  ::0/0          trust
Replace USERNAME with the RDS for PostgreSQL database username that you want to authenticate for replication.
Set up filter configuration (optional)#
If you want to filter data from your source RDS for PostgreSQL database, specify the filter rules in the filter file. For more information on how to define the filter rules and run Replicant CLI with the filter file, see Filter configuration.
For example:
    allow:
      catalog: "postgres"
      schema: "public"
      types: [TABLE]
      allow:
        CUSTOMERS:
          allow: ["FB", "IG"]
        ORDERS:
          allow: ["product", "service"]
          conditions: "o_orderkey < 5000"
        RETURNS:
The preceding sample consists of the following elements:
- Data of object type TABLE in the catalog postgres and the schema public goes through replication.
- From catalog postgres, only the CUSTOMERS, ORDERS, and RETURNS tables go through replication.
- From the CUSTOMERS table, only the FB and IG columns go through replication.
- From the ORDERS table, only the product and service columns go through replication, and only for rows that meet the condition in conditions.
- Since the RETURNS table doesn’t specify anything, the entire table goes through replication.
Unless you specify tables explicitly, Replicant replicates all tables in the catalog.
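The column-selection rules described above can be modeled in a few lines of Python. This is a sketch of the documented semantics only, not Replicant's implementation: the function name and the dictionary representation of the filter are assumptions, and row conditions are not evaluated.

```python
# Dictionary form of the sample filter (an assumed representation).
FILTER = {
    "catalog": "postgres",
    "schema": "public",
    "types": ["TABLE"],
    "allow": {
        "CUSTOMERS": {"allow": ["FB", "IG"]},
        "ORDERS": {
            "allow": ["product", "service"],
            "conditions": "o_orderkey < 5000",
        },
        "RETURNS": None,  # nothing specified: the entire table replicates
    },
}


def replicated_columns(filter_rules, table, table_columns):
    """Return the columns that replicate, or None if the table is filtered out."""
    tables = filter_rules.get("allow")
    if not tables:
        # No table list at all: every table replicates in full.
        return list(table_columns)
    if table not in tables:
        return None
    rule = tables[table] or {}
    allowed = rule.get("allow")
    if not allowed:
        return list(table_columns)
    return [column for column in table_columns if column in allowed]


if __name__ == "__main__":
    print(replicated_columns(FILTER, "CUSTOMERS", ["ID", "FB", "IG"]))
    print(replicated_columns(FILTER, "RETURNS", ["ID", "REASON"]))
    print(replicated_columns(FILTER, "LINEITEM", ["ID"]))
```

The three calls cover a table with a column list, a table with no rules, and a table that the filter doesn't list.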
The following illustrates the format you must follow:
    allow:
      catalog: <your_catalog_name>
      types: <your_object_type>
      allow:
        <your_table_name>:
          allow: ["your_column_name"]
          conditions: "your_condition"
        <your_table_name>:
          allow: ["your_column_name"]
          conditions: "your_condition"
        <your_table_name>:
Set up Extractor configuration#
To configure replication according to your requirements, specify your configuration in the Extractor configuration file.
You can configure the following replication modes by specifying the parameters under their respective sections in the configuration file:
- snapshot
- realtime
- delta-snapshot
See the following sections for more information.
For more information about different Replicant modes, see Running Replicant.
Configure snapshot replication#
The following is a sample configuration for operating in snapshot mode:
    snapshot:
      threads: 16
      fetch-size-rows: 5_000
      _traceDBTasks: true
      min-job-size-rows: 1_000_000
      max-jobs-per-chunk: 32
      per-table-config:
      - catalog: tpch
        schema: public
        tables:
          lineitem:
            row-identifier-key: [l_orderkey, l_linenumber]
            split-key: l_orderkey
            split-hints:
              row-count-estimate: 15000
              split-key-min-value: 1
              split-key-max-value: 60_00
For more information about the configuration parameters for snapshot mode, see Snapshot Mode.
Configure real-time replication#
For real-time replication, you must create a heartbeat table in the source RDS for PostgreSQL database.
Create a heartbeat table in any schema of the database you are going to replicate with the following DDL:
    CREATE TABLE "<user_database>"."public"."replicate_io_cdc_heartbeat"(
      "timestamp" INT8 NOT NULL,
      PRIMARY KEY("timestamp"));
Grant INSERT, UPDATE, and DELETE privileges to the user configured for replication.
Specify your configuration under the realtime section of the Extractor configuration file. For example:
    realtime:
      threads: 4
      fetch-size-rows: 10000
      fetch-duration-per-extractor-slot-s: 3
      _traceDBTasks: true
      heartbeat:
        enable: true
        catalog: "postgres"
        schema: "public"
        table-name: replicate_io_cdc_heartbeat
        column-name: timestamp
      start-position:
        start-lsn: 0/3261270
For more information about the configuration parameters for realtime mode, see Realtime Mode.
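The start-lsn value in the sample uses PostgreSQL's textual LSN form: two hexadecimal numbers of up to eight digits each, separated by a slash, which together encode a 64-bit position in the write-ahead log. The following sketch, with a hypothetical parse_lsn helper that is not part of Replicant, converts that form to an integer so that two positions can be compared.

```python
import re

# Two hexadecimal numbers of up to 8 digits each, separated by a slash.
LSN_PATTERN = re.compile(r"([0-9A-Fa-f]{1,8})/([0-9A-Fa-f]{1,8})")


def parse_lsn(text):
    """Convert a textual LSN such as '0/3261270' to its 64-bit integer value."""
    match = LSN_PATTERN.fullmatch(text.strip())
    if match is None:
        raise ValueError(f"not a valid LSN: {text!r}")
    high, low = (int(part, 16) for part in match.groups())
    # The first number is the upper 32 bits, the second the lower 32 bits.
    return (high << 32) | low


if __name__ == "__main__":
    start = parse_lsn("0/3261270")
    later = parse_lsn("1/0")
    print(start < later)
```

Comparing the integers is a quick way to check that a chosen start position is not ahead of a position you read from the server.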
Support for DDL replication#
Replicant supports DDL replication for real-time RDS for PostgreSQL source. For more information, contact us.
Replication without replication-slots#
If you can’t create replication slots in RDS for PostgreSQL using wal2json, then you can use a third mode of replication called delta snapshot. In delta snapshot, Replicant uses RDS for PostgreSQL’s internal column to identify changes.
Caution: We strongly recommend that you specify a row-identifier-key in the per-table-config section for a table that does not have a primary key or a unique key defined.
You can specify your configuration under the delta-snapshot section of the Extractor configuration file. For example:
    delta-snapshot:
      row-identifier-key: [orderkey, suppkey]
      update-key: [partkey]
      replicate-deletes: true|false
      per-table-config:
      - catalog: tpch
        schema: public
        tables:
          lineitem1:
            row-identifier-key: [l_orderkey, l_linenumber]
            split-key: l_orderkey
            replicate-deletes: false
For more information about the configuration parameters for delta-snapshot mode, see Delta-snapshot Mode.