Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbe65edd

Browse files
committed
Add required database and origin filtering for logical messages.
Logical messages, added in3fe3511, during decoding failed to filtermessages emitted in other databases and messages emitted "under" areplication origin the output plugin isn't interested in.Add tests to verify that both types of filtering actually work. Whiletouching message.sql remove hunk obsoleted byd25379e.Bump XLOG_PAGE_MAGIC because xl_logical_message changed and because3fe3511 had omitted doing so.3fe3511 additionally didn't bumpcatversion, but7a54270 has done so since.Author: Petr JelinekReported-By: Andres FreundDiscussion: 20160406142513.wotqy3ba3kanr423@alap3.anarazel.de
1 parent80abbeb commitbe65edd

File tree

8 files changed

+64
-15
lines changed

8 files changed

+64
-15
lines changed

‎contrib/test_decoding/expected/messages.out

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
-- predictability
22
SET synchronous_commit = on;
3-
SET client_encoding = 'utf8';
43
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
54
?column?
65
----------
@@ -71,9 +70,30 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for
7170
message: transactional: 1 prefix: test, sz: 11 content:czechtastic
7271
(7 rows)
7372

74-
SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
73+
-- test db filtering
74+
\set prevdb :DBNAME
75+
\c template1
76+
SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
7577
?column?
7678
----------
77-
init
79+
otherdb1
80+
(1 row)
81+
82+
SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
83+
?column?
84+
----------
85+
otherdb2
86+
(1 row)
87+
88+
\c :prevdb
89+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
90+
data
91+
------
92+
(0 rows)
93+
94+
SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
95+
?column?
96+
----------
97+
cleanup
7898
(1 row)
7999

‎contrib/test_decoding/expected/replorigin.out

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
5959
-- ensure we prevent duplicate setup
6060
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
6161
ERROR: cannot setup replication origin when one is already setup
62+
SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
63+
?column?
64+
----------
65+
66+
(1 row)
67+
6268
BEGIN;
6369
-- setup transaction origin
6470
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');

‎contrib/test_decoding/sql/messages.sql

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
-- predictability
22
SET synchronous_commit=on;
3-
SET client_encoding='utf8';
43

54
SELECT'init'FROM pg_create_logical_replication_slot('regression_slot','test_decoding');
65

@@ -22,4 +21,14 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
2221

2322
SELECT dataFROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'force-binary','0','skip-empty-xacts','1');
2423

25-
SELECT'init'FROM pg_drop_replication_slot('regression_slot');
24+
-- test db filtering
25+
\set prevdb :DBNAME
26+
\c template1
27+
28+
SELECT'otherdb1'FROM pg_logical_emit_message(false,'test','otherdb1');
29+
SELECT'otherdb2'FROM pg_logical_emit_message(true,'test','otherdb2');
30+
31+
\c :prevdb
32+
SELECT dataFROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'force-binary','0','skip-empty-xacts','1');
33+
34+
SELECT'cleanup'FROM pg_drop_replication_slot('regression_slot');

‎contrib/test_decoding/sql/replorigin.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
3131
-- ensure we prevent duplicate setup
3232
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
3333

34+
SELECT''FROM pg_logical_emit_message(false,'test','this message will not be decoded');
35+
3436
BEGIN;
3537
-- setup transaction origin
3638
SELECT pg_replication_origin_xact_setup('0/aabbccdd','2013-01-01 00:00');

‎src/backend/replication/logical/decode.c

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
464464
}
465465
}
466466

467+
staticinlinebool
468+
FilterByOrigin(LogicalDecodingContext*ctx,RepOriginIdorigin_id)
469+
{
470+
if (ctx->callbacks.filter_by_origin_cb==NULL)
471+
return false;
472+
473+
returnfilter_by_origin_cb_wrapper(ctx,origin_id);
474+
}
475+
467476
/*
468477
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
469478
*/
@@ -474,6 +483,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
474483
XLogReaderState*r=buf->record;
475484
TransactionIdxid=XLogRecGetXid(r);
476485
uint8info=XLogRecGetInfo(r)& ~XLR_INFO_MASK;
486+
RepOriginIdorigin_id=XLogRecGetOrigin(r);
477487
Snapshotsnapshot;
478488
xl_logical_message*message;
479489

@@ -488,6 +498,10 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
488498

489499
message= (xl_logical_message*)XLogRecGetData(r);
490500

501+
if (message->dbId!=ctx->slot->data.database||
502+
FilterByOrigin(ctx,origin_id))
503+
return;
504+
491505
if (message->transactional&&
492506
!SnapBuildProcessChange(builder,xid,buf->origptr))
493507
return;
@@ -504,15 +518,6 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
504518
message->message+message->prefix_size);
505519
}
506520

507-
staticinlinebool
508-
FilterByOrigin(LogicalDecodingContext*ctx,RepOriginIdorigin_id)
509-
{
510-
if (ctx->callbacks.filter_by_origin_cb==NULL)
511-
return false;
512-
513-
returnfilter_by_origin_cb_wrapper(ctx,origin_id);
514-
}
515-
516521
/*
517522
* Consolidated commit record handling between the different form of commit
518523
* records.

‎src/backend/replication/logical/message.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131

3232
#include"postgres.h"
3333

34+
#include"miscadmin.h"
35+
3436
#include"access/xact.h"
3537

3638
#include"catalog/indexing.h"
@@ -60,6 +62,7 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
6062
GetCurrentTransactionId();
6163
}
6264

65+
xlrec.dbId=MyDatabaseId;
6366
xlrec.transactional=transactional;
6467
xlrec.prefix_size=strlen(prefix)+1;
6568
xlrec.message_size=size;
@@ -69,6 +72,9 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
6972
XLogRegisterData((char*)prefix,xlrec.prefix_size);
7073
XLogRegisterData((char*)message,size);
7174

75+
/* allow origin filtering */
76+
XLogIncludeOrigin();
77+
7278
returnXLogInsert(RM_LOGICALMSG_ID,XLOG_LOGICAL_MESSAGE);
7379
}
7480

‎src/include/access/xlog_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
/*
3232
* Each page of XLOG file has a header like this:
3333
*/
34-
#defineXLOG_PAGE_MAGIC0xD090/* can be used as WAL version indicator */
34+
#defineXLOG_PAGE_MAGIC0xD091/* can be used as WAL version indicator */
3535

3636
typedefstructXLogPageHeaderData
3737
{

‎src/include/replication/message.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
*/
2020
typedefstructxl_logical_message
2121
{
22+
OiddbId;/* database Oid emitted from */
2223
booltransactional;/* is message transactional? */
2324
Sizeprefix_size;/* length of prefix */
2425
Sizemessage_size;/* size of the message */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp