Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit45945a8

Browse files
committed
Change flush position reporting method
1 parent08994d4 commit45945a8

File tree

5 files changed

+67
-8
lines changed

5 files changed

+67
-8
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ static bool MtmIsRecoverySession;
147147

148148
staticMtmCurrentTransMtmTx;
149149

150+
staticdlist_headMtmLsnMapping=DLIST_STATIC_INIT(MtmLsnMapping);
151+
150152
staticTransactionManagerMtmTM= {
151153
PgTransactionIdGetStatus,
152154
PgTransactionIdSetTreeStatus,
@@ -1033,6 +1035,7 @@ void MtmHandleApplyError(void)
10331035
kill(PostmasterPid,SIGQUIT);
10341036
break;
10351037
}
1038+
FreeErrorData(edata);
10361039
}
10371040

10381041

@@ -1507,6 +1510,7 @@ static void MtmInitialize()
15071510
Mtm->nodes[i].transDelay=0;
15081511
Mtm->nodes[i].lastStatusChangeTime=time(NULL);
15091512
Mtm->nodes[i].con=MtmConnections[i];
1513+
Mtm->nodes[i].flushPos=0;
15101514
}
15111515
PGSemaphoreCreate(&Mtm->votingSemaphore);
15121516
PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -2084,6 +2088,45 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20842088
on_shmem_exit(MtmOnProcExit,0);
20852089
}
20862090

2091+
XLogRecPtrMtmGetFlushPosition(intnodeId)
2092+
{
2093+
returnMtm->nodes[nodeId-1].flushPos;
2094+
}
2095+
2096+
voidMtmUpdateLsnMapping(intnode_id,XLogRecPtrend_lsn)
2097+
{
2098+
dlist_mutable_iteriter;
2099+
MtmFlushPosition*flushpos;
2100+
XLogRecPtrlocal_flush=GetFlushRecPtr();
2101+
MemoryContextold_context=MemoryContextSwitchTo(TopMemoryContext);
2102+
2103+
/* Track commit lsn */
2104+
flushpos= (MtmFlushPosition*)palloc(sizeof(MtmFlushPosition));
2105+
flushpos->node_id=node_id;
2106+
flushpos->local_end=XactLastCommitEnd;
2107+
flushpos->remote_end=end_lsn;
2108+
dlist_push_tail(&MtmLsnMapping,&flushpos->node);
2109+
2110+
MtmLock(LW_EXCLUSIVE);
2111+
dlist_foreach_modify(iter,&MtmLsnMapping)
2112+
{
2113+
flushpos=dlist_container(MtmFlushPosition,node,iter.cur);
2114+
if (flushpos->local_end <=local_flush)
2115+
{
2116+
if (Mtm->nodes[node_id-1].flushPos<local_flush) {
2117+
Mtm->nodes[node_id-1].flushPos=local_flush;
2118+
}
2119+
dlist_delete(iter.cur);
2120+
pfree(flushpos);
2121+
}else {
2122+
break;
2123+
}
2124+
}
2125+
MtmUnlock();
2126+
MemoryContextSwitchTo(old_context);
2127+
}
2128+
2129+
20872130
staticvoid
20882131
MtmReplicationShutdownHook(structPGLogicalShutdownHookArgs*args)
20892132
{

‎contrib/mmts/multimaster.h‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ typedef struct
124124
MtmConnectionInfocon;
125125
time_ttransDelay;
126126
time_tlastStatusChangeTime;
127+
XLogRecPtrflushPos;
127128
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
128129
}MtmNodeInfo;
129130

@@ -179,6 +180,15 @@ typedef struct
179180
MtmNodeInfonodes[1];/* [MtmNodes]: per-node data */
180181
}MtmState;
181182

183+
typedefstructMtmFlushPosition
184+
{
185+
dlist_nodenode;
186+
intnode_id;
187+
XLogRecPtrlocal_end;
188+
XLogRecPtrremote_end;
189+
}MtmFlushPosition;
190+
191+
182192
#defineMtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
183193

184194
externcharconst*constMtmNodeStatusMnem[];
@@ -240,5 +250,7 @@ extern bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN);
240250
externvoidMtmRecoveryCompleted(void);
241251
externvoidMtmMakeTableLocal(char*schema,char*name);
242252
externvoidMtmHandleApplyError(void);
253+
externvoidMtmUpdateLsnMapping(intnodeId,XLogRecPtrendLsn);
254+
externXLogRecPtrMtmGetFlushPosition(intnodeId);
243255

244256
#endif

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -509,14 +509,14 @@ process_remote_commit(StringInfo in)
509509
uint8flags;
510510
csn_tcsn;
511511
constchar*gid=NULL;
512-
512+
XLogRecPtrend_lsn;
513513
/* read flags */
514514
flags=pq_getmsgbyte(in);
515515
MtmReplicationNode=pq_getmsgbyte(in);
516516

517517
/* read fields */
518518
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */
519-
pq_getmsgint64(in);/* end_lsn */
519+
end_lsn=pq_getmsgint64(in);/* end_lsn */
520520
replorigin_session_origin_timestamp=pq_getmsgint64(in);/* commit_time */
521521

522522
Assert(replorigin_session_origin==InvalidRepOriginId);
@@ -581,6 +581,7 @@ process_remote_commit(StringInfo in)
581581
Assert(false);
582582
}
583583
MtmEndSession(true);
584+
MtmUpdateLsnMapping(MtmReplicationNodeId,end_lsn);
584585
if (flags&PGLOGICAL_CAUGHT_UP) {
585586
MtmRecoveryCompleted();
586587
}
@@ -951,7 +952,9 @@ void MtmExecutor(int id, void* work, size_t size)
951952
}
952953
PG_CATCH();
953954
{
955+
MemoryContextoldcontext=MemoryContextSwitchTo(ApplyContext);
954956
MtmHandleApplyError();
957+
MemoryContextSwitchTo(oldcontext);
955958
EmitErrorReport();
956959
FlushErrorState();
957960
MTM_LOG2("%d: REMOTE begin abort transaction %d",MyProcPid,MtmGetCurrentTransactionId());

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,17 +89,18 @@ receiver_raw_sighup(SIGNAL_ARGS)
8989
* Send a Standby Status Update message to server.
9090
*/
9191
staticbool
92-
sendFeedback(PGconn*conn,int64now,RepOriginIdoriginId)
92+
sendFeedback(PGconn*conn,int64now,intnode_id)
9393
{
9494
charreplybuf[1+8+8+8+8+1];
9595
intlen=0;
96-
XLogRecPtroutput_applied_lsn=replorigin_get_progress(originId, true);
96+
XLogRecPtroutput_applied_lsn=output_written_lsn;
97+
XLogRecPtroutput_flushed_lsn=MtmGetFlushPosition(node_id);
9798

9899
replybuf[len]='r';
99100
len+=1;
100101
fe_sendint64(output_written_lsn,&replybuf[len]);/* write */
101102
len+=8;
102-
fe_sendint64(output_applied_lsn,&replybuf[len]);/* flush */
103+
fe_sendint64(output_flushed_lsn,&replybuf[len]);/* flush */
103104
len+=8;
104105
fe_sendint64(output_applied_lsn,&replybuf[len]);/* apply */
105106
len+=8;
@@ -421,7 +422,7 @@ pglogical_receiver_main(Datum main_arg)
421422
int64now=feGetCurrentTimestamp();
422423

423424
/* Leave is feedback is not sent properly */
424-
if (!sendFeedback(conn,now,originId))
425+
if (!sendFeedback(conn,now,args->remote_node))
425426
proc_exit(1);
426427
}
427428
continue;

‎contrib/mmts/tests/dtmbench.cpp‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ void* writer(void* arg)
150150
for (int i =0; i < cfg.nIterations; i++)
151151
{
152152
//work
153-
transaction<repeatable_read>txn(*conns[random() % conns.size()]);
154-
//transaction<read_committed> txn(*conns[random() % conns.size()]);
153+
//transaction<repeatable_read> txn(*conns[random() % conns.size()]);
154+
transaction<read_committed>txn(*conns[random() % conns.size()]);
155155
int srcAcc =random() % cfg.nAccounts;
156156
int dstAcc =random() % cfg.nAccounts;
157157
if (cfg.scatter) {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp