Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 5bd20f8

Browse files
knizhnikkelvich
authored and committed
Do not use epoll
1 parent 8c5bcc9 commit 5bd20f8

File tree

7 files changed

+79
-32
lines changed

7 files changed

+79
-32
lines changed

‎arbiter.c

Lines changed: 43 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,7 @@
5959

6060
#ifndefUSE_EPOLL
6161
#ifdef__linux__
62-
#defineUSE_EPOLL1
62+
#defineUSE_EPOLL0
6363
#else
6464
#defineUSE_EPOLL 0
6565
#endif
@@ -105,7 +105,7 @@ typedef struct
105105
staticint*sockets;
106106
staticintgateway;
107107
staticboolsend_heartbeat;
108-
statictimestamp_tlast_sent_hearbeat;
108+
statictimestamp_tlast_sent_heartbeat;
109109
staticTimeoutIdheartbeat_timer;
110110
staticintbusy_socket;
111111

@@ -266,17 +266,20 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
266266

267267
staticintMtmReadSocket(intsd,void*buf,intbuf_size)
268268
{
269-
intrc=MtmWaitSocket(sd, false,MtmHeartbeatSendTimeout);
270-
if (rc==1) {
271-
intrc=recv(sd,buf,buf_size,0);
272-
if (rc <=0) {
273-
Assert(errno!=EINTR);/* should not happen in non-blocking call */
274-
return-1;
269+
intrc=recv(sd,buf,buf_size,0);
270+
if (rc<0&&errno==EAGAIN) {
271+
rc=MtmWaitSocket(sd, false,MtmHeartbeatSendTimeout);
272+
if (rc==1) {
273+
rc=recv(sd,buf,buf_size,0);
274+
if (rc<0) {
275+
Assert(errno!=EINTR);/* should not happen in non-blocking call */
276+
return-1;
277+
}
278+
}else {
279+
return0;
275280
}
276-
returnrc;
277-
}else {
278-
return0;
279281
}
282+
returnrc;
280283
}
281284

282285

@@ -343,7 +346,7 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
343346

344347
staticvoidMtmScheduleHeartbeat()
345348
{
346-
//Assert(!last_sent_hearbeat ||last_sent_hearbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout) >= MtmGetSystemTime());
349+
//Assert(!last_sent_heartbeat ||last_sent_heartbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout) >= MtmGetSystemTime());
347350
enable_timeout_after(heartbeat_timer,MtmHeartbeatSendTimeout);
348351
send_heartbeat= true;
349352
PGSemaphoreUnlock(&Mtm->votingSemaphore);
@@ -353,11 +356,16 @@ static void MtmSendHeartbeat()
353356
{
354357
inti;
355358
MtmArbiterMessagemsg;
359+
timestamp_tnow=MtmGetSystemTime();
356360
msg.code=MSG_HEARTBEAT;
357361
msg.disabledNodeMask=Mtm->disabledNodeMask;
358362
msg.oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
359363
msg.node=MtmNodeId;
360-
last_sent_hearbeat=MtmGetSystemTime();
364+
msg.csn=now;
365+
if (last_sent_heartbeat+MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2<now) {
366+
MTM_LOG1("More than %ld microseconds since last heartbeat",now-last_sent_heartbeat);
367+
}
368+
last_sent_heartbeat=now;
361369

362370
for (i=0;i<Mtm->nAllNodes;i++)
363371
{
@@ -366,6 +374,8 @@ static void MtmSendHeartbeat()
366374
{
367375
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
368376
elog(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
377+
}else {
378+
MTM_LOG1("Send heartbeat to node %d with timestamp %ld",i+1,now);
369379
}
370380
}
371381
}
@@ -558,7 +568,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
558568
staticintMtmReadFromNode(intnode,void*buf,intbuf_size)
559569
{
560570
intrc=MtmReadSocket(sockets[node],buf,buf_size);
561-
if (rc <=0) {
571+
if (rc<0) {
562572
elog(WARNING,"Arbiter failed to read from node=%d, rc=%d, errno=%d",node+1,rc,errno);
563573
MtmDisconnect(node);
564574
}
@@ -812,6 +822,8 @@ static void MtmTransReceiver(Datum arg)
812822
}
813823

814824
while (!stop) {
825+
timestamp_tstartPolling=MtmGetSystemTime();
826+
timestamp_tstopPolling;
815827
#ifUSE_EPOLL
816828
n=epoll_wait(epollfd,events,nNodes,MtmHeartbeatRecvTimeout);
817829
if (n<0) {
@@ -820,13 +832,17 @@ static void MtmTransReceiver(Datum arg)
820832
}
821833
elog(ERROR,"Arbiter failed to poll sockets: %d",errno);
822834
}
835+
stopPolling=MtmGetSystemTime();
836+
823837
for (j=0;j<n;j++) {
824838
i=events[j].data.u32;
825839
if (events[j].events&EPOLLERR) {
826840
elog(WARNING,"Arbiter lost connection with node %d",i+1);
827841
MtmDisconnect(i);
828842
}
829-
elseif (events[j].events&EPOLLIN)
843+
}
844+
for (j=0;j<n;j++) {
845+
if (events[j].events&EPOLLIN)
830846
#else
831847
fd_setevents;
832848
do {
@@ -842,6 +858,8 @@ static void MtmTransReceiver(Datum arg)
842858
if (n<0) {
843859
elog(ERROR,"Arbiter failed to select sockets: %d",errno);
844860
}
861+
stopPolling=MtmGetSystemTime();
862+
845863
for (i=0;i<nNodes;i++) {
846864
if (sockets[i] >=0&&FD_ISSET(sockets[i],&events))
847865
#endif
@@ -871,7 +889,8 @@ static void MtmTransReceiver(Datum arg)
871889
Mtm->nodes[msg->node-1].lastHeartbeat=MtmGetSystemTime();
872890

873891
if (msg->code==MSG_HEARTBEAT) {
874-
MTM_LOG3("Receive HEARTBEAT from node %d at %ld",msg->node,USEC_TO_MSEC(MtmGetSystemTime()));
892+
MTM_LOG1("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
893+
msg->node,msg->csn,USEC_TO_MSEC(MtmGetSystemTime()-msg->csn));
875894
continue;
876895
}
877896
if (BIT_CHECK(msg->disabledNodeMask,msg->node-1)) {
@@ -985,7 +1004,14 @@ static void MtmTransReceiver(Datum arg)
9851004
}
9861005
now=MtmGetSystemTime();
9871006
if (now>lastHeartbeatCheck+MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
988-
MtmWatchdog();
1007+
if (!MtmWatchdog(stopPolling)) {
1008+
for (i=0;i<nNodes;i++) {
1009+
if (Mtm->nodes[i].lastHeartbeat!=0&&sockets[i] >=0) {
1010+
MTM_LOG1("Last hearbeat from node %d received %ld microseconds ago",i+1,now-Mtm->nodes[i].lastHeartbeat);
1011+
}
1012+
}
1013+
MTM_LOG1("epoll started %ld and finished %ld microseconds ago",now-startPolling,now-stopPolling);
1014+
}
9891015
lastHeartbeatCheck=now;
9901016
}
9911017
if (n==0&&Mtm->disabledNodeMask!=0) {

‎multimaster.c

Lines changed: 15 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -248,7 +248,13 @@ void MtmLock(LWLockMode mode)
248248
#ifdefUSE_SPINLOCK
249249
SpinLockAcquire(&Mtm->spinlock);
250250
#else
251+
timestamp_tstart,stop;
252+
start=MtmGetSystemTime();
251253
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID],mode);
254+
stop=MtmGetSystemTime();
255+
if (stop>start+MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
256+
MTM_LOG1("%d: obtaining %s lock takes %ld microseconds",MyProcPid, (mode==LW_EXCLUSIVE ?"exclusive" :"shared"),stop-start);
257+
}
252258
#endif
253259
Mtm->lastLockHolder=MyProcPid;
254260
}
@@ -819,10 +825,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
819825
/*
820826
* Check heartbeats
821827
*/
822-
voidMtmWatchdog(void)
828+
boolMtmWatchdog(timestamp_tnow)
823829
{
824830
inti,n=Mtm->nAllNodes;
825-
timestamp_tnow=MtmGetSystemTime();
831+
boolallAlive=true;
826832
for (i=0;i<n;i++) {
827833
if (i+1!=MtmNodeId&& !BIT_CHECK(Mtm->disabledNodeMask,i)) {
828834
if (Mtm->nodes[i].lastHeartbeat!=0
@@ -831,9 +837,11 @@ void MtmWatchdog(void)
831837
elog(WARNING,"Heartbeat is not received from node %d during %d msec",
832838
i+1, (int)USEC_TO_MSEC(now-Mtm->nodes[i].lastHeartbeat));
833839
MtmOnNodeDisconnect(i+1);
840+
allAlive= false;
834841
}
835842
}
836843
}
844+
returnallAlive;
837845
}
838846

839847

@@ -926,7 +934,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
926934
MtmLock(LW_EXCLUSIVE);
927935
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_REMOVE,NULL);
928936
Assert(tm!=NULL&&tm->state!=NULL);
929-
MTM_LOG1("%ld:Abort prepared transaction %d with gid='%s'",MtmGetSystemTime(),x->xid,x->gid);
937+
MTM_LOG1("Abort prepared transaction %d with gid='%s'",x->xid,x->gid);
930938
MtmAbortTransaction(tm->state);
931939
MtmUnlock();
932940
x->status=TRANSACTION_STATUS_ABORTED;
@@ -1184,6 +1192,7 @@ static void MtmDisableNode(int nodeId)
11841192
staticvoidMtmEnableNode(intnodeId)
11851193
{
11861194
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
1195+
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
11871196
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
11881197
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
11891198
Mtm->nLiveNodes+=1;
@@ -1569,6 +1578,7 @@ void MtmOnNodeConnect(int nodeId)
15691578
{
15701579
MtmLock(LW_EXCLUSIVE);
15711580
BIT_CLEAR(Mtm->connectivityMask,nodeId-1);
1581+
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
15721582
MtmUnlock();
15731583

15741584
MTM_LOG1("Reconnect node %d",nodeId);
@@ -1881,7 +1891,7 @@ _PG_init(void)
18811891
DefineCustomIntVariable(
18821892
"multimaster.heartbeat_send_timeout",
18831893
"Timeout in milliseconds of sending heartbeat messages",
1884-
"Period of broadcasting heartbeat messages byabiter to all nodes",
1894+
"Period of broadcasting heartbeat messages byarbiter to all nodes",
18851895
&MtmHeartbeatSendTimeout,
18861896
1000,
18871897
1,
@@ -2286,6 +2296,7 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
22862296
/* Choose for recovery first available slot */
22872297
MTM_LOG1("Start recovery from node %d",nodeId);
22882298
Mtm->recoverySlot=nodeId;
2299+
FinishAllPreparedTransactions(false);
22892300
returnSLOT_OPEN_EXISTED;
22902301
}
22912302
}

‎multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -277,7 +277,7 @@ extern void MtmMakeTableLocal(char* schema, char* name);
277277
externvoidMtmHandleApplyError(void);
278278
externvoidMtmUpdateLsnMapping(intnodeId,XLogRecPtrendLsn);
279279
externXLogRecPtrMtmGetFlushPosition(intnodeId);
280-
externvoidMtmWatchdog(void);
280+
externboolMtmWatchdog(timestamp_tnow);
281281
externvoidMtmCheckHeartbeat(void);
282282

283283

‎pglogical_apply.c

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -538,11 +538,11 @@ process_remote_commit(StringInfo in)
538538
Assert(IsTransactionState()&&TransactionIdIsValid(MtmGetCurrentTransactionId()));
539539
gid=pq_getmsgstring(in);
540540
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_IN_PROGRESS)==TRANSACTION_STATUS_ABORTED) {
541-
MTM_LOG1("%ld: avoidprepare of previously aborted global transaction %s",MtmGetSystemTime(),gid);
541+
MTM_LOG1("Avoidprepare of previously aborted global transaction %s",gid);
542542
AbortCurrentTransaction();
543543
}else {
544544
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
545-
MTM_LOG2("%ld:PGLOGICAL_PREPARE commit: gid=%s",MtmGetSystemTime(),gid);
545+
MTM_LOG2("PGLOGICAL_PREPARE commit: gid=%s",gid);
546546
BeginTransactionBlock();
547547
CommitTransactionCommand();
548548
StartTransactionCommand();
@@ -554,7 +554,7 @@ process_remote_commit(StringInfo in)
554554
CommitTransactionCommand();
555555

556556
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_UNKNOWN)==TRANSACTION_STATUS_ABORTED) {
557-
MTM_LOG1("%ld: performdelayed rollback of prepared global transaction %s",MtmGetSystemTime(),gid);
557+
MTM_LOG1("Performdelayed rollback of prepared global transaction %s",gid);
558558
StartTransactionCommand();
559559
MtmSetCurrentTransactionGID(gid);
560560
FinishPreparedTransaction(gid, false);
@@ -568,7 +568,7 @@ process_remote_commit(StringInfo in)
568568
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
569569
csn=pq_getmsgint64(in);
570570
gid=pq_getmsgstring(in);
571-
MTM_LOG2("%ld:PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s",MtmGetSystemTime(),csn,gid);
571+
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s",csn,gid);
572572
StartTransactionCommand();
573573
MtmBeginSession();
574574
MtmSetCurrentTransactionCSN(csn);
@@ -581,9 +581,9 @@ process_remote_commit(StringInfo in)
581581
{
582582
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
583583
gid=pq_getmsgstring(in);
584-
MTM_LOG2("%ld:PGLOGICAL_ABORT_PREPARED commit: gid=%s",MtmGetSystemTime(),gid);
584+
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s",gid);
585585
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED)==TRANSACTION_STATUS_UNKNOWN) {
586-
MTM_LOG1("%ld:PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",MtmGetSystemTime(),gid);
586+
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",gid);
587587
StartTransactionCommand();
588588
MtmSetCurrentTransactionGID(gid);
589589
FinishPreparedTransaction(gid, false);

‎pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -161,7 +161,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
161161
}
162162
pq_sendbyte(out,'C');/* sending COMMIT */
163163

164-
MTM_LOG2("%ld:PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx",MtmGetSystemTime(),flags,txn->gid,commit_lsn,txn->end_lsn,GetXLogInsertRecPtr());
164+
MTM_LOG2("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx",flags,txn->gid,commit_lsn,txn->end_lsn,GetXLogInsertRecPtr());
165165

166166
/* send the flags field */
167167
pq_sendbyte(out,flags);

‎pglogical_receiver.c

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@
4040
#include"spill.h"
4141

4242
#defineERRCODE_DUPLICATE_OBJECT_STR "42710"
43-
#defineRECEIVER_SUSPEND_TIMEOUT (10*USECS_PER_SEC)
43+
#defineRECEIVER_SUSPEND_TIMEOUT (1*USECS_PER_SEC)
4444

4545
/* Signal handling */
4646
staticvolatilesig_atomic_tgot_sigterm= false;
@@ -357,9 +357,10 @@ pglogical_receiver_main(Datum main_arg)
357357
if (rc&WL_POSTMASTER_DEATH)
358358
proc_exit(1);
359359

360-
if (Mtm->status==MTM_OFFLINE|| (Mtm->status==MTM_RECOVERY&&Mtm->recoverySlot!=nodeId)) {
360+
if (Mtm->status==MTM_OFFLINE|| (Mtm->status==MTM_RECOVERY&&Mtm->recoverySlot!=nodeId))
361+
{
361362
ereport(LOG, (errmsg("%s: suspending WAL receiver because node was switched to %s mode",worker_proc,MtmNodeStatusMnem[Mtm->status])));
362-
gotoOnError;
363+
break;
363364
}
364365

365366

@@ -578,6 +579,9 @@ pglogical_receiver_main(Datum main_arg)
578579
gotoOnError;
579580
}
580581
}
582+
PQfinish(conn);
583+
continue;
584+
581585
OnError:
582586
PQfinish(conn);
583587
MtmSleep(RECEIVER_SUSPEND_TIMEOUT);

‎raftable.c

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,13 +19,19 @@ void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool now
1919
voidRaftableSet(charconst*key,voidconst*value,size_tsize,boolnowait)
2020
{
2121
if (MtmUseRaftable) {
22+
timestamp_tstart,stop;
23+
start=MtmGetSystemTime();
2224
if (nowait) {
2325
raftable_set(key,value,size,0);
2426
}else {
2527
while (!raftable_set(key,value,size,MtmHeartbeatSendTimeout)) {
2628
MtmCheckHeartbeat();
2729
}
2830
}
31+
stop=MtmGetSystemTime();
32+
if (stop>start+MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
33+
MTM_LOG1("Raftable set nowait=%d takes %ld microseconds",nowait,stop-start);
34+
}
2935
}
3036
}
3137

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp