Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfa0ca21

Browse files
committed
Fix problem with mtm.local_tables
1 parent7d66929 commitfa0ca21

File tree

2 files changed

+59
-58
lines changed

2 files changed

+59
-58
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3347,8 +3347,10 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33473347

33483348
if (BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1))
33493349
{
3350-
/* Ok, then start recovery by luckiest walreceiver */
3351-
if (Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId)
3350+
/* Ok, then start recovery by luckiest walreceiver (if there is no donor node).
3351+
* If this node was populated using basebackup, then donorNodeId is not zero and we should choose this node for recovery */
3352+
if ((Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId)
3353+
&& (Mtm->donorNodeId==MtmNodeId||Mtm->donorNodeId==nodeId))
33523354
{
33533355
/* Lock on us */
33543356
Mtm->recoverySlot=nodeId;
@@ -4265,8 +4267,8 @@ Datum mtm_make_table_local(PG_FUNCTION_ARGS)
42654267
/* Form a tuple. */
42664268
memset(nulls, false,sizeof(nulls));
42674269

4268-
values[Anum_mtm_local_tables_rel_schema-1]=CStringGetTextDatum(schemaName);
4269-
values[Anum_mtm_local_tables_rel_name-1]=CStringGetTextDatum(tableName);
4270+
values[Anum_mtm_local_tables_rel_schema-1]=CStringGetDatum(schemaName);
4271+
values[Anum_mtm_local_tables_rel_name-1]=CStringGetDatum(tableName);
42704272

42714273
tup=heap_form_tuple(tupDesc,values,nulls);
42724274

‎contrib/mmts/pglogical_apply.c

Lines changed: 53 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -344,16 +344,16 @@ process_remote_begin(StringInfo s)
344344
nodemask_tparticipantsMask;
345345
intrc;
346346

347-
gtid.node=pq_getmsgint(s,4);
348-
gtid.xid=pq_getmsgint64(s);
349-
snapshot=pq_getmsgint64(s);
347+
gtid.node=pq_getmsgint(s,4);
348+
gtid.xid=pq_getmsgint64(s);
349+
snapshot=pq_getmsgint64(s);
350350
participantsMask=pq_getmsgint64(s);
351351
Assert(gtid.node>0);
352352

353353
MTM_LOG2("REMOTE begin node=%d xid=%llu snapshot=%lld participantsMask=%llx",gtid.node, (long64)gtid.xid,snapshot,participantsMask);
354-
MtmResetTransaction();
354+
MtmResetTransaction();
355355

356-
SetCurrentStatementStartTimestamp();
356+
SetCurrentStatementStartTimestamp();
357357
StartTransactionCommand();
358358
MtmJoinTransaction(&gtid,snapshot,participantsMask);
359359

@@ -362,7 +362,7 @@ process_remote_begin(StringInfo s)
362362
GucAltered= false;
363363
rc=SPI_execute("RESET SESSION AUTHORIZATION; reset all;", false,0);
364364
SPI_finish();
365-
if (rc<0) {
365+
if (rc<0) {
366366
MTM_ELOG(ERROR,"Failed to set reset context: %d",rc);
367367
}
368368
}
@@ -403,13 +403,13 @@ process_remote_message(StringInfo s)
403403

404404
rc=SPI_execute(messageBody, false,0);
405405
SPI_finish();
406-
if (rc<0) {
406+
if (rc<0) {
407407
MTM_ELOG(ERROR,"Failed to execute utility statement %s",messageBody);
408-
}else {
408+
}else {
409409
MemoryContextSwitchTo(MtmApplyContext);
410410
PushActiveSnapshot(GetTransactionSnapshot());
411411

412-
if (MtmVacuumStmt!=NULL) {
412+
if (MtmVacuumStmt!=NULL) {
413413
ExecVacuum(MtmVacuumStmt,1);
414414
}elseif (MtmIndexStmt!=NULL) {
415415
Oidrelid=RangeVarGetRelidExtended(MtmIndexStmt->relation,ShareUpdateExclusiveLock,
@@ -426,7 +426,7 @@ process_remote_message(StringInfo s)
426426
true,/* check_rights */
427427
false,/* skip_build */
428428
false);/* quiet */
429-
429+
430430
}
431431
elseif (MtmDropStmt!=NULL)
432432
{
@@ -449,7 +449,7 @@ process_remote_message(StringInfo s)
449449
if (ActiveSnapshotSet())
450450
PopActiveSnapshot();
451451
}
452-
if (standalone) {
452+
if (standalone) {
453453
CommitTransactionCommand();
454454
}
455455
break;
@@ -462,18 +462,18 @@ process_remote_message(StringInfo s)
462462
/* This function is called directly by receiver, so there is no race condition and we can update
463463
* restartLSN without locks
464464
*/
465-
if (origin_node==MtmReplicationNodeId) {
465+
if (origin_node==MtmReplicationNodeId) {
466466
Assert(msg->origin_lsn==INVALID_LSN);
467467
msg->origin_lsn=MtmSenderWalEnd;
468468
}
469-
if (Mtm->nodes[origin_node-1].restartLSN<msg->origin_lsn) {
469+
if (Mtm->nodes[origin_node-1].restartLSN<msg->origin_lsn) {
470470
MTM_LOG1("Receive logical abort message for transaction %s from node %d: %llx < %llx",msg->gid,origin_node,Mtm->nodes[origin_node-1].restartLSN,msg->origin_lsn);
471471
Mtm->nodes[origin_node-1].restartLSN=msg->origin_lsn;
472-
replorigin_session_origin_lsn=msg->origin_lsn;
472+
replorigin_session_origin_lsn=msg->origin_lsn;
473473
MtmRollbackPreparedTransaction(origin_node,msg->gid);
474-
}else {
475-
if (msg->origin_lsn!=INVALID_LSN) {
476-
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %llx <= %llx",
474+
}else {
475+
if (msg->origin_lsn!=INVALID_LSN) {
476+
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %llx <= %llx",
477477
msg->gid,origin_node,msg->origin_lsn,Mtm->nodes[origin_node-1].restartLSN);
478478
}
479479
}
@@ -498,7 +498,7 @@ process_remote_message(StringInfo s)
498498
}
499499
returnstandalone;
500500
}
501-
501+
502502
staticvoid
503503
read_tuple_parts(StringInfos,Relationrel,TupleData*tup)
504504
{
@@ -529,7 +529,7 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
529529
constchar*data;
530530
intlen;
531531

532-
if (att->atttypid==InvalidOid) {
532+
if (att->atttypid==InvalidOid) {
533533
continue;
534534
}
535535

@@ -612,13 +612,13 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
612612
staticvoid
613613
close_rel(Relationrel)
614614
{
615-
if (rel!=NULL)
615+
if (rel!=NULL)
616616
{
617-
heap_close(rel,NoLock);
618-
}
617+
heap_close(rel,NoLock);
618+
}
619619
}
620620

621-
staticRelation
621+
staticRelation
622622
read_rel(StringInfos,LOCKMODEmode)
623623
{
624624
intrelnamelen;
@@ -629,20 +629,20 @@ read_rel(StringInfo s, LOCKMODE mode)
629629
MemoryContextold_context;
630630

631631
local_relid=pglogical_relid_map_get(remote_relid);
632-
if (local_relid==InvalidOid) {
632+
if (local_relid==InvalidOid) {
633633
rv=makeNode(RangeVar);
634634

635635
nspnamelen=pq_getmsgbyte(s);
636636
rv->schemaname= (char*)pq_getmsgbytes(s,nspnamelen);
637-
637+
638638
relnamelen=pq_getmsgbyte(s);
639639
rv->relname= (char*)pq_getmsgbytes(s,relnamelen);
640-
640+
641641
local_relid=RangeVarGetRelidExtended(rv,mode, false, false,NULL,NULL);
642642
old_context=MemoryContextSwitchTo(TopMemoryContext);
643643
pglogical_relid_map_put(remote_relid,local_relid);
644644
MemoryContextSwitchTo(old_context);
645-
}else {
645+
}else {
646646
nspnamelen=pq_getmsgbyte(s);
647647
s->cursor+=nspnamelen;
648648
relnamelen=pq_getmsgbyte(s);
@@ -707,29 +707,29 @@ process_remote_commit(StringInfo in)
707707
Assert(IsTransactionState()&&TransactionIdIsValid(MtmGetCurrentTransactionId()));
708708
strncpy(gid,pq_getmsgstring(in),sizeofgid);
709709
MTM_LOG2("%d: PGLOGICAL_PREPARE %s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
710-
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_IN_PROGRESS)==TRANSACTION_STATUS_ABORTED) {
711-
MTM_LOG1("Avoid prepare of previously aborted global transaction %s",gid);
710+
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_IN_PROGRESS)==TRANSACTION_STATUS_ABORTED) {
711+
MTM_LOG1("Avoid prepare of previously aborted global transaction %s",gid);
712712
AbortCurrentTransaction();
713-
}else {
713+
}else {
714714
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
715715
BeginTransactionBlock(false);
716716
CommitTransactionCommand();
717717
StartTransactionCommand();
718-
718+
719719
MtmBeginSession(origin_node);
720720
/* PREPARE itself */
721721
MtmSetCurrentTransactionGID(gid);
722722
PrepareTransactionBlock(gid);
723723
CommitTransactionCommand();
724724

725-
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_UNKNOWN)==TRANSACTION_STATUS_ABORTED) {
726-
MTM_LOG1("Perform delayed rollback of prepared global transaction %s",gid);
725+
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_UNKNOWN)==TRANSACTION_STATUS_ABORTED) {
726+
MTM_LOG1("Perform delayed rollback of prepared global transaction %s",gid);
727727
StartTransactionCommand();
728728
MtmSetCurrentTransactionGID(gid);
729729
FinishPreparedTransaction(gid, false);
730-
CommitTransactionCommand();
730+
CommitTransactionCommand();
731731
Assert(!MtmTransIsActive());
732-
}
732+
}
733733
MtmEndSession(origin_node, true);
734734
}
735735
break;
@@ -771,7 +771,7 @@ process_remote_commit(StringInfo in)
771771
default:
772772
Assert(false);
773773
}
774-
if (Mtm->status==MTM_RECOVERY) {
774+
if (Mtm->status==MTM_RECOVERY) {
775775
MTM_LOG1("Recover transaction %s event=%d",gid,event);
776776
}
777777
MtmUpdateLsnMapping(MtmReplicationNodeId,end_lsn);
@@ -871,12 +871,12 @@ process_remote_insert(StringInfo s, Relation rel)
871871
if (strcmp(RelationGetRelationName(rel),MULTIMASTER_LOCAL_TABLES_TABLE)==0&&
872872
strcmp(get_namespace_name(RelationGetNamespace(rel)),MULTIMASTER_SCHEMA_NAME)==0)
873873
{
874-
MtmMakeTableLocal(TextDatumGetCString(new_tuple.values[0]),TextDatumGetCString(new_tuple.values[1]));
874+
MtmMakeTableLocal((char*)DatumGetPointer(new_tuple.values[0]),(char*)DatumGetPointer(new_tuple.values[1]));
875875
}
876-
876+
877877
ExecResetTupleTable(estate->es_tupleTable, true);
878878
FreeExecutorState(estate);
879-
879+
880880
CommandCounterIncrement();
881881
}
882882

@@ -989,12 +989,12 @@ process_remote_update(StringInfo s, Relation rel)
989989
errdetail("Most likely we have DELETE-UPDATE conflict")));
990990

991991
}
992-
992+
993993
PopActiveSnapshot();
994-
994+
995995
/* release locks upon commit */
996996
index_close(idxrel,NoLock);
997-
997+
998998
ExecResetTupleTable(estate->es_tupleTable, true);
999999
FreeExecutorState(estate);
10001000

@@ -1089,7 +1089,7 @@ void MtmExecutor(void* work, size_t size)
10891089
s.len=size;
10901090
s.maxlen=-1;
10911091
s.cursor=0;
1092-
1092+
10931093
if (MtmApplyContext==NULL) {
10941094
MtmApplyContext=AllocSetContextCreate(TopMemoryContext,
10951095
"ApplyContext",
@@ -1100,15 +1100,15 @@ void MtmExecutor(void* work, size_t size)
11001100
top_context=MemoryContextSwitchTo(MtmApplyContext);
11011101
replorigin_session_origin=InvalidRepOriginId;
11021102
PG_TRY();
1103-
{
1103+
{
11041104
boolinside_transaction= true;
1105-
do {
1105+
do {
11061106
charaction=pq_getmsgbyte(&s);
11071107
old_context=MemoryContextSwitchTo(MtmApplyContext);
1108-
1108+
11091109
MTM_LOG2("%d: REMOTE process action %c",MyProcPid,action);
11101110
#if0
1111-
if (Mtm->status==MTM_RECOVERY) {
1111+
if (Mtm->status==MTM_RECOVERY) {
11121112
MTM_LOG1("Replay action %c[%x]",action,s.data[s.cursor]);
11131113
}
11141114
#endif
@@ -1150,7 +1150,7 @@ void MtmExecutor(void* work, size_t size)
11501150
}
11511151
case'(':
11521152
{
1153-
size_tsize=pq_getmsgint(&s,4);
1153+
size_tsize=pq_getmsgint(&s,4);
11541154
s.data=MemoryContextAlloc(TopMemoryContext,size);
11551155
save_cursor=s.cursor;
11561156
save_len=s.len;
@@ -1175,10 +1175,10 @@ void MtmExecutor(void* work, size_t size)
11751175
relid=RelationGetRelid(rel);
11761176
close_rel(rel);
11771177
rel=NULL;
1178-
next=pq_getmsgint64(&s);
1178+
next=pq_getmsgint64(&s);
11791179
AdjustSequence(relid,next);
11801180
break;
1181-
}
1181+
}
11821182
case'0':
11831183
Assert(rel!=NULL);
11841184
heap_truncate_one_rel(rel);
@@ -1198,7 +1198,7 @@ void MtmExecutor(void* work, size_t size)
11981198
}
11991199
default:
12001200
MTM_ELOG(ERROR,"unknown action of type %c",action);
1201-
}
1201+
}
12021202
MemoryContextSwitchTo(old_context);
12031203
MemoryContextResetAndDeleteChildren(MtmApplyContext);
12041204
}while (inside_transaction);
@@ -1217,16 +1217,15 @@ void MtmExecutor(void* work, size_t size)
12171217
MTM_LOG2("%d: REMOTE end abort transaction %llu",MyProcPid, (long64)MtmGetCurrentTransactionId());
12181218
}
12191219
PG_END_TRY();
1220-
if (s.data!=work) {
1220+
if (s.data!=work) {
12211221
pfree(s.data);
12221222
}
12231223
#if0/* spill file is expecrted to be closed by tranaction commit or rollback */
1224-
if (spill_file >=0) {
1224+
if (spill_file >=0) {
12251225
MtmCloseSpillFile(spill_file);
12261226
}
12271227
#endif
12281228
MemoryContextSwitchTo(top_context);
12291229
MemoryContextResetAndDeleteChildren(MtmApplyContext);
12301230
MtmReleaseLocks();
12311231
}
1232-

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp