Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit37862be

Browse files
committed
2PC decoding and tablesync.
Attempt to cover subtle issues with tablesync and 2PC decoding. Before going toSYNCDONE, now sync worker finishes all xacts prepared beforehand; seetablesync.c header comments.To make this work, 2PC decoding is now controlled by pgoutput option which isset by subscriber depending on value of logical_replication_2pc GUC. Tablesyncworker always disables 2PC.Add option to return num of unfinished prepares in CREATE_REPLICATION_SLOT. Topreserve backward compatibility, e.g. with basebackup which expects 4 atts.
1 parent31495de commit37862be

File tree

23 files changed

+948
-364
lines changed

23 files changed

+948
-364
lines changed

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include"mb/pg_wchar.h"
2828
#include"miscadmin.h"
2929
#include"pgstat.h"
30+
#include"replication/logicalworker.h"
3031
#include"replication/walreceiver.h"
3132
#include"utils/builtins.h"
3233
#include"utils/memutils.h"
@@ -73,7 +74,8 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
7374
constchar*slotname,
7475
booltemporary,
7576
CRSSnapshotActionsnapshot_action,
76-
XLogRecPtr*lsn);
77+
XLogRecPtr*lsn,
78+
int*num_unfinished_prepares);
7779
staticWalRcvExecResult*libpqrcv_exec(WalReceiverConn*conn,
7880
constchar*query,
7981
constintnRetTypes,
@@ -419,6 +421,22 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
419421
PQfreemem(pubnames_literal);
420422
pfree(pubnames_str);
421423

424+
/*
425+
* If logical_replication_2pc is off, don't include it at all --
426+
* probably we are talking to vanilla server.
427+
*/
428+
if (options->proto.logical.twophase!=LOGICAL_REPLICATION_2PC_OFF)
429+
{
430+
appendStringInfo(&cmd,", twophase '%u'",
431+
options->proto.logical.twophase);
432+
}
433+
434+
/* Same with prepare_notifies */
435+
if (options->proto.logical.prepare_notifies)
436+
{
437+
appendStringInfo(&cmd,", prepare_notifies 'true'");
438+
}
439+
422440
appendStringInfoChar(&cmd,')');
423441
}
424442
else
@@ -790,7 +808,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
790808
staticchar*
791809
libpqrcv_create_slot(WalReceiverConn*conn,constchar*slotname,
792810
booltemporary,CRSSnapshotActionsnapshot_action,
793-
XLogRecPtr*lsn)
811+
XLogRecPtr*lsn,int*num_unfinished_prepares)
794812
{
795813
PGresult*res;
796814
StringInfoDatacmd;
@@ -818,6 +836,10 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
818836
appendStringInfoString(&cmd," USE_SNAPSHOT");
819837
break;
820838
}
839+
if (num_unfinished_prepares!=NULL)
840+
{
841+
appendStringInfoString(&cmd," COUNT_PREPARES");
842+
}
821843
}
822844

823845
res=libpqrcv_PQexec(conn->streamConn,cmd.data);
@@ -838,6 +860,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
838860
else
839861
snapshot=NULL;
840862

863+
if (num_unfinished_prepares!=NULL&& (PQnfields(res)==5))
864+
*num_unfinished_prepares=pg_atoi(PQgetvalue(res,0,4),sizeof(int32),0);
865+
841866
PQclear(res);
842867

843868
returnsnapshot;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp