Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbb2b803

Browse files
committed
PGPRO-731 # Table copy fixes
1 parentc4a17b0 commitbb2b803

File tree

7 files changed

+26
-71
lines changed

7 files changed

+26
-71
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4408,7 +4408,8 @@ Datum mtm_broadcast_table(PG_FUNCTION_ARGS)
44084408
MtmCopyRequestcopy;
44094409
copy.sourceTable=PG_GETARG_OID(0);
44104410
copy.targetNodes=PG_GETARG_INT64(1);
4411-
LogLogicalMessage("B", (char*)&copy,sizeof(copy), false);
4411+
LogLogicalMessage("B", (char*)&copy,sizeof(copy), true);
4412+
MtmTx.containsDML= true;
44124413
PG_RETURN_VOID();
44134414
}
44144415

@@ -4417,7 +4418,8 @@ Datum mtm_copy_table(PG_FUNCTION_ARGS)
44174418
MtmCopyRequestcopy;
44184419
copy.sourceTable=PG_GETARG_OID(0);
44194420
copy.targetNodes= (nodemask_t)1 << (PG_GETARG_INT32(1)-1);
4420-
LogLogicalMessage("B", (char*)&copy,sizeof(copy), false);
4421+
LogLogicalMessage("B", (char*)&copy,sizeof(copy), true);
4422+
MtmTx.containsDML= true;
44214423
PG_RETURN_VOID();
44224424
}
44234425

‎contrib/mmts/pglogical_apply.c

Lines changed: 3 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -368,49 +368,6 @@ process_remote_begin(StringInfo s)
368368
return true;
369369
}
370370

371-
staticvoid
372-
process_broadcast_table(StringInfos)
373-
{
374-
Relationrel;
375-
charch;
376-
EState*estate;
377-
TupleDatanew_tuple;
378-
TupleTableSlot*newslot;
379-
TupleTableSlot*oldslot;
380-
HeapTupletup;
381-
382-
StartTransactionCommand();
383-
384-
ch=pq_getmsgbyte(s);
385-
Assert(ch=='R');
386-
rel=read_rel(s,AccessExclusiveLock);
387-
388-
heap_truncate_one_rel(rel);
389-
390-
estate=create_rel_estate(rel);
391-
newslot=ExecInitExtraTupleSlot(estate);
392-
oldslot=ExecInitExtraTupleSlot(estate);
393-
ExecSetSlotDescriptor(newslot,RelationGetDescr(rel));
394-
ExecSetSlotDescriptor(oldslot,RelationGetDescr(rel));
395-
396-
ExecOpenIndices(estate->es_result_relation_info, false);
397-
398-
while (s->cursor!=s->len) {
399-
read_tuple_parts(s,rel,&new_tuple);
400-
tup=heap_form_tuple(RelationGetDescr(rel),
401-
new_tuple.values,new_tuple.isnull);
402-
ExecStoreTuple(tup,newslot,InvalidBuffer, true);
403-
simple_heap_insert(rel,newslot->tts_tuple);
404-
UserTableUpdateOpenIndexes(estate,newslot);
405-
}
406-
407-
ExecCloseIndices(estate->es_result_relation_info);
408-
ExecResetTupleTable(estate->es_tupleTable, true);
409-
FreeExecutorState(estate);
410-
411-
CommitTransactionCommand();
412-
}
413-
414371
staticbool
415372
process_remote_message(StringInfos)
416373
{
@@ -421,12 +378,6 @@ process_remote_message(StringInfo s)
421378

422379
switch (action)
423380
{
424-
case'B':
425-
{
426-
process_broadcast_table(s);
427-
standalone= true;
428-
break;
429-
}
430381
case'C':
431382
{
432383
MTM_LOG1("%d: Executing non-tx utility statement %s",MyProcPid,messageBody);
@@ -1206,6 +1157,9 @@ void MtmExecutor(void* work, size_t size)
12061157
s.len=save_len;
12071158
break;
12081159
}
1160+
case'0':
1161+
heap_truncate_one_rel(rel);
1162+
break;
12091163
case'M':
12101164
{
12111165
close_rel(rel);

‎contrib/mmts/pglogical_output.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ static bool startup_message_sent = false;
8181

8282
#defineOUTPUT_BUFFER_SIZE (16*1024*1024)
8383

84-
staticvoidMtmOutputPluginWrite(LogicalDecodingContext*ctx,boollast_write,boolflush)
84+
voidMtmOutputPluginWrite(LogicalDecodingContext*ctx,boollast_write,boolflush)
8585
{
8686
if (flush) {
8787
OutputPluginWrite(ctx,last_write);
8888
}
8989
}
9090

91-
staticvoidMtmOutputPluginPrepareWrite(LogicalDecodingContext*ctx,boollast_write,boolflush)
91+
voidMtmOutputPluginPrepareWrite(LogicalDecodingContext*ctx,boollast_write,boolflush)
9292
{
9393
if (!ctx->prepared_write) {
9494
OutputPluginPrepareWrite(ctx,last_write);
@@ -557,7 +557,7 @@ pg_decode_message(LogicalDecodingContext *ctx,
557557
PGLogicalOutputData*data= (PGLogicalOutputData*)ctx->output_plugin_private;
558558

559559
MtmOutputPluginPrepareWrite(ctx, true, !transactional);
560-
data->api->write_message(ctx->out,data,prefix,sz,message);
560+
data->api->write_message(ctx->out,ctx,prefix,sz,message);
561561
MtmOutputPluginWrite(ctx, true, !transactional);
562562
}
563563

‎contrib/mmts/pglogical_output.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,7 @@ typedef struct PGLogicalTupleData
102102
boolchanged[MaxTupleAttributeNumber];
103103
}PGLogicalTupleData;
104104

105+
externvoidMtmOutputPluginWrite(LogicalDecodingContext*ctx,boollast_write,boolflush);
106+
externvoidMtmOutputPluginPrepareWrite(LogicalDecodingContext*ctx,boollast_write,boolflush);
107+
105108
#endif/* PG_LOGICAL_OUTPUT_H */

‎contrib/mmts/pglogical_proto.c

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -168,38 +168,34 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
168168
}
169169
}
170170

171-
staticvoidpglogical_broadcast_table(StringInfoout,PGLogicalOutputData*data,MtmCopyRequest*copy)
171+
staticvoidpglogical_broadcast_table(StringInfoout,LogicalDecodingContext*ctx,MtmCopyRequest*copy)
172172
{
173173
if (BIT_CHECK(copy->targetNodes,MtmReplicationNodeId-1)) {
174174
HeapScanDescscandesc;
175175
HeapTupletuple;
176176
Relationrel;
177177

178-
StartTransactionCommand();
179-
180178
rel=heap_open(copy->sourceTable,ShareLock);
181179

182-
pq_sendbyte(out,'M');
183-
pq_sendbyte(out,'B');
184-
pq_sendint(out,sizeof(*copy),4);
185-
pq_sendbytes(out, (char*)copy,sizeof(*copy));
186-
187-
pglogical_write_rel(out,data,rel);
180+
pglogical_write_rel(out,ctx->output_plugin_private,rel);
181+
182+
pq_sendbyte(out,'0');
188183

189184
scandesc=heap_beginscan(rel,GetTransactionSnapshot(),0,NULL);
190185
while ((tuple=heap_getnext(scandesc,ForwardScanDirection))!=NULL)
191186
{
192-
pglogical_write_tuple(out,data,rel,tuple);
187+
MtmOutputPluginPrepareWrite(ctx, false, false);
188+
pq_sendbyte(out,'I');/* action INSERT */
189+
pglogical_write_tuple(out,ctx->output_plugin_private,rel,tuple);
190+
MtmOutputPluginWrite(ctx, false, false);
193191
}
194192
heap_endscan(scandesc);
195193
heap_close(rel,ShareLock);
196-
197-
CommitTransactionCommand();
198194
}
199195
}
200196

201197
staticvoid
202-
pglogical_write_message(StringInfoout,PGLogicalOutputData*data,
198+
pglogical_write_message(StringInfoout,LogicalDecodingContext*ctx,
203199
constchar*prefix,Sizesz,constchar*message)
204200
{
205201
MtmLastRelId=InvalidOid;
@@ -231,7 +227,7 @@ pglogical_write_message(StringInfo out, PGLogicalOutputData *data,
231227
*/
232228
return;
233229
case'B':
234-
pglogical_broadcast_table(out,data, (MtmCopyRequest*)message);
230+
pglogical_broadcast_table(out,ctx, (MtmCopyRequest*)message);
235231
return;
236232
}
237233
pq_sendbyte(out,'M');

‎contrib/mmts/pglogical_proto.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ typedef void (*pglogical_write_rel_fn)(StringInfo out, struct PGLogicalOutputDat
2121

2222
typedefvoid (*pglogical_write_begin_fn)(StringInfoout,structPGLogicalOutputData*data,
2323
ReorderBufferTXN*txn);
24-
typedefvoid (*pglogical_write_message_fn)(StringInfoout,structPGLogicalOutputData*data,
24+
typedefvoid (*pglogical_write_message_fn)(StringInfoout,LogicalDecodingContext*ctx,
2525
constchar*prefix,Sizesz,constchar*message);
2626
typedefvoid (*pglogical_write_commit_fn)(StringInfoout,structPGLogicalOutputData*data,
2727
ReorderBufferTXN*txn,XLogRecPtrcommit_lsn);

‎contrib/mmts/pglogical_receiver.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -547,9 +547,9 @@ pglogical_receiver_main(Datum main_arg)
547547
MtmSpillToFile(spill_file,buf.data,buf.used);
548548
ByteBufferReset(&buf);
549549
}
550-
if (stmt[0]=='Z'|| (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A'||stmt[1]=='B'||stmt[1]=='C'))) {
550+
if (stmt[0]=='Z'|| (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A'||stmt[1]=='C'))) {
551551
MTM_LOG3("Process '%c' message from %d",stmt[1],nodeId);
552-
if (stmt[0]=='M'&&(stmt[1]=='B'||stmt[1]=='C')) {/* concurrent DDL should be executed by parallel workers */
552+
if (stmt[0]=='M'&&stmt[1]=='C') {/* concurrent DDL should be executed by parallel workers */
553553
MtmExecute(stmt,msg_len);
554554
}else {
555555
MtmExecutor(stmt,msg_len);/* all other messages can be processed by receiver itself */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp