Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd7bc3be

Browse files
knizhnikkelvich
authored andcommitted
Change flush position reporting method
1 parent501c8e7 commitd7bc3be

File tree

5 files changed

+67
-8
lines changed

5 files changed

+67
-8
lines changed

‎multimaster.c

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ static bool MtmIsRecoverySession;
146146

147147
staticMtmCurrentTransMtmTx;
148148

149+
staticdlist_headMtmLsnMapping=DLIST_STATIC_INIT(MtmLsnMapping);
150+
149151
staticTransactionManagerMtmTM= {
150152
PgTransactionIdGetStatus,
151153
PgTransactionIdSetTreeStatus,
@@ -1032,6 +1034,7 @@ void MtmHandleApplyError(void)
10321034
kill(PostmasterPid,SIGQUIT);
10331035
break;
10341036
}
1037+
FreeErrorData(edata);
10351038
}
10361039

10371040

@@ -1506,6 +1509,7 @@ static void MtmInitialize()
15061509
Mtm->nodes[i].transDelay=0;
15071510
Mtm->nodes[i].lastStatusChangeTime=time(NULL);
15081511
Mtm->nodes[i].con=MtmConnections[i];
1512+
Mtm->nodes[i].flushPos=0;
15091513
}
15101514
PGSemaphoreCreate(&Mtm->votingSemaphore);
15111515
PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -2083,6 +2087,45 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20832087
on_shmem_exit(MtmOnProcExit,0);
20842088
}
20852089

2090+
XLogRecPtrMtmGetFlushPosition(intnodeId)
2091+
{
2092+
returnMtm->nodes[nodeId-1].flushPos;
2093+
}
2094+
2095+
voidMtmUpdateLsnMapping(intnode_id,XLogRecPtrend_lsn)
2096+
{
2097+
dlist_mutable_iteriter;
2098+
MtmFlushPosition*flushpos;
2099+
XLogRecPtrlocal_flush=GetFlushRecPtr();
2100+
MemoryContextold_context=MemoryContextSwitchTo(TopMemoryContext);
2101+
2102+
/* Track commit lsn */
2103+
flushpos= (MtmFlushPosition*)palloc(sizeof(MtmFlushPosition));
2104+
flushpos->node_id=node_id;
2105+
flushpos->local_end=XactLastCommitEnd;
2106+
flushpos->remote_end=end_lsn;
2107+
dlist_push_tail(&MtmLsnMapping,&flushpos->node);
2108+
2109+
MtmLock(LW_EXCLUSIVE);
2110+
dlist_foreach_modify(iter,&MtmLsnMapping)
2111+
{
2112+
flushpos=dlist_container(MtmFlushPosition,node,iter.cur);
2113+
if (flushpos->local_end <=local_flush)
2114+
{
2115+
if (Mtm->nodes[node_id-1].flushPos<local_flush) {
2116+
Mtm->nodes[node_id-1].flushPos=local_flush;
2117+
}
2118+
dlist_delete(iter.cur);
2119+
pfree(flushpos);
2120+
}else {
2121+
break;
2122+
}
2123+
}
2124+
MtmUnlock();
2125+
MemoryContextSwitchTo(old_context);
2126+
}
2127+
2128+
20862129
staticvoid
20872130
MtmReplicationShutdownHook(structPGLogicalShutdownHookArgs*args)
20882131
{

‎multimaster.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ typedef struct
124124
MtmConnectionInfocon;
125125
time_ttransDelay;
126126
time_tlastStatusChangeTime;
127+
XLogRecPtrflushPos;
127128
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
128129
}MtmNodeInfo;
129130

@@ -179,6 +180,15 @@ typedef struct
179180
MtmNodeInfonodes[1];/* [MtmNodes]: per-node data */
180181
}MtmState;
181182

183+
typedefstructMtmFlushPosition
184+
{
185+
dlist_nodenode;
186+
intnode_id;
187+
XLogRecPtrlocal_end;
188+
XLogRecPtrremote_end;
189+
}MtmFlushPosition;
190+
191+
182192
#defineMtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
183193

184194
externcharconst*constMtmNodeStatusMnem[];
@@ -240,5 +250,7 @@ extern bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN);
240250
externvoidMtmRecoveryCompleted(void);
241251
externvoidMtmMakeTableLocal(char*schema,char*name);
242252
externvoidMtmHandleApplyError(void);
253+
externvoidMtmUpdateLsnMapping(intnodeId,XLogRecPtrendLsn);
254+
externXLogRecPtrMtmGetFlushPosition(intnodeId);
243255

244256
#endif

‎pglogical_apply.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -509,14 +509,14 @@ process_remote_commit(StringInfo in)
509509
uint8flags;
510510
csn_tcsn;
511511
constchar*gid=NULL;
512-
512+
XLogRecPtrend_lsn;
513513
/* read flags */
514514
flags=pq_getmsgbyte(in);
515515
MtmReplicationNode=pq_getmsgbyte(in);
516516

517517
/* read fields */
518518
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */
519-
pq_getmsgint64(in);/* end_lsn */
519+
end_lsn=pq_getmsgint64(in);/* end_lsn */
520520
replorigin_session_origin_timestamp=pq_getmsgint64(in);/* commit_time */
521521

522522
Assert(replorigin_session_origin==InvalidRepOriginId);
@@ -581,6 +581,7 @@ process_remote_commit(StringInfo in)
581581
Assert(false);
582582
}
583583
MtmEndSession(true);
584+
MtmUpdateLsnMapping(MtmReplicationNodeId,end_lsn);
584585
if (flags&PGLOGICAL_CAUGHT_UP) {
585586
MtmRecoveryCompleted();
586587
}
@@ -951,7 +952,9 @@ void MtmExecutor(int id, void* work, size_t size)
951952
}
952953
PG_CATCH();
953954
{
955+
MemoryContextoldcontext=MemoryContextSwitchTo(ApplyContext);
954956
MtmHandleApplyError();
957+
MemoryContextSwitchTo(oldcontext);
955958
EmitErrorReport();
956959
FlushErrorState();
957960
MTM_LOG2("%d: REMOTE begin abort transaction %d",MyProcPid,MtmGetCurrentTransactionId());

‎pglogical_receiver.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,17 +89,18 @@ receiver_raw_sighup(SIGNAL_ARGS)
8989
* Send a Standby Status Update message to server.
9090
*/
9191
staticbool
92-
sendFeedback(PGconn*conn,int64now,RepOriginIdoriginId)
92+
sendFeedback(PGconn*conn,int64now,intnode_id)
9393
{
9494
charreplybuf[1+8+8+8+8+1];
9595
intlen=0;
96-
XLogRecPtroutput_applied_lsn=replorigin_get_progress(originId, true);
96+
XLogRecPtroutput_applied_lsn=output_written_lsn;
97+
XLogRecPtroutput_flushed_lsn=MtmGetFlushPosition(node_id);
9798

9899
replybuf[len]='r';
99100
len+=1;
100101
fe_sendint64(output_written_lsn,&replybuf[len]);/* write */
101102
len+=8;
102-
fe_sendint64(output_applied_lsn,&replybuf[len]);/* flush */
103+
fe_sendint64(output_flushed_lsn,&replybuf[len]);/* flush */
103104
len+=8;
104105
fe_sendint64(output_applied_lsn,&replybuf[len]);/* apply */
105106
len+=8;
@@ -421,7 +422,7 @@ pglogical_receiver_main(Datum main_arg)
421422
int64now=feGetCurrentTimestamp();
422423

423424
/* Leave is feedback is not sent properly */
424-
if (!sendFeedback(conn,now,originId))
425+
if (!sendFeedback(conn,now,args->remote_node))
425426
proc_exit(1);
426427
}
427428
continue;

‎tests/dtmbench.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ void* writer(void* arg)
150150
for (int i =0; i < cfg.nIterations; i++)
151151
{
152152
//work
153-
transaction<repeatable_read>txn(*conns[random() % conns.size()]);
154-
//transaction<read_committed> txn(*conns[random() % conns.size()]);
153+
//transaction<repeatable_read> txn(*conns[random() % conns.size()]);
154+
transaction<read_committed>txn(*conns[random() % conns.size()]);
155155
int srcAcc =random() % cfg.nAccounts;
156156
int dstAcc =random() % cfg.nAccounts;
157157
if (cfg.scatter) {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp