Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit181d689

Browse files
knizhnikkelvich
authored andcommitted
PGPRO-731 # Table copy fixes
1 parent432ef92 commit181d689

File tree

7 files changed

+26
-71
lines changed

7 files changed

+26
-71
lines changed

‎multimaster.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4422,7 +4422,8 @@ Datum mtm_broadcast_table(PG_FUNCTION_ARGS)
44224422
MtmCopyRequestcopy;
44234423
copy.sourceTable=PG_GETARG_OID(0);
44244424
copy.targetNodes=PG_GETARG_INT64(1);
4425-
LogLogicalMessage("B", (char*)&copy,sizeof(copy), false);
4425+
LogLogicalMessage("B", (char*)&copy,sizeof(copy), true);
4426+
MtmTx.containsDML= true;
44264427
PG_RETURN_VOID();
44274428
}
44284429

@@ -4431,7 +4432,8 @@ Datum mtm_copy_table(PG_FUNCTION_ARGS)
44314432
MtmCopyRequestcopy;
44324433
copy.sourceTable=PG_GETARG_OID(0);
44334434
copy.targetNodes= (nodemask_t)1 << (PG_GETARG_INT32(1)-1);
4434-
LogLogicalMessage("B", (char*)&copy,sizeof(copy), false);
4435+
LogLogicalMessage("B", (char*)&copy,sizeof(copy), true);
4436+
MtmTx.containsDML= true;
44354437
PG_RETURN_VOID();
44364438
}
44374439

‎pglogical_apply.c

Lines changed: 3 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -368,49 +368,6 @@ process_remote_begin(StringInfo s)
368368
return true;
369369
}
370370

371-
staticvoid
372-
process_broadcast_table(StringInfos)
373-
{
374-
Relationrel;
375-
charch;
376-
EState*estate;
377-
TupleDatanew_tuple;
378-
TupleTableSlot*newslot;
379-
TupleTableSlot*oldslot;
380-
HeapTupletup;
381-
382-
StartTransactionCommand();
383-
384-
ch=pq_getmsgbyte(s);
385-
Assert(ch=='R');
386-
rel=read_rel(s,AccessExclusiveLock);
387-
388-
heap_truncate_one_rel(rel);
389-
390-
estate=create_rel_estate(rel);
391-
newslot=ExecInitExtraTupleSlot(estate);
392-
oldslot=ExecInitExtraTupleSlot(estate);
393-
ExecSetSlotDescriptor(newslot,RelationGetDescr(rel));
394-
ExecSetSlotDescriptor(oldslot,RelationGetDescr(rel));
395-
396-
ExecOpenIndices(estate->es_result_relation_info, false);
397-
398-
while (s->cursor!=s->len) {
399-
read_tuple_parts(s,rel,&new_tuple);
400-
tup=heap_form_tuple(RelationGetDescr(rel),
401-
new_tuple.values,new_tuple.isnull);
402-
ExecStoreTuple(tup,newslot,InvalidBuffer, true);
403-
simple_heap_insert(rel,newslot->tts_tuple);
404-
UserTableUpdateOpenIndexes(estate,newslot);
405-
}
406-
407-
ExecCloseIndices(estate->es_result_relation_info);
408-
ExecResetTupleTable(estate->es_tupleTable, true);
409-
FreeExecutorState(estate);
410-
411-
CommitTransactionCommand();
412-
}
413-
414371
staticbool
415372
process_remote_message(StringInfos)
416373
{
@@ -421,12 +378,6 @@ process_remote_message(StringInfo s)
421378

422379
switch (action)
423380
{
424-
case'B':
425-
{
426-
process_broadcast_table(s);
427-
standalone= true;
428-
break;
429-
}
430381
case'C':
431382
{
432383
MTM_LOG1("%d: Executing non-tx utility statement %s",MyProcPid,messageBody);
@@ -1206,6 +1157,9 @@ void MtmExecutor(void* work, size_t size)
12061157
s.len=save_len;
12071158
break;
12081159
}
1160+
case'0':
1161+
heap_truncate_one_rel(rel);
1162+
break;
12091163
case'M':
12101164
{
12111165
close_rel(rel);

‎pglogical_output.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ static bool startup_message_sent = false;
8181

8282
#defineOUTPUT_BUFFER_SIZE (16*1024*1024)
8383

84-
staticvoidMtmOutputPluginWrite(LogicalDecodingContext*ctx,boollast_write,boolflush)
84+
voidMtmOutputPluginWrite(LogicalDecodingContext*ctx,boollast_write,boolflush)
8585
{
8686
if (flush) {
8787
OutputPluginWrite(ctx,last_write);
8888
}
8989
}
9090

91-
staticvoidMtmOutputPluginPrepareWrite(LogicalDecodingContext*ctx,boollast_write,boolflush)
91+
voidMtmOutputPluginPrepareWrite(LogicalDecodingContext*ctx,boollast_write,boolflush)
9292
{
9393
if (!ctx->prepared_write) {
9494
OutputPluginPrepareWrite(ctx,last_write);
@@ -557,7 +557,7 @@ pg_decode_message(LogicalDecodingContext *ctx,
557557
PGLogicalOutputData*data= (PGLogicalOutputData*)ctx->output_plugin_private;
558558

559559
MtmOutputPluginPrepareWrite(ctx, true, !transactional);
560-
data->api->write_message(ctx->out,data,prefix,sz,message);
560+
data->api->write_message(ctx->out,ctx,prefix,sz,message);
561561
MtmOutputPluginWrite(ctx, true, !transactional);
562562
}
563563

‎pglogical_output.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,7 @@ typedef struct PGLogicalTupleData
102102
boolchanged[MaxTupleAttributeNumber];
103103
}PGLogicalTupleData;
104104

105+
externvoidMtmOutputPluginWrite(LogicalDecodingContext*ctx,boollast_write,boolflush);
106+
externvoidMtmOutputPluginPrepareWrite(LogicalDecodingContext*ctx,boollast_write,boolflush);
107+
105108
#endif/* PG_LOGICAL_OUTPUT_H */

‎pglogical_proto.c

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -168,38 +168,34 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
168168
}
169169
}
170170

171-
staticvoidpglogical_broadcast_table(StringInfoout,PGLogicalOutputData*data,MtmCopyRequest*copy)
171+
staticvoidpglogical_broadcast_table(StringInfoout,LogicalDecodingContext*ctx,MtmCopyRequest*copy)
172172
{
173173
if (BIT_CHECK(copy->targetNodes,MtmReplicationNodeId-1)) {
174174
HeapScanDescscandesc;
175175
HeapTupletuple;
176176
Relationrel;
177177

178-
StartTransactionCommand();
179-
180178
rel=heap_open(copy->sourceTable,ShareLock);
181179

182-
pq_sendbyte(out,'M');
183-
pq_sendbyte(out,'B');
184-
pq_sendint(out,sizeof(*copy),4);
185-
pq_sendbytes(out, (char*)copy,sizeof(*copy));
186-
187-
pglogical_write_rel(out,data,rel);
180+
pglogical_write_rel(out,ctx->output_plugin_private,rel);
181+
182+
pq_sendbyte(out,'0');
188183

189184
scandesc=heap_beginscan(rel,GetTransactionSnapshot(),0,NULL);
190185
while ((tuple=heap_getnext(scandesc,ForwardScanDirection))!=NULL)
191186
{
192-
pglogical_write_tuple(out,data,rel,tuple);
187+
MtmOutputPluginPrepareWrite(ctx, false, false);
188+
pq_sendbyte(out,'I');/* action INSERT */
189+
pglogical_write_tuple(out,ctx->output_plugin_private,rel,tuple);
190+
MtmOutputPluginWrite(ctx, false, false);
193191
}
194192
heap_endscan(scandesc);
195193
heap_close(rel,ShareLock);
196-
197-
CommitTransactionCommand();
198194
}
199195
}
200196

201197
staticvoid
202-
pglogical_write_message(StringInfoout,PGLogicalOutputData*data,
198+
pglogical_write_message(StringInfoout,LogicalDecodingContext*ctx,
203199
constchar*prefix,Sizesz,constchar*message)
204200
{
205201
MtmLastRelId=InvalidOid;
@@ -231,7 +227,7 @@ pglogical_write_message(StringInfo out, PGLogicalOutputData *data,
231227
*/
232228
return;
233229
case'B':
234-
pglogical_broadcast_table(out,data, (MtmCopyRequest*)message);
230+
pglogical_broadcast_table(out,ctx, (MtmCopyRequest*)message);
235231
return;
236232
}
237233
pq_sendbyte(out,'M');

‎pglogical_proto.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ typedef void (*pglogical_write_rel_fn)(StringInfo out, struct PGLogicalOutputDat
2121

2222
typedefvoid (*pglogical_write_begin_fn)(StringInfoout,structPGLogicalOutputData*data,
2323
ReorderBufferTXN*txn);
24-
typedefvoid (*pglogical_write_message_fn)(StringInfoout,structPGLogicalOutputData*data,
24+
typedefvoid (*pglogical_write_message_fn)(StringInfoout,LogicalDecodingContext*ctx,
2525
constchar*prefix,Sizesz,constchar*message);
2626
typedefvoid (*pglogical_write_commit_fn)(StringInfoout,structPGLogicalOutputData*data,
2727
ReorderBufferTXN*txn,XLogRecPtrcommit_lsn);

‎pglogical_receiver.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -552,9 +552,9 @@ pglogical_receiver_main(Datum main_arg)
552552
MtmSpillToFile(spill_file,buf.data,buf.used);
553553
ByteBufferReset(&buf);
554554
}
555-
if (stmt[0]=='Z'|| (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A'||stmt[1]=='B'||stmt[1]=='C'))) {
555+
if (stmt[0]=='Z'|| (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A'||stmt[1]=='C'))) {
556556
MTM_LOG3("Process '%c' message from %d",stmt[1],nodeId);
557-
if (stmt[0]=='M'&&(stmt[1]=='B'||stmt[1]=='C')) {/* concurrent DDL should be executed by parallel workers */
557+
if (stmt[0]=='M'&&stmt[1]=='C') {/* concurrent DDL should be executed by parallel workers */
558558
MtmExecute(stmt,msg_len);
559559
}else {
560560
MtmExecutor(stmt,msg_len);/* all other messages can be processed by receiver itself */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp