Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit87f3525

Browse files
committed
2 parentsb23bb5e +11a14de commit87f3525

File tree

5 files changed

+113
-66
lines changed

5 files changed

+113
-66
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3747,6 +3747,11 @@ static bool MtmProcessDDLCommand(char const* queryString)
37473747
return false;
37483748
}
37493749

3750+
staticvoidMtmFinishDDLCommand()
3751+
{
3752+
LogLogicalMessage("E","",1, true);
3753+
}
3754+
37503755
voidMtmUpdateLockGraph(intnodeId,voidconst*messageBody,intmessageSize)
37513756
{
37523757
intallocated;
@@ -3768,6 +3773,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
37683773
DestReceiver*dest,char*completionTag)
37693774
{
37703775
boolskipCommand= false;
3776+
boolexecuted= false;
37713777

37723778
MTM_LOG3("%d: Process utility statement %s",MyProcPid,queryString);
37733779
switch (nodeTag(parsetree))
@@ -3912,6 +3918,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39123918
if (!skipCommand&& !MtmTx.isReplicated&& (MtmUtilityProcessedInXid==InvalidTransactionId)) {
39133919
MtmUtilityProcessedInXid=GetCurrentTransactionId();
39143920
MtmProcessDDLCommand(queryString);
3921+
executed= true;
39153922
}
39163923
}
39173924

@@ -3933,6 +3940,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39333940
MtmTx.snapshot=INVALID_CSN;
39343941
}
39353942

3943+
if (executed)
3944+
{
3945+
MtmFinishDDLCommand();
3946+
}
39363947
}
39373948

39383949

@@ -3967,12 +3978,6 @@ MtmExecutorFinish(QueryDesc *queryDesc)
39673978
}
39683979
}
39693980

3970-
// if (MyXactAccessedRel)
3971-
// {
3972-
// MTM_LOG1("MtmTx.containsDML = true");
3973-
// MtmTx.containsDML = true;
3974-
// }
3975-
39763981
if (PreviousExecutorFinishHook!=NULL)
39773982
{
39783983
PreviousExecutorFinishHook(queryDesc);

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ typedef struct TupleData
5959
boolchanged[MaxTupleAttributeNumber];
6060
}TupleData;
6161

62-
staticboolinside_tx= false;
63-
6462
staticRelationread_rel(StringInfos,LOCKMODEmode);
6563
staticvoidread_tuple_parts(StringInfos,Relationrel,TupleData*tup);
6664
staticEState*create_rel_estate(Relationrel);
@@ -353,8 +351,6 @@ process_remote_begin(StringInfo s)
353351
StartTransactionCommand();
354352
MtmJoinTransaction(&gtid,snapshot);
355353

356-
inside_tx= true;
357-
358354
return true;
359355
}
360356

@@ -364,11 +360,6 @@ process_remote_transactional_message(StringInfo s)
364360
intrc;
365361
intmessageSize=pq_getmsgint(s,4);
366362
charconst*stmt=pq_getmsgbytes(s,messageSize);
367-
if (!inside_tx)
368-
{
369-
MTM_LOG1("%d: Ignoring utility statement %s",MyProcPid,stmt);
370-
return;
371-
}
372363

373364
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,stmt);
374365
SPI_connect();
@@ -673,7 +664,6 @@ process_remote_commit(StringInfo in)
673664
if (flags&PGLOGICAL_CAUGHT_UP) {
674665
MtmRecoveryCompleted();
675666
}
676-
inside_tx= false;
677667
}
678668

679669
staticvoid
@@ -998,7 +988,7 @@ void MtmExecutor(void* work, size_t size)
998988
{
999989
while (true) {
1000990
charaction=pq_getmsgbyte(&s);
1001-
MTM_LOG3("%d: REMOTE process action %c",MyProcPid,action);
991+
MTM_LOG1("%d: REMOTE process action %c",MyProcPid,action);
1002992
#if0
1003993
if (Mtm->status==MTM_RECOVERY) {
1004994
MTM_LOG1("Replay action %c[%x]",action,s.data[s.cursor]);
@@ -1060,6 +1050,7 @@ void MtmExecutor(void* work, size_t size)
10601050
continue;
10611051
}
10621052
case'G':
1053+
case'E':
10631054
{
10641055
process_remote_transactional_message(&s);
10651056
continue;

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 94 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@
3838
#include"multimaster.h"
3939
#include"pglogical_relid_map.h"
4040

41-
staticboolMtmIsFilteredTxn;
42-
staticintMtmTransactionRecords;
41+
staticintMtmTransactionRecords;
42+
staticTransactionIdMtmCurrentXid;
43+
staticboolDDLInProress= false;
4344

4445
staticvoidpglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel);
4546

@@ -74,10 +75,17 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
7475
constchar*relname;
7576
uint8relnamelen;
7677
Oidrelid;
77-
if (MtmIsFilteredTxn) {
78+
79+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN) {
80+
MTM_LOG1("%d: pglogical_write_message filtered",MyProcPid);
7881
return;
7982
}
80-
83+
84+
if (DDLInProress) {
85+
MTM_LOG1("%d: pglogical_write_message filtered DDLInProress",MyProcPid);
86+
return;
87+
}
88+
8189
relid=RelationGetRelid(rel);
8290
pq_sendbyte(out,'R');/* sending RELATION */
8391
pq_sendint(out,relid,sizeofrelid);/* use Oid as relation identifier */
@@ -107,36 +115,42 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107115
{
108116
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
109117
csn_tcsn=MtmTransactionSnapshot(txn->xid);
118+
119+
MtmCurrentXid=txn->xid;
120+
110121
MTM_LOG3("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
111122
MyProcPid,txn->xid,MtmReplicationNodeId,csn,isRecovery,txn->restart_decoding_lsn,txn->first_lsn,txn->end_lsn,MyReplicationSlot->data.confirmed_flush);
112-
113-
if (!isRecovery&&csn==INVALID_CSN) {
114-
MtmIsFilteredTxn= true;
115-
MTM_LOG3("%d: pglogical_write_begin XID=%d filtered",MyProcPid,txn->xid);
116-
}else {
117-
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
118-
MtmIsFilteredTxn= false;
119-
pq_sendbyte(out,'B');/* BEGIN */
120-
pq_sendint(out,MtmNodeId,4);
121-
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
122-
pq_sendint64(out,csn);
123-
MtmTransactionRecords=0;
124-
}
123+
124+
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
125+
pq_sendbyte(out,'B');/* BEGIN */
126+
pq_sendint(out,MtmNodeId,4);
127+
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
128+
pq_sendint64(out,csn);
129+
MtmTransactionRecords=0;
125130
}
126131

127132
staticvoid
128133
pglogical_write_message(StringInfoout,
129134
constchar*prefix,Sizesz,constchar*message)
130135
{
131-
if (*prefix=='L') {
136+
if (*prefix=='L')
137+
{
132138
MTM_LOG1("Send deadlock message to node %d",MtmReplicationNodeId);
133-
}else {
134-
if (MtmIsFilteredTxn)
139+
}
140+
elseif (*prefix=='G')
141+
{
142+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN)
135143
{
136-
MTM_LOG3("%d: pglogical_write_message filtered",MyProcPid);
144+
MTM_LOG1("%d: pglogical_write_message filtered",MyProcPid);
137145
return;
138146
}
147+
DDLInProress= true;
139148
}
149+
elseif (*prefix=='E')
150+
{
151+
DDLInProress= false;
152+
}
153+
140154
pq_sendbyte(out,*prefix);
141155
pq_sendint(out,sz,4);
142156
pq_sendbytes(out,message,sz);
@@ -169,10 +183,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
169183
Assert(flags!=PGLOGICAL_COMMIT_PREPARED||txn->xid<1000||MtmTransactionRecords!=1);
170184

171185
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
172-
if (MtmIsFilteredTxn) {
173-
Assert(MtmTransactionRecords==0);
174-
return;
175-
}
186+
//if (MtmIsFilteredTxn) {
187+
//Assert(MtmTransactionRecords == 0);
188+
//return;
189+
//}
176190
}else {
177191
csn_tcsn=MtmTransactionSnapshot(txn->xid);
178192
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
@@ -242,11 +256,20 @@ static void
242256
pglogical_write_insert(StringInfoout,PGLogicalOutputData*data,
243257
Relationrel,HeapTuplenewtuple)
244258
{
245-
if (!MtmIsFilteredTxn) {
246-
MtmTransactionRecords+=1;
247-
pq_sendbyte(out,'I');/* action INSERT */
248-
pglogical_write_tuple(out,data,rel,newtuple);
259+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
260+
MTM_LOG1("%d: pglogical_write_insert filtered",MyProcPid);
261+
return;
262+
}
263+
264+
if (DDLInProress) {
265+
MTM_LOG1("%d: pglogical_write_insert filtered DDLInProress",MyProcPid);
266+
return;
249267
}
268+
269+
MtmTransactionRecords+=1;
270+
pq_sendbyte(out,'I');/* action INSERT */
271+
pglogical_write_tuple(out,data,rel,newtuple);
272+
250273
}
251274

252275
/*
@@ -256,23 +279,30 @@ static void
256279
pglogical_write_update(StringInfoout,PGLogicalOutputData*data,
257280
Relationrel,HeapTupleoldtuple,HeapTuplenewtuple)
258281
{
259-
if (!MtmIsFilteredTxn) {
260-
MtmTransactionRecords+=1;
282+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
283+
MTM_LOG1("%d: pglogical_write_update filtered",MyProcPid);
284+
return;
285+
}
261286

262-
MTM_LOG3("%d: pglogical_write_update confirmed_flush=%lx",MyProcPid,MyReplicationSlot->data.confirmed_flush);
287+
if (DDLInProress) {
288+
MTM_LOG1("%d: pglogical_write_update filtered DDLInProress",MyProcPid);
289+
return;
290+
}
263291

292+
MtmTransactionRecords+=1;
264293

265-
pq_sendbyte(out,'U');/* action UPDATE */
266-
/* FIXME support whole tuple (O tuple type) */
267-
if (oldtuple!=NULL)
268-
{
269-
pq_sendbyte(out,'K');/* old key follows */
270-
pglogical_write_tuple(out,data,rel,oldtuple);
271-
}
272-
273-
pq_sendbyte(out,'N');/* new tuple follows */
274-
pglogical_write_tuple(out,data,rel,newtuple);
294+
MTM_LOG3("%d: pglogical_write_update confirmed_flush=%lx",MyProcPid,MyReplicationSlot->data.confirmed_flush);
295+
296+
pq_sendbyte(out,'U');/* action UPDATE */
297+
/* FIXME support whole tuple (O tuple type) */
298+
if (oldtuple!=NULL)
299+
{
300+
pq_sendbyte(out,'K');/* old key follows */
301+
pglogical_write_tuple(out,data,rel,oldtuple);
275302
}
303+
304+
pq_sendbyte(out,'N');/* new tuple follows */
305+
pglogical_write_tuple(out,data,rel,newtuple);
276306
}
277307

278308
/*
@@ -282,11 +312,19 @@ static void
282312
pglogical_write_delete(StringInfoout,PGLogicalOutputData*data,
283313
Relationrel,HeapTupleoldtuple)
284314
{
285-
if (!MtmIsFilteredTxn) {
286-
MtmTransactionRecords+=1;
287-
pq_sendbyte(out,'D');/* action DELETE */
288-
pglogical_write_tuple(out,data,rel,oldtuple);
315+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
316+
MTM_LOG1("%d: pglogical_write_delete filtered",MyProcPid);
317+
return;
318+
}
319+
320+
if (DDLInProress) {
321+
MTM_LOG1("%d: pglogical_write_delete filtered DDLInProress",MyProcPid);
322+
return;
289323
}
324+
325+
MtmTransactionRecords+=1;
326+
pq_sendbyte(out,'D');/* action DELETE */
327+
pglogical_write_tuple(out,data,rel,oldtuple);
290328
}
291329

292330
/*
@@ -311,6 +349,16 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
311349
inti;
312350
uint16nliveatts=0;
313351

352+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
353+
MTM_LOG1("%d: pglogical_write_tuple filtered",MyProcPid);
354+
return;
355+
}
356+
357+
if (DDLInProress) {
358+
MTM_LOG1("%d: pglogical_write_tuple filtered DDLInProress",MyProcPid);
359+
return;
360+
}
361+
314362
desc=RelationGetDescr(rel);
315363

316364
pq_sendbyte(out,'T');/* sending TUPLE */

‎src/backend/replication/logical/reorderbuffer.c‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2280,6 +2280,9 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
22802280

22812281
data= ((char*)rb->outbuf)+sizeof(ReorderBufferDiskChange);
22822282

2283+
/* might have been reallocated above */
2284+
ondisk= (ReorderBufferDiskChange*)rb->outbuf;
2285+
22832286
/* write the prefix including the size */
22842287
memcpy(data,&prefix_size,sizeof(Size));
22852288
data+=sizeof(Size);

‎src/test/regress/serial_schedule‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ test: oidjoins
5050
test: type_sanity
5151
test: opr_sanity
5252
test: insert
53-
# test: insert_conflict # issue#17
53+
# test: insert_conflict
5454
test: create_function_1
5555
test: create_type
5656
test: create_table
@@ -147,11 +147,11 @@ test: indirect_toast
147147
test: equivclass
148148
test: plancache
149149
test: limit
150-
#test: plpgsql # issue#21
150+
test: plpgsql
151151
test: copy2
152152
test: temp
153153
test: domain
154-
#test: rangefuncs # issue#21
154+
test: rangefuncs
155155
test: prepare
156156
test: without_oid
157157
test: conversion

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp