Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8e9daf8

Browse files
knizhnikkelvich
authored andcommitted
Avoid restart of server to perform recovery
1 parent524366f commit8e9daf8

File tree

9 files changed

+386
-348
lines changed

9 files changed

+386
-348
lines changed

‎Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ USER postgres
3131
ENV CFLAGS -O0
3232
WORKDIR /pg
3333

34-
ENV REBUILD5
34+
ENV REBUILD6
3535

3636
RUN cd /pg && \
3737
git clone https://github.com/postgrespro/postgres_cluster.git --depth 1 && \

‎arbiter.c

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ static void MtmRegisterSocket(int fd, int node)
195195
ev.events=EPOLLIN;
196196
ev.data.u32=node;
197197
if (epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev)<0) {
198-
elog(ERROR,"Arbiter failed to add socket to epoll set: %d",errno);
198+
elog(LOG,"Arbiter failed to add socket to epoll set: %d",errno);
199199
}
200200
#else
201201
FD_SET(fd,&inset);
@@ -209,7 +209,7 @@ static void MtmUnregisterSocket(int fd)
209209
{
210210
#ifUSE_EPOLL
211211
if (epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,NULL)<0) {
212-
elog(ERROR,"Arbiter failed to unregister socket from epoll set: %d",errno);
212+
elog(LOG,"Arbiter failed to unregister socket from epoll set: %d",errno);
213213
}
214214
#else
215215
FD_CLR(fd,&inset);
@@ -266,12 +266,17 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
266266

267267
staticintMtmReadSocket(intsd,void*buf,intbuf_size)
268268
{
269-
intrc=recv(sd,buf,buf_size,0);
270-
if (rc <=0) {
271-
Assert(errno!=EINTR);/* should not happen in non-blocking call */
272-
return-1;
269+
intrc=MtmWaitSocket(sd, false,MtmHeartbeatSendTimeout);
270+
if (rc==1) {
271+
intrc=recv(sd,buf,buf_size,0);
272+
if (rc <=0) {
273+
Assert(errno!=EINTR);/* should not happen in non-blocking call */
274+
return-1;
275+
}
276+
returnrc;
277+
}else {
278+
return0;
273279
}
274-
returnrc;
275280
}
276281

277282

@@ -325,7 +330,16 @@ static void MtmSetSocketOptions(int sd)
325330
#endif
326331
}
327332

328-
333+
staticvoidMtmCheckResponse(MtmArbiterMessage*resp)
334+
{
335+
if (BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)) {
336+
elog(WARNING,"Node %d thinks that I was dead, while I am %s",resp->node,MtmNodeStatusMnem[Mtm->status]);
337+
if (Mtm->status!=MTM_RECOVERY) {
338+
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
339+
MtmSwitchClusterMode(MTM_RECOVERY);
340+
}
341+
}
342+
}
329343

330344
staticvoidMtmScheduleHeartbeat()
331345
{
@@ -347,7 +361,8 @@ static void MtmSendHeartbeat()
347361

348362
for (i=0;i<Mtm->nAllNodes;i++)
349363
{
350-
if (sockets[i] >=0&&sockets[i]!=busy_socket&& !BIT_CHECK(Mtm->disabledNodeMask|Mtm->reconnectMask,i))
364+
if (i+1!=MtmNodeId&&sockets[i]!=busy_socket
365+
&& ((sockets[i] >=0&& !BIT_CHECK(Mtm->disabledNodeMask,i))||BIT_CHECK(Mtm->reconnectMask,i)))
351366
{
352367
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
353368
elog(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
@@ -382,7 +397,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
382397
sock_inet.sin_port=htons(port);
383398

384399
if (!MtmResolveHostByName(host,addrs,&n_addrs)) {
385-
elog(ERROR,"Arbiter failed to resolve host '%s' by name",host);
400+
elog(LOG,"Arbiter failed to resolve host '%s' by name",host);
401+
return-1;
386402
}
387403

388404
Retry:
@@ -391,11 +407,13 @@ static int MtmConnectSocket(int node, int port, int timeout)
391407

392408
sd=socket(AF_INET,SOCK_STREAM,0);
393409
if (sd<0) {
394-
elog(ERROR,"Arbiter failed to create socket: %d",errno);
410+
elog(LOG,"Arbiter failed to create socket: %d",errno);
411+
return-1;
395412
}
396413
rc=fcntl(sd,F_SETFL,O_NONBLOCK);
397414
if (rc<0) {
398-
elog(ERROR,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
415+
elog(LOG,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
416+
return-1;
399417
}
400418
busy_socket=sd;
401419
for (i=0;i<n_addrs;++i) {
@@ -463,14 +481,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
463481
}
464482

465483
MtmLock(LW_EXCLUSIVE);
466-
467-
/* Some node considered that I am dead, so switch to recovery mode */
468-
if (BIT_CHECK(resp.disabledNodeMask,MtmNodeId-1)) {
469-
elog(WARNING,"Node %d thinks that I was dead",resp.node);
470-
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
471-
MtmRollbackAllPreparedTransactions();
472-
MtmSwitchClusterMode(MTM_RECOVERY);
473-
}
484+
MtmCheckResponse(&resp);
474485
MtmUnlock();
475486

476487
returnsd;
@@ -493,7 +504,7 @@ static void MtmOpenConnections()
493504
charconst*arbiterPortStr=strstr(Mtm->nodes[i].con.connStr,"arbiterport=");
494505
if (arbiterPortStr!=NULL) {
495506
if (sscanf(arbiterPortStr+12,"%d",&arbiterPort)!=1) {
496-
elog(ERROR,"Invalid arbiter port: %s",arbiterPortStr+12);
507+
elog(ERROR,"Invalid arbiter port: %s",arbiterPortStr+12);
497508
}
498509
}else {
499510
arbiterPort=MtmArbiterPort+i+1;
@@ -518,11 +529,13 @@ static bool MtmSendToNode(int node, void const* buf, int size)
518529
while (true) {
519530
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
520531
elog(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
532+
close(sockets[node]);
533+
sockets[node]=-1;
534+
}
535+
if (BIT_CHECK(Mtm->reconnectMask,node)) {
521536
MtmLock(LW_EXCLUSIVE);
522537
BIT_CLEAR(Mtm->reconnectMask,node);
523538
MtmUnlock();
524-
close(sockets[node]);
525-
sockets[node]=-1;
526539
}
527540
if (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
528541
if (sockets[node] >=0) {
@@ -535,9 +548,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
535548
MtmOnNodeDisconnect(node+1);
536549
return false;
537550
}
538-
MtmLock(LW_EXCLUSIVE);
539-
BIT_CLEAR(Mtm->reconnectMask,node);
540-
MtmUnlock();
541551
MTM_LOG3("Arbiter restablished connection with node %d",node+1);
542552
}else {
543553
return true;
@@ -563,14 +573,23 @@ static void MtmAcceptOneConnection()
563573
}else {
564574
MtmHandshakeMessagereq;
565575
MtmArbiterMessageresp;
566-
intrc=MtmReadSocket(fd,&req,sizeofreq);
576+
intrc=fcntl(fd,F_SETFL,O_NONBLOCK);
577+
if (rc<0) {
578+
elog(ERROR,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
579+
}
580+
rc=MtmReadSocket(fd,&req,sizeofreq);
567581
if (rc<sizeof(req)) {
568582
elog(WARNING,"Arbiter failed to handshake socket: %d, errno=%d",rc,errno);
569583
}elseif (req.hdr.code!=MSG_HANDSHAKE&&req.hdr.dxid!=HANDSHAKE_MAGIC) {
570584
elog(WARNING,"Arbiter get unexpected handshake message %d",req.hdr.code);
571585
close(fd);
572586
}else{
573587
Assert(req.hdr.node>0&&req.hdr.node <=Mtm->nAllNodes&&req.hdr.node!=MtmNodeId);
588+
589+
MtmLock(LW_EXCLUSIVE);
590+
MtmCheckResponse(&req.hdr);
591+
MtmUnlock();
592+
574593
resp.code=MSG_STATUS;
575594
resp.disabledNodeMask=Mtm->disabledNodeMask;
576595
resp.dxid=HANDSHAKE_MAGIC;
@@ -726,6 +745,7 @@ static void MtmTransSender(Datum arg)
726745
CHECK_FOR_INTERRUPTS();
727746
}
728747
elog(LOG,"Stop arbiter sender %d",MyProcPid);
748+
proc_exit(1);/* force restart of this bgwroker */
729749
}
730750

731751

@@ -863,9 +883,7 @@ static void MtmTransReceiver(Datum arg)
863883
elog(WARNING,"Ignore response for unexisted transaction %d from node %d",msg->dxid,msg->node);
864884
continue;
865885
}
866-
if (BIT_CHECK(msg->disabledNodeMask,MtmNodeId-1)&&Mtm->status!=MTM_RECOVERY) {
867-
elog(PANIC,"Node %d thinks that I was dead: perform hara-kiri not to be a zombie",msg->node);
868-
}
886+
MtmCheckResponse(msg);
869887

870888
if (MtmIsCoordinator(ts)) {
871889
switch (msg->code) {
@@ -975,5 +993,6 @@ static void MtmTransReceiver(Datum arg)
975993
MtmRefreshClusterStatus(false);
976994
}
977995
}
996+
proc_exit(1);/* force restart of this bgwroker */
978997
}
979998

‎docker/docker-entrypoint.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ cat >> "$PGDATA/postgresql.conf" <<-EOF
2525
max_worker_processes = 100
2626
wal_level = logical
2727
fsync = off
28+
log_line_prefix = '%t: '
2829
max_wal_senders = 10
2930
wal_sender_timeout = 0
3031
max_replication_slots = 10

‎multimaster.c

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,9 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
913913
{
914914
MtmTransMap*tm;
915915

916+
if (Mtm->status==MTM_RECOVERY) {
917+
return;
918+
}
916919
if (x->status!=TRANSACTION_STATUS_ABORTED) {
917920
MtmLock(LW_EXCLUSIVE);
918921
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_REMOVE,NULL);
@@ -947,7 +950,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
947950
}
948951
if (ts!=NULL) {
949952
if (commit) {
950-
Assert(ts->status==TRANSACTION_STATUS_UNKNOWN);
953+
/* Assert(ts->status == TRANSACTION_STATUS_UNKNOWN); */
954+
Assert(ts->status==TRANSACTION_STATUS_UNKNOWN
955+
|| (ts->status==TRANSACTION_STATUS_IN_PROGRESS&&Mtm->status==MTM_RECOVERY));/* ??? Why there is commit without prepare */
951956
if (x->csn>ts->csn) {
952957
ts->csn=x->csn;
953958
MtmSyncClock(ts->csn);
@@ -1013,6 +1018,15 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
10131018
}
10141019
}
10151020

1021+
1022+
staticvoidMtmStartRecovery()
1023+
{
1024+
MtmLock(LW_EXCLUSIVE);
1025+
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
1026+
MtmSwitchClusterMode(MTM_RECOVERY);
1027+
MtmUnlock();
1028+
}
1029+
10161030
voidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tglobalSnapshot)
10171031
{
10181032
MtmTx.gtid=*gtid;
@@ -1035,19 +1049,15 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
10351049
if (!TransactionIdIsValid(gtid->xid)) {
10361050
/* In case of recovery InvalidTransactionId is passed */
10371051
if (Mtm->status!=MTM_RECOVERY) {
1038-
elog(PANIC,"Node %d tries to recover node %d which is in %s mode",gtid->node,MtmNodeId,MtmNodeStatusMnem[Mtm->status]);
1052+
elog(WARNING,"Node %d tries to recover node %d which is in %s mode",gtid->node,MtmNodeId,MtmNodeStatusMnem[Mtm->status]);
1053+
MtmStartRecovery();
10391054
}
10401055
}elseif (Mtm->status==MTM_RECOVERY) {
10411056
/* When recovery is completed we get normal transaction ID and switch to normal mode */
10421057
MtmRecoveryCompleted();
10431058
}
10441059
}
10451060

1046-
voidMtmRollbackAllPreparedTransactions(void)
1047-
{
1048-
}
1049-
1050-
10511061
voidMtmSetCurrentTransactionGID(charconst*gid)
10521062
{
10531063
MTM_LOG3("Set current transaction xid=%d GID %s",MtmTx.xid,gid);
@@ -1440,6 +1450,8 @@ bool MtmRefreshClusterStatus(bool nowait)
14401450
}
14411451
}
14421452
#endif
1453+
Mtm->reconnectMask |=clique&Mtm->disabledNodeMask;/* new enabled nodes mask */
1454+
14431455
if (disabled) {
14441456
MtmCheckQuorum();
14451457
}
@@ -1470,8 +1482,8 @@ bool MtmRefreshClusterStatus(bool nowait)
14701482
MtmSwitchClusterMode(MTM_OFFLINE);
14711483
}
14721484
}elseif (Mtm->status==MTM_OFFLINE) {
1473-
/* Should we somehow restart logical receivers? */
1474-
MtmSwitchClusterMode(MTM_RECOVERY);
1485+
/* Should we somehow restart logical receivers? */
1486+
MtmStartRecovery();
14751487
}
14761488
}else {
14771489
MTM_LOG1("Clique %lx has no quorum", (long)clique);
@@ -1500,7 +1512,13 @@ void MtmOnNodeDisconnect(int nodeId)
15001512
{
15011513
MtmTransState*ts;
15021514

1503-
if (Mtm->nodes[nodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)>MtmGetSystemTime()) {
1515+
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
1516+
{
1517+
/* Node is already disabled */
1518+
return;
1519+
}
1520+
if (Mtm->nodes[nodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)>MtmGetSystemTime())
1521+
{
15041522
/* Avoid false detection of node failure and prevent node status blinking */
15051523
return;
15061524
}

‎multimaster.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ typedef enum
119119
{
120120
SLOT_CREATE_NEW,/* create new slot (drop existed) */
121121
SLOT_OPEN_EXISTED,/* open existed slot */
122-
SLOT_OPEN_ALWAYS,/* open existed slot or create new ifnoty exists */
122+
SLOT_OPEN_ALWAYS,/* open existed slot or create new ifnot exists */
123123
}MtmSlotMode;
124124

125125
typedefstruct
@@ -279,7 +279,6 @@ extern void MtmUpdateLsnMapping(int nodeId, XLogRecPtr endLsn);
279279
externXLogRecPtrMtmGetFlushPosition(intnodeId);
280280
externvoidMtmWatchdog(void);
281281
externvoidMtmCheckHeartbeat(void);
282-
externvoidMtmRollbackAllPreparedTransactions(void);
283282

284283

285284

‎pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ process_remote_commit(StringInfo in)
582582
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
583583
gid=pq_getmsgstring(in);
584584
MTM_LOG2("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s",MtmGetSystemTime(),gid);
585-
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED)==TRANSACTION_STATUS_UNKNOWN) {
585+
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED)==TRANSACTION_STATUS_UNKNOWN) {
586586
MTM_LOG1("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",MtmGetSystemTime(),gid);
587587
StartTransactionCommand();
588588
MtmSetCurrentTransactionGID(gid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp