H.3. wal2json — convert WAL changes into JSON via logical decoding

The wal2json module is a Postgres Pro extension for logical decoding that converts database changes from the write-ahead log (WAL) into JSON format. Postgres Pro has access to tuples generated by INSERT and UPDATE operations. Depending on the configured replica identity, it can also access previous row versions for UPDATE and DELETE operations. Changes can be consumed either through the streaming protocol (logical replication slots) or through a specialized SQL API.

Format version 1 generates a JSON object per transaction. This object contains all new and old tuples, with optional inclusion of properties such as transaction timestamps, schema-qualified names, data types, and transaction identifiers.

Format version 2 generates a JSON object per tuple, with optional JSON objects marking transaction start and end. Different tuple properties can also be included.

H.3.1. Installation and Setup

The wal2json extension is provided with Postgres Pro Standard as a separate pre-built package wal2json-std-17 (for the detailed installation instructions, see Chapter 16). Once you have Postgres Pro Standard installed, complete the following steps to enable wal2json:

  1. Enable logical decoding by setting wal_level to logical in the postgresql.conf file.

    Optionally, you can also edit values of the max_replication_slots and max_wal_senders parameters.

  2. Restart the database server for the changes to take effect.
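
Before restarting, it can help to double-check the settings from the steps above. The following is a minimal sketch, not part of wal2json or Postgres Pro: the helper names `parse_conf` and `check_logical_decoding` and the sample configuration text are hypothetical, and the parser handles only simple `name = value` lines.

```python
def parse_conf(text):
    """Parse postgresql.conf-style text into a dict (the last setting wins)."""
    settings = {}
    for raw in text.splitlines():
        # Drop comments; naive on purpose, it ignores '#' inside quoted values.
        line = raw.split("#", 1)[0].strip()
        if not line:
            continue
        if "=" in line:
            name, value = line.split("=", 1)
        else:
            parts = line.split(None, 1)
            if len(parts) != 2:
                continue
            name, value = parts
        settings[name.strip().lower()] = value.strip().strip("'")
    return settings


def check_logical_decoding(settings):
    """Return a list of problems that would prevent logical decoding."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    # Assumption: at least one slot and one WAL sender are needed if set explicitly.
    for name in ("max_replication_slots", "max_wal_senders"):
        if name in settings and int(settings[name]) < 1:
            problems.append(f"{name} must be at least 1")
    return problems


# Hypothetical excerpt of a postgresql.conf file.
sample = """
wal_level = logical
max_replication_slots = 4   # optional
max_wal_senders = 4         # optional
"""

print(check_logical_decoding(parse_conf(sample)))  # prints []
```

An empty list means the excerpt satisfies the checks; a file that omits wal_level is reported as a problem because the setting must be changed explicitly.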

H.3.2. Options

The wal2json module provides a variety of options for managing logical decoding:

include-xids

Add transaction IDs to each set of changes in the JSON output. The default is off.

include-timestamp

Add timestamp to each set of changes in the JSON output. The default is off.

include-schemas

Add the schema name to each change record in the JSON output. The default is on.

include-types

Add type to each change record in the JSON output. The default is on.

include-typmod

Add type modifiers for columns that have them. The default is on.

include-type-oids

Add type OIDs. The default is off.

include-domain-data-type

Replace a domain name with the underlying data type. The default is off.

include-column-positions

Add the column position (pg_attribute.attnum). The default is off.

include-origin

Add the origin of each change. The default is off.

include-not-null

Add information whether a column is marked as not null in the columnoptionals field of the JSON output. The default is off.

include-default

Add default expressions to the JSON output. The default is off.

include-pk

Add primary key information including column names and data type to the pk field of the JSON output. The default is off.

numeric-data-types-as-string

Convert values of numeric data types to strings in the JSON output. The JSON specification does not support Infinity and NaN as valid numeric values. There might be potential interoperability problems for double precision numbers. The default is off.
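
The interoperability concern can be illustrated on the consumer side. This is a minimal sketch that uses Python's standard json and decimal modules; the column value is hypothetical and was chosen so that it does not fit into a double without rounding.

```python
import json
from decimal import Decimal

# Hypothetical column value, once emitted as a JSON number and once as a string
# (the latter is what numeric-data-types-as-string would produce).
as_number = '{"name": "b", "value": 12345678901234567890.123456789}'
as_string = '{"name": "b", "value": "12345678901234567890.123456789"}'

lossy = json.loads(as_number)["value"]           # parsed into a Python float
exact = Decimal(json.loads(as_string)["value"])  # converted by the consumer

print(type(lossy).__name__)     # float
print(exact)                    # 12345678901234567890.123456789
print(Decimal(lossy) == exact)  # False: the float lost the low-order digits

# Special values survive as strings and can be interpreted explicitly.
print(Decimal("NaN").is_nan())  # True
```

With string output the consumer decides how to convert each value, at the cost of having to know which columns are numeric.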

pretty-print

Add spaces and indentation to the JSON output structure. The default is off.

write-in-chunks

Enable writing the JSON output in smaller chunks instead of writing the entire changeset at once. This option is used only when format-version is set to 1. The default is off.

include-lsn

Add the nextlsn field to each changeset. The default is off.

include-transaction

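Add records denoting the start and end of each transaction to the JSON output. The default is on, as the format version 2 example in this section shows: the begin and commit records appear without this option being set.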

filter-origins

Exclude changes from the specified origins. The default is empty, which means that no origin will be filtered. The value is a comma-separated list.

filter-tables

Exclude rows from the specified tables. The default is empty, which means that no table will be filtered. The value is a comma-separated list. The tables should be schema-qualified. *.foo means the foo table in all schemas, and bar.* means all tables in the bar schema. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table names are case-sensitive. For example, table "public"."Foo bar" should be specified as public.Foo\ bar.
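
The escaping rules above are easy to get wrong by hand. The following sketch builds a value for filter-tables (or add-tables) from schema and table names; the helper names are hypothetical, and the code assumes that only the five characters listed above need escaping and that a bare * is always meant as a wildcard.

```python
# Characters that the option description says must be backslash-escaped.
SPECIAL = " ',.*"


def escape_name(name):
    """Backslash-escape the special characters in a schema or table name."""
    return "".join("\\" + ch if ch in SPECIAL else ch for ch in name)


def table_pattern(schema, table):
    """Build one schema-qualified entry; a bare '*' is kept as a wildcard."""
    s = "*" if schema == "*" else escape_name(schema)
    t = "*" if table == "*" else escape_name(table)
    return f"{s}.{t}"


def table_list(pairs):
    """Join (schema, table) pairs into a comma-separated option value."""
    return ",".join(table_pattern(s, t) for s, t in pairs)


print(table_list([("public", "Foo bar"), ("*", "foo"), ("bar", "*")]))
# public.Foo\ bar,*.foo,bar.*
```

Names are passed through with their case unchanged because schema and table names are matched case-sensitively.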

add-tables

Include rows only from the specified tables. The default is empty, which means that all tables from all schemas are included. It follows the same rules as filter-tables.

filter-msg-prefixes

Exclude messages whose prefix is specified in the option value. The default is empty, which means that no messages will be filtered. The value is a comma-separated list.

add-msg-prefixes

Include only messages whose prefix is specified in the option value. The default is all prefixes. The value is a comma-separated list. wal2json applies filter-msg-prefixes before this option.

format-version

Define which output format to use. The default is 1.

actions

Define which operations will be included in the JSON output. The default is all actions (INSERT, UPDATE, DELETE, and TRUNCATE). However, if you are using format-version = 1, TRUNCATE is not enabled (for backward compatibility).
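
Options are passed as name/value pairs: as -o name=value arguments to pg_recvlogical, or as trailing text arguments to pg_logical_slot_get_changes, as the examples in this section show. The sketch below generates both forms from one dictionary. The helper names are hypothetical, and the SQL quoting assumes standard_conforming_strings is on.

```python
def recvlogical_args(options):
    """Return pg_recvlogical arguments: one '-o name=value' pair per option."""
    args = []
    for name, value in options.items():
        args += ["-o", f"{name}={value}"]
    return args


def sql_literal(value):
    """Quote a value as an SQL string literal (doubles single quotes)."""
    return "'" + str(value).replace("'", "''") + "'"


def get_changes_sql(slot, options):
    """Return a pg_logical_slot_get_changes call with the options appended."""
    parts = [sql_literal(slot), "NULL", "NULL"]
    for name, value in options.items():
        parts += [sql_literal(name), sql_literal(value)]
    return "SELECT data FROM pg_logical_slot_get_changes(" + ", ".join(parts) + ");"


opts = {"pretty-print": "1", "add-msg-prefixes": "wal2json"}

print(" ".join(recvlogical_args(opts)))
# -o pretty-print=1 -o add-msg-prefixes=wal2json
print(get_changes_sql("test_slot", opts))
# SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
```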

H.3.3. Examples

There are two ways to obtain the changes (JSON objects) from wal2json: calling functions via SQL or using pg_recvlogical.

H.3.3.1. pg_recvlogical

This is an example of how to obtain JSON objects from wal2json using pg_recvlogical. Besides the configuration mentioned above, it is necessary to configure a replication connection to use pg_recvlogical. Since Postgres Pro version 10, logical replication matches a normal entry with a database name or keywords such as all.

To configure a replication connection and database parameters:

  1. Add a replication connection rule to pg_hba.conf:

          local    mydatabase      myuser                     trust

  2. Optionally, set max_wal_senders in postgresql.conf:

          max_wal_senders = 1

  3. Restart the database server if you changed max_wal_senders.

To obtain JSON objects from wal2json:

  1. Open a terminal, create the replication slot, and start streaming changes:

          $ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
          $ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
  2. In another terminal:

          $ cat /tmp/example1.sql
          CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
          CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
          BEGIN;
          INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now());
          INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now());
          INSERT INTO table1_with_pk (b, c) VALUES('Replication', now());
          SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
          SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
          DELETE FROM table1_with_pk WHERE a < 3;
          SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
          INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir');
          -- it is not added to stream because there isn't a pk or a replica identity
          UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir';
          COMMIT;
          DROP TABLE table1_with_pk;
          DROP TABLE table1_without_pk;

          $ psql -At -f /tmp/example1.sql postgres
          CREATE TABLE
          CREATE TABLE
          BEGIN
          INSERT 0 1
          INSERT 0 1
          INSERT 0 1
          3/78BFC828
          3/78BFC880
          DELETE 2
          3/78BFC990
          INSERT 0 1
          UPDATE 1
          COMMIT
          DROP TABLE
          DROP TABLE
  3. The output in the first terminal might look like this. The listing was captured from the /tmp/example2.sql run shown in the next section, so it also contains psql status lines and table2_* names:

          $ psql -At -f /tmp/example2.sql postgres
          CREATE TABLE
          CREATE TABLE
          init
          BEGIN
          INSERT 0 1
          INSERT 0 1
          INSERT 0 1
          3/78C2CA50
          3/78C2CAA8
          DELETE 2
          3/78C2CBD8
          INSERT 0 1
          UPDATE 1
          COMMIT
          {
            "change": [
              {
                "kind": "message",
                "transactional": false,
                "prefix": "wal2json",
                "content": "this non-transactional message will be delivered even if you rollback the transaction"
              }
            ]
          }
          psql:/tmp/example2.sql:17: WARNING:  table "table2_without_pk" without primary key or replica identity is nothing
          {
            "change": [
              {
                "kind": "insert",
                "schema": "public",
                "table": "table2_with_pk",
                "columnnames": ["a", "b", "c"],
                "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
              }
              ,{
                "kind": "insert",
                "schema": "public",
                "table": "table2_with_pk",
                "columnnames": ["a", "b", "c"],
                "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
              }
              ,{
                "kind": "insert",
                "schema": "public",
                "table": "table2_with_pk",
                "columnnames": ["a", "b", "c"],
                "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
              }
              ,{
                "kind": "message",
                "transactional": true,
                "prefix": "wal2json",
                "content": "this message will be delivered"
              }
              ,{
                "kind": "delete",
                "schema": "public",
                "table": "table2_with_pk",
                "oldkeys": {
                  "keynames": ["a", "c"],
                  "keytypes": ["integer", "timestamp without time zone"],
                  "keyvalues": [1, "2018-03-27 12:05:29.914496"]
                }
              }
              ,{
                "kind": "delete",
                "schema": "public",
                "table": "table2_with_pk",
                "oldkeys": {
                  "keynames": ["a", "c"],
                  "keytypes": ["integer", "timestamp without time zone"],
                  "keyvalues": [2, "2018-03-27 12:05:29.914496"]
                }
              }
              ,{
                "kind": "insert",
                "schema": "public",
                "table": "table2_without_pk",
                "columnnames": ["a", "b", "c"],
                "columntypes": ["integer", "numeric(5,2)", "text"],
                "columnvalues": [1, 2.34, "Tapir"]
              }
            ]
          }
          stop
          DROP TABLE
          DROP TABLE
  4. To drop the slot, press Ctrl+C in the first terminal to stop streaming, and then run:

          $ pg_recvlogical -d postgres --slot test_slot --drop-slot

H.3.3.2. Calling SQL Functions

These are examples of how to obtain changes from wal2json via SQL.

If format-version is set to 1, the script might look like this:

    $ cat /tmp/example2.sql
    CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
    CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
    SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
    BEGIN;
    INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
    INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
    INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
    SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
    SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
    DELETE FROM table2_with_pk WHERE a < 3;
    SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
    INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
    -- it is not added to stream because there isn't a pk or a replica identity
    UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
    COMMIT;
    SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
    SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
    DROP TABLE table2_with_pk;
    DROP TABLE table2_without_pk;

The expected output might look like this:

    $ psql -At -f /tmp/example2.sql postgres
    CREATE TABLE
    CREATE TABLE
    init
    BEGIN
    INSERT 0 1
    INSERT 0 1
    INSERT 0 1
    3/78C2CA50
    3/78C2CAA8
    DELETE 2
    3/78C2CBD8
    INSERT 0 1
    UPDATE 1
    COMMIT
    {
      "change": [
        {
          "kind": "message",
          "transactional": false,
          "prefix": "wal2json",
          "content": "this non-transactional message will be delivered even if you rollback the transaction"
        }
      ]
    }
    psql:/tmp/example2.sql:17: WARNING:  table "table2_without_pk" without primary key or replica identity is nothing
    {
      "change": [
        {
          "kind": "insert",
          "schema": "public",
          "table": "table2_with_pk",
          "columnnames": ["a", "b", "c"],
          "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
          "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
        }
        ,{
          "kind": "insert",
          "schema": "public",
          "table": "table2_with_pk",
          "columnnames": ["a", "b", "c"],
          "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
          "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
        }
        ,{
          "kind": "insert",
          "schema": "public",
          "table": "table2_with_pk",
          "columnnames": ["a", "b", "c"],
          "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
          "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
        }
        ,{
          "kind": "message",
          "transactional": true,
          "prefix": "wal2json",
          "content": "this message will be delivered"
        }
        ,{
          "kind": "delete",
          "schema": "public",
          "table": "table2_with_pk",
          "oldkeys": {
            "keynames": ["a", "c"],
            "keytypes": ["integer", "timestamp without time zone"],
            "keyvalues": [1, "2018-03-27 12:05:29.914496"]
          }
        }
        ,{
          "kind": "delete",
          "schema": "public",
          "table": "table2_with_pk",
          "oldkeys": {
            "keynames": ["a", "c"],
            "keytypes": ["integer", "timestamp without time zone"],
            "keyvalues": [2, "2018-03-27 12:05:29.914496"]
          }
        }
        ,{
          "kind": "insert",
          "schema": "public",
          "table": "table2_without_pk",
          "columnnames": ["a", "b", "c"],
          "columntypes": ["integer", "numeric(5,2)", "text"],
          "columnvalues": [1, 2.34, "Tapir"]
        }
      ]
    }
    stop
    DROP TABLE
    DROP TABLE
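
A consumer of format version 1 output receives one object per transaction and walks its change array. The sketch below shows one way to do this with Python's standard json module, under the assumption that the default options are in effect (so schema names are present). The changeset is an abbreviated copy of the output above, and the `summarize` helper is hypothetical.

```python
import json

# Abbreviated copy of the format version 1 changeset shown above.
changeset = json.loads("""
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "table2_with_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
      "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
    },
    {
      "kind": "message",
      "transactional": true,
      "prefix": "wal2json",
      "content": "this message will be delivered"
    },
    {
      "kind": "delete",
      "schema": "public",
      "table": "table2_with_pk",
      "oldkeys": {
        "keynames": ["a", "c"],
        "keytypes": ["integer", "timestamp without time zone"],
        "keyvalues": [1, "2018-03-27 12:05:29.914496"]
      }
    }
  ]
}
""")


def summarize(change):
    """Turn one change record into a (kind, target, data) tuple."""
    kind = change["kind"]
    if kind == "message":
        return (kind, change["prefix"], change["content"])
    target = f'{change["schema"]}.{change["table"]}'
    if kind == "delete":
        # Deletes carry only the old key columns.
        keys = change["oldkeys"]
        return (kind, target, dict(zip(keys["keynames"], keys["keyvalues"])))
    # Inserts and updates carry parallel name and value arrays.
    return (kind, target, dict(zip(change["columnnames"], change["columnvalues"])))


for change in changeset["change"]:
    print(summarize(change))
```

Pairing the parallel arrays into a dictionary keeps the rest of the consumer independent of column order.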

If format-version is set to 2, the script might look like this:

    $ cat /tmp/example3.sql
    CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
    CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
    SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
    BEGIN;
    INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
    INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
    INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
    SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
    SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
    DELETE FROM table3_with_pk WHERE a < 3;
    SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
    INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
    -- it is not added to stream because there isn't a pk or a replica identity
    UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
    COMMIT;
    SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
    SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
    DROP TABLE table3_with_pk;
    DROP TABLE table3_without_pk;

The expected output might look like this:

    $ psql -At -f /tmp/example3.sql postgres
    CREATE TABLE
    CREATE TABLE
    init
    BEGIN
    INSERT 0 1
    INSERT 0 1
    INSERT 0 1
    3/78CB8F30
    3/78CB8F88
    DELETE 2
    3/78CB90B8
    INSERT 0 1
    UPDATE 1
    COMMIT
    psql:/tmp/example3.sql:20: WARNING:  no tuple identifier for UPDATE in table "public"."table3_without_pk"
    {"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}
    {"action":"B"}
    {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
    {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
    {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
    {"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
    {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
    {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
    {"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
    {"action":"C"}
    stop
    DROP TABLE
    DROP TABLE
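
Because format version 2 emits one object per line, a consumer that needs whole transactions has to regroup the records itself. The following sketch collects the records between the B and C markers. The input lines are abbreviated from the output above, and the helper names `group_transactions` and `row` are hypothetical.

```python
import json

# Abbreviated records in the shape of the format version 2 output above.
lines = [
    '{"action":"M","transactional":false,"prefix":"wal2json","content":"non-transactional"}',
    '{"action":"B"}',
    '{"action":"I","schema":"public","table":"table3_with_pk","columns":'
    '[{"name":"a","type":"integer","value":1},'
    '{"name":"b","type":"character varying(30)","value":"Backup and Restore"}]}',
    '{"action":"D","schema":"public","table":"table3_with_pk","identity":'
    '[{"name":"a","type":"integer","value":1}]}',
    '{"action":"C"}',
]


def group_transactions(json_lines):
    """Yield lists of records; records between 'B' and 'C' form one group."""
    current = None
    for text in json_lines:
        record = json.loads(text)
        action = record["action"]
        if action == "B":
            current = []
        elif action == "C":
            if current is not None:
                yield current
            current = None
        elif current is not None:
            current.append(record)
        else:
            # Outside a B/C pair, for example a non-transactional message.
            yield [record]


def row(record):
    """Map column names to values for an insert, update, or delete record."""
    cols = record.get("columns") or record.get("identity") or []
    return {c["name"]: c["value"] for c in cols}


groups = list(group_transactions(lines))
print([[r["action"] for r in g] for g in groups])  # [['M'], ['I', 'D']]
print(row(groups[1][0]))  # {'a': 1, 'b': 'Backup and Restore'}
```

If the transaction markers are disabled, every record falls into the last branch and is yielded on its own.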
