Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd44ba2d

Browse files
committed
Merge branch 'PGPROEE9_6_RDMA' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into PGPROEE9_6_RDMA
Fixes from MULTIMASTER branch merged
2 parentsb21424b +548fa34 commitd44ba2d

File tree

7 files changed

+146
-112
lines changed

7 files changed

+146
-112
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 61 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ static void MtmSender(Datum arg);
9797
staticvoidMtmReceiver(Datumarg);
9898
staticvoidMtmMonitor(Datumarg);
9999
staticvoidMtmSendHeartbeat(void);
100-
staticboolMtmSendToNode(intnode,voidconst*buf,intsize,time_treconnectTimeout);
100+
staticboolMtmSendToNode(intnode,voidconst*buf,intsize);
101101

102102
charconst*constMtmMessageKindMnem[]=
103103
{
@@ -166,7 +166,7 @@ static void MtmRegisterSocket(int fd, int node)
166166
ev.events=EPOLLIN;
167167
ev.data.u32=node;
168168
if (epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev)<0) {
169-
MTM_ELOG(LOG,"Arbiter failed to add socket to epoll set: %d",errno);
169+
MTM_ELOG(LOG,"Arbiter failed to add socket to epoll set: %s",strerror(errno));
170170
}
171171
#else
172172
FD_SET(fd,&inset);
@@ -180,7 +180,7 @@ static void MtmUnregisterSocket(int fd)
180180
{
181181
#ifUSE_EPOLL
182182
if (epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,NULL)<0) {
183-
MTM_ELOG(LOG,"Arbiter failed to unregister socket from epoll set: %d",errno);
183+
MTM_ELOG(LOG,"Arbiter failed to unregister socket from epoll set: %s",strerror(errno));
184184
}
185185
#else
186186
FD_CLR(fd,&inset);
@@ -371,7 +371,7 @@ static void MtmSendHeartbeat()
371371
|| !BIT_CHECK(Mtm->disabledNodeMask,i)
372372
||BIT_CHECK(Mtm->reconnectMask,i)))
373373
{
374-
if (!MtmSendToNode(i,&msg,sizeof(msg),MtmHeartbeatRecvTimeout)) {
374+
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
375375
MTM_ELOG(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
376376
}else {
377377
if (last_heartbeat_to_node[i]+MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2<now) {
@@ -408,7 +408,7 @@ void MtmCheckHeartbeat()
408408
}
409409

410410

411-
staticintMtmConnectSocket(intnode,intport,time_ttimeout)
411+
staticintMtmConnectSocket(intnode,intport)
412412
{
413413
structaddrinfo*addrs=NULL;
414414
structaddrinfo*addr;
@@ -417,12 +417,9 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
417417
MtmHandshakeMessagereq;
418418
MtmArbiterMessageresp;
419419
intsd=-1;
420-
intret;
421-
timestamp_tstart=MtmGetSystemTime();
420+
intrc;
422421
charconst*host=Mtm->nodes[node].con.hostName;
423422
nodemask_tsave_mask=busy_mask;
424-
timestamp_tafterWait;
425-
timestamp_tbeforeWait;
426423

427424
/* Initialize hint structure */
428425
MemSet(&hint,0,sizeof(hint));
@@ -431,67 +428,60 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
431428

432429
snprintf(portstr,sizeof(portstr),"%d",port);
433430

434-
ret=pg_getaddrinfo_all(host,portstr,&hint,&addrs);
435-
if (ret!=0)
431+
rc=pg_getaddrinfo_all(host,portstr,&hint,&addrs);
432+
if (rc!=0)
436433
{
437-
MTM_ELOG(LOG,"Arbiter failed to resolve host '%s' by name:(%d) %s",host,ret,gai_strerror(ret));
434+
MTM_ELOG(LOG,"Arbiter failed to resolve host '%s' by name:%s",host,gai_strerror(rc));
438435
return-1;
439436
}
440437
BIT_SET(busy_mask,node);
441438

442439
Retry:
443-
while (1) {
444-
intrc=-1;
445-
sd=pg_socket(AF_INET,SOCK_STREAM,0,MtmUseRDMA);
446-
if (sd<0) {
447-
MTM_ELOG(LOG,"Arbiter failed to create socket: %d",errno);
448-
gotoError;
449-
}
450-
rc=pg_fcntl(sd,F_SETFL,O_NONBLOCK,MtmUseRDMA);
451-
if (rc<0) {
452-
MTM_ELOG(LOG,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
453-
gotoError;
454-
}
455-
for (addr=addrs;addr!=NULL;addr=addr->ai_next)
456-
{
457-
do {
458-
rc=pg_connect(sd,addr->ai_addr,addr->ai_addrlen,MtmUseRDMA);
459-
}while (rc<0&&errno==EINTR);
460440

461-
if (rc >=0||errno==EINPROGRESS) {
462-
break;
463-
}
464-
}
465-
if (rc==0) {
441+
sd=pg_socket(AF_INET,SOCK_STREAM,0,MtmUseRDMA);
442+
if (sd<0) {
443+
MTM_ELOG(LOG,"Arbiter failed to create socket: %s",strerror(errno));
444+
gotoError;
445+
}
446+
rc=pg_fcntl(sd,F_SETFL,O_NONBLOCK,MtmUseRDMA);
447+
if (rc<0) {
448+
MTM_ELOG(LOG,"Arbiter failed to switch socket to non-blocking mode: %s",strerror(errno));
449+
gotoError;
450+
}
451+
for (addr=addrs;addr!=NULL;addr=addr->ai_next)
452+
{
453+
do {
454+
rc=pg_connect(sd,addr->ai_addr,addr->ai_addrlen,MtmUseRDMA);
455+
}while (rc<0&&errno==EINTR);
456+
457+
if (rc >=0||errno==EINPROGRESS) {
466458
break;
467459
}
468-
beforeWait=MtmGetSystemTime();
469-
if (errno!=EINPROGRESS||start+MSEC_TO_USEC(timeout)<beforeWait ) {
470-
MTM_ELOG(WARNING,"Arbiter failed to connect to %s:%d: error=%d",host,port,errno);
471-
gotoError;
472-
}else {
473-
rc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
474-
if (rc==1) {
475-
socklen_toptlen=sizeof(int);
476-
if (getsockopt(sd,SOL_SOCKET,SO_ERROR, (void*)&rc,&optlen)<0) {
477-
MTM_ELOG(WARNING,"Arbiter failed to getsockopt for %s:%d: error=%d",host,port,errno);
478-
gotoError;
479-
}
480-
if (rc==0) {
481-
break;
482-
}else {
483-
MTM_ELOG(WARNING,"Arbiter trying to connect to %s:%d: rc=%d, error=%d",host,port,rc,errno);
484-
}
485-
}else {
486-
MTM_ELOG(WARNING,"Arbiter waiting socket to %s:%d: rc=%d, error=%d",host,port,rc,errno);
460+
}
461+
462+
if (rc!=0&&errno==EINPROGRESS) {
463+
rc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
464+
if (rc==1) {
465+
socklen_toptlen=sizeof(int);
466+
interrcode;
467+
468+
if (getsockopt(sd,SOL_SOCKET,SO_ERROR, (void*)&errcode,&optlen)<0) {
469+
MTM_ELOG(WARNING,"Arbiter failed to getsockopt for %s:%d: %s",host,port,strerror(errcode));
470+
gotoError;
487471
}
488-
pg_closesocket(sd,MtmUseRDMA);
489-
afterWait=MtmGetSystemTime();
490-
if (afterWait<beforeWait+MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
491-
MtmSleep(beforeWait+MSEC_TO_USEC(MtmHeartbeatSendTimeout)-afterWait);
472+
if (errcode!=0) {
473+
MTM_ELOG(WARNING,"Arbiter trying to connect to %s:%d: %s",host,port,strerror(errcode));
474+
gotoError;
492475
}
476+
}else {
477+
MTM_ELOG(WARNING,"Arbiter waiting socket to %s:%d: %s",host,port,strerror(errno));
493478
}
494479
}
480+
elseif (rc!=0) {
481+
MTM_ELOG(WARNING,"Arbiter failed to connect to %s:%d: (%d) %s",host,port,rc,strerror(errno));
482+
gotoError;
483+
}
484+
495485
MtmSetSocketOptions(sd);
496486
MtmInitMessage(&req.hdr,MSG_HANDSHAKE);
497487
req.hdr.node=MtmNodeId;
@@ -500,12 +490,12 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
500490
req.hdr.csn=MtmGetCurrentTime();
501491
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
502492
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
503-
MTM_ELOG(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
493+
MTM_ELOG(WARNING,"Arbiter failed to send handshake message to %s:%d: %s",host,port,strerror(errno));
504494
pg_closesocket(sd,MtmUseRDMA);
505495
gotoRetry;
506496
}
507497
if (MtmReadSocket(sd,&resp,sizeofresp)!=sizeof(resp)) {
508-
MTM_ELOG(WARNING,"Arbiter failed to receive response for handshake message from %s:%d:errno=%d",host,port,errno);
498+
MTM_ELOG(WARNING,"Arbiter failed to receive response for handshake message from %s:%d:%s",host,port,strerror(errno));
509499
pg_closesocket(sd,MtmUseRDMA);
510500
gotoRetry;
511501
}
@@ -527,7 +517,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
527517

528518
returnsd;
529519

530-
Error:
520+
Error:
531521
busy_mask=save_mask;
532522
if (sd >=0) {
533523
pg_closesocket(sd,MtmUseRDMA);
@@ -551,7 +541,7 @@ static void MtmOpenConnections()
551541
}
552542
for (i=0;i<nNodes;i++) {
553543
if (i+1!=MtmNodeId&&i<Mtm->nAllNodes) {
554-
sockets[i]=MtmConnectSocket(i,Mtm->nodes[i].con.arbiterPort,MtmConnectTimeout);
544+
sockets[i]=MtmConnectSocket(i,Mtm->nodes[i].con.arbiterPort);
555545
if (sockets[i]<0) {
556546
MtmOnNodeDisconnect(i+1);
557547
}
@@ -566,7 +556,7 @@ static void MtmOpenConnections()
566556
}
567557

568558

569-
staticboolMtmSendToNode(intnode,voidconst*buf,intsize,time_treconnectTimeout)
559+
staticboolMtmSendToNode(intnode,voidconst*buf,intsize)
570560
{
571561
boolresult= true;
572562
nodemask_tsave_mask=busy_mask;
@@ -589,11 +579,11 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
589579
}
590580
if (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
591581
if (sockets[node] >=0) {
592-
MTM_ELOG(WARNING,"Arbiter fail to write to node %d: %d",node+1,errno);
582+
MTM_ELOG(WARNING,"Arbiter fail to write to node %d: %s",node+1,strerror(errno));
593583
pg_closesocket(sockets[node],MtmUseRDMA);
594584
sockets[node]=-1;
595585
}
596-
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort,reconnectTimeout);
586+
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort);
597587
if (sockets[node]<0) {
598588
MtmOnNodeDisconnect(node+1);
599589
result= false;
@@ -613,7 +603,7 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
613603
{
614604
intrc=MtmReadSocket(sockets[node],buf,buf_size);
615605
if (rc <=0) {
616-
MTM_ELOG(WARNING,"Arbiter failed to read from node=%d, rc=%d, errno=%d",node+1,rc,errno);
606+
MTM_ELOG(WARNING,"Arbiter failed to read from node=%d: %s",node+1,strerror(errno));
617607
MtmDisconnect(node);
618608
}
619609
returnrc;
@@ -623,20 +613,20 @@ static void MtmAcceptOneConnection()
623613
{
624614
intfd=pg_accept(gateway,NULL,NULL,MtmUseRDMA);
625615
if (fd<0) {
626-
MTM_ELOG(WARNING,"Arbiter failed to accept socket: %d",errno);
616+
MTM_ELOG(WARNING,"Arbiter failed to accept socket: %s",strerror(errno));
627617
}else {
628618
MtmHandshakeMessagereq;
629619
MtmArbiterMessageresp;
630620
intrc=pg_fcntl(fd,F_SETFL,O_NONBLOCK,MtmUseRDMA);
631621
if (rc<0) {
632-
MTM_ELOG(ERROR,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
622+
MTM_ELOG(ERROR,"Arbiter failed to switch socket to non-blocking mode: %s",strerror(errno));
633623
}
634624
rc=MtmReadSocket(fd,&req,sizeofreq);
635625
if (rc<sizeof(req)) {
636626
MTM_ELOG(WARNING,"Arbiter failed to handshake socket: %d, errno=%d",rc,errno);
637627
pg_closesocket(fd,MtmUseRDMA);
638628
}elseif (req.hdr.code!=MSG_HANDSHAKE&&req.hdr.dxid!=HANDSHAKE_MAGIC) {
639-
MTM_ELOG(WARNING,"Arbiterget unexpected handshakemessage %d",req.hdr.code);
629+
MTM_ELOG(WARNING,"Arbiterfailed to handshakesocket: %s",strerror(errno));
640630
pg_closesocket(fd,MtmUseRDMA);
641631
}else {
642632
intnode=req.hdr.node-1;
@@ -770,7 +760,7 @@ static void MtmSender(Datum arg)
770760

771761
for (i=0;i<Mtm->nAllNodes;i++) {
772762
if (txBuffer[i].used!=0) {
773-
MtmSendToNode(i,txBuffer[i].data,txBuffer[i].used*sizeof(MtmArbiterMessage),MtmReconnectTimeout);
763+
MtmSendToNode(i,txBuffer[i].data,txBuffer[i].used*sizeof(MtmArbiterMessage));
774764
txBuffer[i].used=0;
775765
}
776766
}
@@ -870,7 +860,7 @@ static void MtmReceiver(Datum arg)
870860
if (errno==EINTR) {
871861
continue;
872862
}
873-
MTM_ELOG(ERROR,"Arbiter failed to poll sockets: %d",errno);
863+
MTM_ELOG(ERROR,"Arbiter failed to poll sockets: %s",strerror(errno));
874864
}
875865
for (j=0;j<n;j++) {
876866
i=events[j].data.u32;
@@ -894,7 +884,7 @@ static void MtmReceiver(Datum arg)
894884
}while (n<0&&MtmRecovery());
895885

896886
if (n<0) {
897-
MTM_ELOG(ERROR,"Arbiter failed to select sockets: %d",errno);
887+
MTM_ELOG(ERROR,"Arbiter failed to select sockets: %s",strerror(errno));
898888
}
899889
for (i=0;i<nNodes;i++) {
900890
if (sockets[i] >=0&&FD_ISSET(sockets[i],&events))

‎contrib/mmts/multimaster--1.0.sql

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use"CREATE EXTENSION multimaster" to load this file. \quit
33

4+
-- check that multimaster shared library is really loaded
5+
DO $$
6+
BEGIN
7+
IF strpos(current_setting('shared_preload_libraries'),'multimaster')=0 THEN
8+
RAISE EXCEPTION'Multimaster must be loaded via shared_preload_libraries. Refusing to proceed.';
9+
END IF;
10+
END
11+
$$;
12+
413
CREATEFUNCTIONmtm.start_replication() RETURNS void
514
AS'MODULE_PATHNAME','mtm_start_replication'
615
LANGUAGE C;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp