12
12
*/
13
13
#include "postgres.h"
14
14
15
+ #include "access/genam.h"
16
+ #include "access/heapam.h"
15
17
#include "catalog/pg_publication.h"
16
18
19
+ #include "nodes/makefuncs.h"
20
+
17
21
#include "replication/logical.h"
18
22
#include "replication/logicalproto.h"
19
23
#include "replication/origin.h"
20
24
#include "replication/pgoutput.h"
21
25
26
+ #include "utils/guc.h"
22
27
#include "utils/inval.h"
23
28
#include "utils/int8.h"
24
29
#include "utils/memutils.h"
27
32
28
33
PG_MODULE_MAGIC ;
29
34
35
+ extern void _PG_init (void );
36
+
30
37
extern void _PG_output_plugin_init (OutputPluginCallbacks * cb );
31
38
32
39
static void pgoutput_startup (LogicalDecodingContext * ctx ,
@@ -62,6 +69,7 @@ static bool publications_valid;
62
69
static List * LoadPublications (List * pubnames );
63
70
static void publication_invalidation_cb (Datum arg ,int cacheid ,
64
71
uint32 hashvalue );
72
+ static char * append_shardman_node_id (const char * gid );
65
73
66
74
/* Entry in the map used to remember which relation schemas we sent. */
67
75
typedef struct RelationSyncEntry
@@ -75,12 +83,29 @@ typedef struct RelationSyncEntry
75
83
/* Map used to remember which relation schemas we sent. */
76
84
static HTAB * RelationSyncCache = NULL ;
77
85
86
+ /* GUC just for tests */
87
+ static bool use_twophase ;
88
+
78
89
static void init_rel_sync_cache (MemoryContext decoding_context );
79
90
static RelationSyncEntry * get_rel_sync_entry (PGOutputData * data ,Oid relid );
80
91
static void rel_sync_cache_relation_cb (Datum arg ,Oid relid );
81
92
static void rel_sync_cache_publication_cb (Datum arg ,int cacheid ,
82
93
uint32 hashvalue );
83
94
95
+ void
96
+ _PG_init (void )
97
+ {
98
+ DefineCustomBoolVariable (
99
+ "pgoutput.use_twophase" ,
100
+ "Toggle 2PC" ,
101
+ NULL ,
102
+ & use_twophase ,
103
+ false,
104
+ PGC_SUSET ,
105
+ 0 ,
106
+ NULL ,NULL ,NULL );
107
+ }
108
+
84
109
/*
85
110
* Specify output plugin callbacks
86
111
*/
@@ -337,10 +362,17 @@ static void
337
362
pgoutput_prepare_txn (LogicalDecodingContext * ctx ,ReorderBufferTXN * txn ,
338
363
XLogRecPtr prepare_lsn )
339
364
{
365
+ char * gid = txn -> gid ;
366
+
340
367
OutputPluginUpdateProgress (ctx );
341
368
342
369
OutputPluginPrepareWrite (ctx , true);
343
- logicalrep_write_prepare (ctx -> out ,txn ,prepare_lsn );
370
+ /* Append :sysid to gid to avoid collision */
371
+ if (strstr (gid ,"pgfdw:" )!= NULL )
372
+ gid = psprintf ("%s:%lx" ,txn -> gid ,GetSystemIdentifier ());
373
+ logicalrep_write_prepare (ctx -> out ,txn ,prepare_lsn ,gid );
374
+ if (strstr (gid ,"pgfdw:" )!= NULL )
375
+ pfree (gid );
344
376
OutputPluginWrite (ctx , true);
345
377
}
346
378
@@ -351,23 +383,38 @@ static void
351
383
pgoutput_commit_prepared_txn (LogicalDecodingContext * ctx ,ReorderBufferTXN * txn ,
352
384
XLogRecPtr prepare_lsn )
353
385
{
386
+ char * gid = txn -> gid ;
387
+
354
388
OutputPluginUpdateProgress (ctx );
355
389
356
390
OutputPluginPrepareWrite (ctx , true);
357
- logicalrep_write_prepare (ctx -> out ,txn ,prepare_lsn );
391
+ /* Append :sysid to gid to avoid collision */
392
+ if (strstr (gid ,"pgfdw:" )!= NULL )
393
+ gid = psprintf ("%s:%lx" ,txn -> gid ,GetSystemIdentifier ());
394
+ logicalrep_write_prepare (ctx -> out ,txn ,prepare_lsn ,gid );
395
+ if (strstr (gid ,"pgfdw:" )!= NULL )
396
+ pfree (gid );
358
397
OutputPluginWrite (ctx , true);
359
398
}
399
+
360
400
/*
361
401
* PREPARE callback
362
402
*/
363
403
static void
364
404
pgoutput_abort_prepared_txn (LogicalDecodingContext * ctx ,ReorderBufferTXN * txn ,
365
405
XLogRecPtr prepare_lsn )
366
406
{
407
+ char * gid = txn -> gid ;
408
+
367
409
OutputPluginUpdateProgress (ctx );
368
410
369
411
OutputPluginPrepareWrite (ctx , true);
370
- logicalrep_write_prepare (ctx -> out ,txn ,prepare_lsn );
412
+ /* Append :sysid to gid to avoid collision */
413
+ if (strstr (gid ,"pgfdw:" )!= NULL )
414
+ gid = psprintf ("%s:%lx" ,txn -> gid ,GetSystemIdentifier ());
415
+ logicalrep_write_prepare (ctx -> out ,txn ,prepare_lsn ,gid );
416
+ if (strstr (gid ,"pgfdw:" )!= NULL )
417
+ pfree (gid );
371
418
OutputPluginWrite (ctx , true);
372
419
}
373
420
@@ -502,13 +549,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
502
549
/*
503
550
* Filter out unnecessary two-phase transactions.
504
551
*
505
- *Currently, we forward all two-phase transactions
552
+ *Make 2PC on shardman's xacts.
506
553
*/
507
554
static bool
508
555
pgoutput_filter_prepare (LogicalDecodingContext * ctx ,ReorderBufferTXN * txn ,
509
- TransactionId xid ,const char * gid )
556
+ TransactionId xid ,const char * gid )
510
557
{
511
- return false;
558
+ if (strstr (gid ,"pgfdw:" )!= NULL )/* shardman */
559
+ {
560
+ return false;
561
+ }
562
+ return !use_twophase ;
512
563
}
513
564
514
565
/*