Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit576d589

Browse files
committed
2PC in pgoutput for shardman: append gid to avoid collisions.
For all non-shardman xacts 2PC decoding is disabled. Also, add GUC enabling 2PCdecoding always in pgoutput to test it (011_twophase.pl).
1 parentf46f9d8 commit576d589

File tree

4 files changed

+61
-9
lines changed

4 files changed

+61
-9
lines changed

‎src/backend/replication/logical/proto.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data,
140140
*/
141141
void
142142
logicalrep_write_prepare(StringInfoout,ReorderBufferTXN*txn,
143-
XLogRecPtrprepare_lsn)
143+
XLogRecPtrprepare_lsn,constchar*gid)
144144
{
145145
uint8flags=0;
146146

@@ -165,7 +165,7 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
165165
pq_sendint64(out,txn->commit_time);
166166

167167
/* send gid */
168-
pq_sendstring(out,txn->gid);
168+
pq_sendstring(out,gid);
169169
}
170170

171171
/*

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,18 @@
1212
*/
1313
#include"postgres.h"
1414

15+
#include"access/genam.h"
16+
#include"access/heapam.h"
1517
#include"catalog/pg_publication.h"
1618

19+
#include"nodes/makefuncs.h"
20+
1721
#include"replication/logical.h"
1822
#include"replication/logicalproto.h"
1923
#include"replication/origin.h"
2024
#include"replication/pgoutput.h"
2125

26+
#include"utils/guc.h"
2227
#include"utils/inval.h"
2328
#include"utils/int8.h"
2429
#include"utils/memutils.h"
@@ -27,6 +32,8 @@
2732

2833
PG_MODULE_MAGIC;
2934

35+
externvoid_PG_init(void);
36+
3037
externvoid_PG_output_plugin_init(OutputPluginCallbacks*cb);
3138

3239
staticvoidpgoutput_startup(LogicalDecodingContext*ctx,
@@ -62,6 +69,7 @@ static bool publications_valid;
6269
staticList*LoadPublications(List*pubnames);
6370
staticvoidpublication_invalidation_cb(Datumarg,intcacheid,
6471
uint32hashvalue);
72+
staticchar*append_shardman_node_id(constchar*gid);
6573

6674
/* Entry in the map used to remember which relation schemas we sent. */
6775
typedefstructRelationSyncEntry
@@ -75,12 +83,29 @@ typedef struct RelationSyncEntry
7583
/* Map used to remember which relation schemas we sent. */
7684
staticHTAB*RelationSyncCache=NULL;
7785

86+
/* GUC just for tests */
87+
staticbooluse_twophase;
88+
7889
staticvoidinit_rel_sync_cache(MemoryContextdecoding_context);
7990
staticRelationSyncEntry*get_rel_sync_entry(PGOutputData*data,Oidrelid);
8091
staticvoidrel_sync_cache_relation_cb(Datumarg,Oidrelid);
8192
staticvoidrel_sync_cache_publication_cb(Datumarg,intcacheid,
8293
uint32hashvalue);
8394

95+
void
96+
_PG_init(void)
97+
{
98+
DefineCustomBoolVariable(
99+
"pgoutput.use_twophase",
100+
"Toggle 2PC",
101+
NULL,
102+
&use_twophase,
103+
false,
104+
PGC_SUSET,
105+
0,
106+
NULL,NULL,NULL);
107+
}
108+
84109
/*
85110
* Specify output plugin callbacks
86111
*/
@@ -337,10 +362,17 @@ static void
337362
pgoutput_prepare_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
338363
XLogRecPtrprepare_lsn)
339364
{
365+
char*gid=txn->gid;
366+
340367
OutputPluginUpdateProgress(ctx);
341368

342369
OutputPluginPrepareWrite(ctx, true);
343-
logicalrep_write_prepare(ctx->out,txn,prepare_lsn);
370+
/* Append :sysid to gid to avoid collision */
371+
if (strstr(gid,"pgfdw:")!=NULL)
372+
gid=psprintf("%s:%lx",txn->gid,GetSystemIdentifier());
373+
logicalrep_write_prepare(ctx->out,txn,prepare_lsn,gid);
374+
if (strstr(gid,"pgfdw:")!=NULL)
375+
pfree(gid);
344376
OutputPluginWrite(ctx, true);
345377
}
346378

@@ -351,23 +383,38 @@ static void
351383
pgoutput_commit_prepared_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
352384
XLogRecPtrprepare_lsn)
353385
{
386+
char*gid=txn->gid;
387+
354388
OutputPluginUpdateProgress(ctx);
355389

356390
OutputPluginPrepareWrite(ctx, true);
357-
logicalrep_write_prepare(ctx->out,txn,prepare_lsn);
391+
/* Append :sysid to gid to avoid collision */
392+
if (strstr(gid,"pgfdw:")!=NULL)
393+
gid=psprintf("%s:%lx",txn->gid,GetSystemIdentifier());
394+
logicalrep_write_prepare(ctx->out,txn,prepare_lsn,gid);
395+
if (strstr(gid,"pgfdw:")!=NULL)
396+
pfree(gid);
358397
OutputPluginWrite(ctx, true);
359398
}
399+
360400
/*
361401
* PREPARE callback
362402
*/
363403
staticvoid
364404
pgoutput_abort_prepared_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
365405
XLogRecPtrprepare_lsn)
366406
{
407+
char*gid=txn->gid;
408+
367409
OutputPluginUpdateProgress(ctx);
368410

369411
OutputPluginPrepareWrite(ctx, true);
370-
logicalrep_write_prepare(ctx->out,txn,prepare_lsn);
412+
/* Append :sysid to gid to avoid collision */
413+
if (strstr(gid,"pgfdw:")!=NULL)
414+
gid=psprintf("%s:%lx",txn->gid,GetSystemIdentifier());
415+
logicalrep_write_prepare(ctx->out,txn,prepare_lsn,gid);
416+
if (strstr(gid,"pgfdw:")!=NULL)
417+
pfree(gid);
371418
OutputPluginWrite(ctx, true);
372419
}
373420

@@ -502,13 +549,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
502549
/*
503550
* Filter out unnecessary two-phase transactions.
504551
*
505-
*Currently, we forward all two-phase transactions
552+
*Make 2PC on shardman's xacts.
506553
*/
507554
staticbool
508555
pgoutput_filter_prepare(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
509-
TransactionIdxid,constchar*gid)
556+
TransactionIdxid,constchar*gid)
510557
{
511-
return false;
558+
if (strstr(gid,"pgfdw:")!=NULL)/* shardman */
559+
{
560+
return false;
561+
}
562+
return !use_twophase;
512563
}
513564

514565
/*

‎src/include/replication/logicalproto.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
9292
externvoidlogicalrep_write_abort(StringInfoout,ReorderBufferTXN*txn,
9393
XLogRecPtrabort_lsn);
9494
externvoidlogicalrep_write_prepare(StringInfoout,ReorderBufferTXN*txn,
95-
XLogRecPtrprepare_lsn);
95+
XLogRecPtrprepare_lsn,constchar*gid);
9696
externvoidlogicalrep_read_commit(StringInfoin,
9797
LogicalRepCommitData*commit_data,uint8*flags);
9898
externvoidlogicalrep_read_prepare(StringInfoin,

‎src/test/subscription/t/011_twophase.pl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
$node_publisher->append_conf(
1212
'postgresql.conf',qq(
1313
max_prepared_transactions = 10
14+
pgoutput.use_twophase = true
1415
));
1516
$node_publisher->start;
1617

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp