Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite0539a9

Browse files
knizhnikkelvich
authored andcommitted
eXplicitely set restart LSN
1 parent7463957 commite0539a9

File tree

6 files changed

+49
-46
lines changed

6 files changed

+49
-46
lines changed

‎arbiter.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,14 @@ static void MtmTransReceiver(Datum arg)
912912
switch (msg->code) {
913913
caseMSG_READY:
914914
MTM_TXTRACE(ts,"MtmTransReceiver got MSG_READY");
915+
if (ts->status==TRANSACTION_STATUS_COMMITTED) {
916+
elog(WARNING,"Receive READY response for already committed transaction %d from node %d",
917+
ts->xid,msg->node);
918+
continue;
919+
}
915920
if (ts->nVotes >=Mtm->nLiveNodes) {
921+
elog(WARNING,"Receive deteriorated READY response for transaction %d (%s) from node %d",
922+
ts->xid,ts->gid,msg->node);
916923
MtmAbortTransaction(ts);
917924
MtmWakeUpBackend(ts);
918925
}else {
@@ -956,6 +963,8 @@ static void MtmTransReceiver(Datum arg)
956963
caseMSG_PREPARED:
957964
MTM_TXTRACE(ts,"MtmTransReceiver got MSG_PREPARED");
958965
if (ts->nVotes >=Mtm->nLiveNodes) {
966+
elog(WARNING,"Receive deteriorated PREPARED response for transaction %d (%s) from node %d",
967+
ts->xid,ts->gid,msg->node);
959968
MtmAbortTransaction(ts);
960969
MtmWakeUpBackend(ts);
961970
}else {

‎multimaster.c

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -390,16 +390,17 @@ MtmInitializeSequence(int64* start, int64* step)
390390

391391
csn_tMtmTransactionSnapshot(TransactionIdxid)
392392
{
393-
MtmTransState*ts;
394393
csn_tsnapshot=INVALID_CSN;
395-
394+
396395
MtmLock(LW_SHARED);
397-
ts=hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
398-
if (ts!=NULL&& !ts->isLocal) {
399-
snapshot=ts->snapshot;
396+
if (Mtm->status==MTM_ONLINE) {
397+
MtmTransState*ts=hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
398+
if (ts!=NULL&& !ts->isLocal) {
399+
snapshot=ts->snapshot;
400+
Assert(ts->gtid.node==MtmNodeId||MtmIsRecoverySession);
401+
}
400402
}
401403
MtmUnlock();
402-
403404
returnsnapshot;
404405
}
405406

@@ -1009,7 +1010,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
10091010
}
10101011
}
10111012
if (!commit&&x->isReplicated&&TransactionIdIsValid(x->gtid.xid)) {
1012-
Assert(Mtm->status!=MTM_RECOVERY);
1013+
Assert(Mtm->status!=MTM_RECOVERY||Mtm->recoverySlot!=MtmNodeId);
10131014
/*
10141015
* Send notification only if ABORT happens during transaction processing at replicas,
10151016
* do not send notification if ABORT is received from master
@@ -2467,29 +2468,32 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
24672468
returnREPLMODE_EXIT;
24682469
}
24692470
MTM_LOG2("%d: receiver slot mode %s",MyProcPid,MtmNodeStatusMnem[Mtm->status]);
2471+
MtmLock(LW_EXCLUSIVE);
24702472
if (Mtm->status==MTM_RECOVERY) {
24712473
recovery= true;
24722474
if (Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId) {
24732475
/* Choose for recovery first available slot */
2474-
MTM_LOG1("Startrecovery from node %d",nodeId);
2476+
elog(WARNING,"Process %d startsrecovery from node %d",MyProcPid,nodeId);
24752477
Mtm->recoverySlot=nodeId;
24762478
Mtm->nReceivers=0;
24772479
Mtm->recoveryCount+=1;
24782480
Mtm->pglogicalNodeMask=0;
2479-
FinishAllPreparedTransactions(false);
24802481
for (i=0;i<Mtm->nAllNodes;i++) {
2481-
Mtm->nodes[i].restartLsn=0;
2482+
Mtm->nodes[i].restartLsn=InvalidXLogRecPtr;
24822483
}
2484+
MtmUnlock();
2485+
FinishAllPreparedTransactions(false);
24832486
returnREPLMODE_RECOVERY;
24842487
}
24852488
}
2489+
MtmUnlock();
24862490
/* delay opening of other slots until recovery is completed */
24872491
MtmSleep(STATUS_POLL_DELAY);
24882492
}
24892493
if (recovery) {
2490-
MTM_LOG1("Recreatereplicationslotfor node %d after end of recovery",nodeId);
2494+
MTM_LOG1("%d: Restartreplication for node %d after end of recovery",MyProcPid,nodeId);
24912495
}else {
2492-
MTM_LOG2("%d:Reuse replication slot for node %d",MyProcPid,nodeId);
2496+
MTM_LOG1("%d:Continue replication slot for node %d",MyProcPid,nodeId);
24932497
}
24942498
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
24952499
returnrecovery ?REPLMODE_RECOVERED :REPLMODE_NORMAL;

‎pglogical_apply.c

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ typedef struct TupleData
5959
boolchanged[MaxTupleAttributeNumber];
6060
}TupleData;
6161

62-
staticintMtmTransactionRecords;
62+
staticboolinside_tx= false;
6363

6464
staticRelationread_rel(StringInfos,LOCKMODEmode);
6565
staticvoidread_tuple_parts(StringInfos,Relationrel,TupleData*tup);
@@ -528,6 +528,8 @@ MtmEndSession(void)
528528
if (replorigin_session_origin!=InvalidRepOriginId) {
529529
MTM_LOG2("%d: Begin reset replorigin session for node %d: %d, progress %lx",MyProcPid,MtmReplicationNodeId,replorigin_session_origin,replorigin_session_get_progress(false));
530530
replorigin_session_origin=InvalidRepOriginId;
531+
replorigin_session_origin_lsn=InvalidXLogRecPtr;
532+
replorigin_session_origin_timestamp=0;
531533
replorigin_session_reset();
532534
if (unlock) {
533535
MtmUnlockNode(MtmReplicationNodeId);
@@ -539,42 +541,25 @@ MtmEndSession(void)
539541
staticvoid
540542
process_remote_commit(StringInfoin)
541543
{
542-
inti;
543544
uint8flags;
544545
csn_tcsn;
545546
constchar*gid=NULL;
546547
XLogRecPtrend_lsn;
547548
XLogRecPtrorigin_lsn;
548-
RepOriginIdoriginId;
549-
intn_records;
549+
intorigin_node;
550550
/* read flags */
551551
flags=pq_getmsgbyte(in);
552552
MtmReplicationNodeId=pq_getmsgbyte(in);
553553

554-
n_records=pq_getmsgint(in,4);
555-
if (MtmTransactionRecords!=n_records) {
556-
elog(ERROR,"Transaction %d flags %d contains %d records instead of %d",MtmGetCurrentTransactionId(),flags,MtmTransactionRecords,n_records);
557-
}
558-
559554
/* read fields */
560555
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */
561556
end_lsn=pq_getmsgint64(in);/* end_lsn */
562557
replorigin_session_origin_timestamp=pq_getmsgint64(in);/* commit_time */
563558

564-
originId=(RepOriginId)pq_getmsgint(in,2);
559+
origin_node=pq_getmsgbyte(in);
565560
origin_lsn=pq_getmsgint64(in);
561+
Mtm->nodes[origin_node-1].restartLsn=origin_lsn;
566562

567-
if (originId!=InvalidRepOriginId) {
568-
for (i=0;i<Mtm->nAllNodes;i++) {
569-
if (Mtm->nodes[i].originId==originId) {
570-
Mtm->nodes[i].restartLsn=origin_lsn;
571-
break;
572-
}
573-
}
574-
if (i==Mtm->nAllNodes) {
575-
elog(WARNING,"Failed to map origin %d",originId);
576-
}
577-
}
578563
Assert(replorigin_session_origin==InvalidRepOriginId);
579564

580565
switch(PGLOGICAL_XACT_EVENT(flags))
@@ -676,8 +661,6 @@ process_remote_insert(StringInfo s, Relation rel)
676661
ScanKey*index_keys;
677662
inti;
678663

679-
MtmTransactionRecords+=1;
680-
681664
estate=create_rel_estate(rel);
682665
newslot=ExecInitExtraTupleSlot(estate);
683666
oldslot=ExecInitExtraTupleSlot(estate);
@@ -776,8 +759,6 @@ process_remote_update(StringInfo s, Relation rel)
776759
ScanKeyDataskey[INDEX_MAX_KEYS];
777760
HeapTupleremote_tuple=NULL;
778761

779-
MtmTransactionRecords+=1;
780-
781762
action=pq_getmsgbyte(s);
782763

783764
/* old key present, identifying key changed */
@@ -895,8 +876,6 @@ process_remote_delete(StringInfo s, Relation rel)
895876
ScanKeyDataskey[INDEX_MAX_KEYS];
896877
boolfound_old;
897878

898-
MtmTransactionRecords+=1;
899-
900879
estate=create_rel_estate(rel);
901880
oldslot=ExecInitExtraTupleSlot(estate);
902881
ExecSetSlotDescriptor(oldslot,RelationGetDescr(rel));
@@ -984,7 +963,6 @@ void MtmExecutor(int id, void* work, size_t size)
984963
}
985964
MemoryContextSwitchTo(ApplyContext);
986965
replorigin_session_origin=InvalidRepOriginId;
987-
MtmTransactionRecords=0;
988966
PG_TRY();
989967
{
990968
while (true) {

‎pglogical_proto.c

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,14 +186,25 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
186186
pq_sendbyte(out,MtmNodeId);
187187

188188
Assert(txn->xact_action!=XLOG_XACT_PREPARE||txn->xid<1000||MtmTransactionRecords >=2);
189-
pq_sendint(out,MtmTransactionRecords,4);
190-
189+
191190
/* send fixed fields */
192191
pq_sendint64(out,commit_lsn);
193192
pq_sendint64(out,txn->end_lsn);
194193
pq_sendint64(out,txn->commit_time);
195194

196-
pq_sendint(out,txn->origin_id,2);
195+
if (txn->origin_id!=InvalidRepOriginId) {
196+
inti;
197+
for (i=0;i<Mtm->nAllNodes&&Mtm->nodes[i].originId!=txn->origin_id;i++);
198+
if (i==Mtm->nAllNodes) {
199+
elog(WARNING,"Failed to map origin %d",txn->origin_id);
200+
i=MtmNodeId-1;
201+
}else {
202+
//Assert(i == MtmNodeId-1 || txn->origin_lsn != InvalidXLogRecPtr);
203+
}
204+
pq_sendbyte(out,i+1);
205+
}else {
206+
pq_sendbyte(out,MtmNodeId);
207+
}
197208
pq_sendint64(out,txn->origin_lsn);
198209

199210
if (txn->xact_action==XLOG_XACT_COMMIT_PREPARED) {

‎pglogical_receiver.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ pglogical_receiver_main(Datum main_arg)
247247
{
248248
intcount;
249249
ConnStatusTypestatus;
250-
XLogRecPtroriginStartPos=0;
250+
XLogRecPtroriginStartPos=InvalidXLogRecPtr;
251251

252252
/*
253253
* Determine when and how we should open replication slot.
@@ -306,8 +306,9 @@ pglogical_receiver_main(Datum main_arg)
306306
/* Start logical replication at specified position */
307307
if (mode==REPLMODE_RECOVERED) {
308308
originStartPos=Mtm->nodes[nodeId-1].restartLsn;
309+
MTM_LOG1("Restart replication from node %d from position %lx",nodeId,originStartPos);
309310
}
310-
if (originStartPos==0) {
311+
if (originStartPos==InvalidXLogRecPtr) {
311312
StartTransactionCommand();
312313
originName=psprintf(MULTIMASTER_SLOT_PATTERN,nodeId);
313314
originId=replorigin_by_name(originName, true);

‎tests2/docker-entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ if [ "$1" = 'postgres' ]; then
6161

6262
############################################################################
6363

64-
CONNSTRS='dbname=postgres host=node1, dbname=postgres host=node2, dbname=postgres host=node3'
64+
CONNSTRS='dbname=postgresuser=postgreshost=node1, dbname=postgresuser=postgreshost=node2, dbname=postgres user=postgres host=node3'
6565
RAFT_PEERS='1:node1:6666, 2:node2:6666, 3:node3:6666'
6666

6767
cat<<-EOF >>$PGDATA/postgresql.conf

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp