Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commiteb90a7d

Browse files
committed
Merge branch 'PGPROEE9_6_MULTIMASTER' ofhttps://gitlab.postgrespro.ru/pgpro-dev/postgrespro into PGPROEE9_6_MULTIMASTER
2 parents2baf093 +9caeb00 commiteb90a7d

File tree

3 files changed

+64
-108
lines changed

3 files changed

+64
-108
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 62 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ static void MtmSender(Datum arg);
9191
staticvoidMtmReceiver(Datumarg);
9292
staticvoidMtmMonitor(Datumarg);
9393
staticvoidMtmSendHeartbeat(void);
94-
staticboolMtmSendToNode(intnode,voidconst*buf,intsize,time_treconnectTimeout);
94+
staticboolMtmSendToNode(intnode,voidconst*buf,intsize);
9595

9696
charconst*constMtmMessageKindMnem[]=
9797
{
@@ -160,7 +160,7 @@ static void MtmRegisterSocket(int fd, int node)
160160
ev.events=EPOLLIN;
161161
ev.data.u32=node;
162162
if (epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev)<0) {
163-
MTM_ELOG(LOG,"Arbiter failed to add socket to epoll set: %d",errno);
163+
MTM_ELOG(LOG,"Arbiter failed to add socket to epoll set: %s",strerror(errno));
164164
}
165165
#else
166166
FD_SET(fd,&inset);
@@ -174,7 +174,7 @@ static void MtmUnregisterSocket(int fd)
174174
{
175175
#ifUSE_EPOLL
176176
if (epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,NULL)<0) {
177-
MTM_ELOG(LOG,"Arbiter failed to unregister socket from epoll set: %d",errno);
177+
MTM_ELOG(LOG,"Arbiter failed to unregister socket from epoll set: %s",strerror(errno));
178178
}
179179
#else
180180
FD_CLR(fd,&inset);
@@ -365,7 +365,7 @@ static void MtmSendHeartbeat()
365365
|| !BIT_CHECK(Mtm->disabledNodeMask,i)
366366
||BIT_CHECK(Mtm->reconnectMask,i)))
367367
{
368-
if (!MtmSendToNode(i,&msg,sizeof(msg),MtmHeartbeatRecvTimeout)) {
368+
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
369369
MTM_ELOG(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
370370
}else {
371371
if (last_heartbeat_to_node[i]+MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2<now) {
@@ -402,7 +402,7 @@ void MtmCheckHeartbeat()
402402
}
403403

404404

405-
staticintMtmConnectSocket(intnode,intport,time_ttimeout)
405+
staticintMtmConnectSocket(intnode,intport)
406406
{
407407
structaddrinfo*addrs=NULL;
408408
structaddrinfo*addr;
@@ -411,12 +411,9 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
411411
MtmHandshakeMessagereq;
412412
MtmArbiterMessageresp;
413413
intsd=-1;
414-
intret;
415-
timestamp_tstart=MtmGetSystemTime();
414+
intrc;
416415
charconst*host=Mtm->nodes[node].con.hostName;
417416
nodemask_tsave_mask=busy_mask;
418-
timestamp_tafterWait;
419-
timestamp_tbeforeWait;
420417

421418
/* Initialize hint structure */
422419
MemSet(&hint,0,sizeof(hint));
@@ -425,67 +422,60 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
425422

426423
snprintf(portstr,sizeof(portstr),"%d",port);
427424

428-
ret=pg_getaddrinfo_all(host,portstr,&hint,&addrs);
429-
if (ret!=0)
425+
rc=pg_getaddrinfo_all(host,portstr,&hint,&addrs);
426+
if (rc!=0)
430427
{
431-
MTM_ELOG(LOG,"Arbiter failed to resolve host '%s' by name:(%d) %s",host,ret,gai_strerror(ret));
428+
MTM_ELOG(LOG,"Arbiter failed to resolve host '%s' by name:%s",host,gai_strerror(rc));
432429
return-1;
433430
}
434431
BIT_SET(busy_mask,node);
435432

436-
Retry:
437-
while (1) {
438-
intrc=-1;
439-
sd=socket(AF_INET,SOCK_STREAM,0);
440-
if (sd<0) {
441-
MTM_ELOG(LOG,"Arbiter failed to create socket: %d",errno);
442-
gotoError;
443-
}
444-
rc=fcntl(sd,F_SETFL,O_NONBLOCK);
445-
if (rc<0) {
446-
MTM_ELOG(LOG,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
447-
gotoError;
448-
}
449-
for (addr=addrs;addr!=NULL;addr=addr->ai_next)
450-
{
451-
do {
452-
rc=connect(sd,addr->ai_addr,addr->ai_addrlen);
453-
}while (rc<0&&errno==EINTR);
433+
Retry:
454434

455-
if (rc >=0||errno==EINPROGRESS) {
456-
break;
457-
}
458-
}
459-
if (rc==0) {
435+
sd=socket(AF_INET,SOCK_STREAM,0);
436+
if (sd<0) {
437+
MTM_ELOG(LOG,"Arbiter failed to create socket: %s",strerror(errno));
438+
gotoError;
439+
}
440+
rc=fcntl(sd,F_SETFL,O_NONBLOCK);
441+
if (rc<0) {
442+
MTM_ELOG(LOG,"Arbiter failed to switch socket to non-blocking mode: %s",strerror(errno));
443+
gotoError;
444+
}
445+
for (addr=addrs;addr!=NULL;addr=addr->ai_next)
446+
{
447+
do {
448+
rc=connect(sd,addr->ai_addr,addr->ai_addrlen);
449+
}while (rc<0&&errno==EINTR);
450+
451+
if (rc >=0||errno==EINPROGRESS) {
460452
break;
461453
}
462-
beforeWait=MtmGetSystemTime();
463-
if (errno!=EINPROGRESS||start+MSEC_TO_USEC(timeout)<beforeWait ) {
464-
MTM_ELOG(WARNING,"Arbiter failed to connect to %s:%d: error=%d",host,port,errno);
465-
gotoError;
466-
}else {
467-
rc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
468-
if (rc==1) {
469-
socklen_toptlen=sizeof(int);
470-
if (getsockopt(sd,SOL_SOCKET,SO_ERROR, (void*)&rc,&optlen)<0) {
471-
MTM_ELOG(WARNING,"Arbiter failed to getsockopt for %s:%d: error=%d",host,port,errno);
472-
gotoError;
473-
}
474-
if (rc==0) {
475-
break;
476-
}else {
477-
MTM_ELOG(WARNING,"Arbiter trying to connect to %s:%d: rc=%d, error=%d",host,port,rc,errno);
478-
}
479-
}else {
480-
MTM_ELOG(WARNING,"Arbiter waiting socket to %s:%d: rc=%d, error=%d",host,port,rc,errno);
454+
}
455+
456+
if (rc!=0&&errno==EINPROGRESS) {
457+
rc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
458+
if (rc==1) {
459+
socklen_toptlen=sizeof(int);
460+
interrcode;
461+
462+
if (getsockopt(sd,SOL_SOCKET,SO_ERROR, (void*)&errcode,&optlen)<0) {
463+
MTM_ELOG(WARNING,"Arbiter failed to getsockopt for %s:%d: %s",host,port,strerror(errcode));
464+
gotoError;
481465
}
482-
close(sd);
483-
afterWait=MtmGetSystemTime();
484-
if (afterWait<beforeWait+MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
485-
MtmSleep(beforeWait+MSEC_TO_USEC(MtmHeartbeatSendTimeout)-afterWait);
466+
if (errcode!=0) {
467+
MTM_ELOG(WARNING,"Arbiter trying to connect to %s:%d: %s",host,port,strerror(errcode));
468+
gotoError;
486469
}
470+
}else {
471+
MTM_ELOG(WARNING,"Arbiter waiting socket to %s:%d: %s",host,port,strerror(errno));
487472
}
488473
}
474+
elseif (rc!=0) {
475+
MTM_ELOG(WARNING,"Arbiter failed to connect to %s:%d: (%d) %s",host,port,rc,strerror(errno));
476+
gotoError;
477+
}
478+
489479
MtmSetSocketOptions(sd);
490480
MtmInitMessage(&req.hdr,MSG_HANDSHAKE);
491481
req.hdr.node=MtmNodeId;
@@ -494,12 +484,12 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
494484
req.hdr.csn=MtmGetCurrentTime();
495485
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
496486
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
497-
MTM_ELOG(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
487+
MTM_ELOG(WARNING,"Arbiter failed to send handshake message to %s:%d: %s",host,port,strerror(errno));
498488
close(sd);
499489
gotoRetry;
500490
}
501491
if (MtmReadSocket(sd,&resp,sizeofresp)!=sizeof(resp)) {
502-
MTM_ELOG(WARNING,"Arbiter failed to receive response for handshake message from %s:%d:errno=%d",host,port,errno);
492+
MTM_ELOG(WARNING,"Arbiter failed to receive response for handshake message from %s:%d:%s",host,port,strerror(errno));
503493
close(sd);
504494
gotoRetry;
505495
}
@@ -521,7 +511,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
521511

522512
returnsd;
523513

524-
Error:
514+
Error:
525515
busy_mask=save_mask;
526516
if (sd >=0) {
527517
close(sd);
@@ -545,7 +535,7 @@ static void MtmOpenConnections()
545535
}
546536
for (i=0;i<nNodes;i++) {
547537
if (i+1!=MtmNodeId&&i<Mtm->nAllNodes) {
548-
sockets[i]=MtmConnectSocket(i,Mtm->nodes[i].con.arbiterPort,MtmConnectTimeout);
538+
sockets[i]=MtmConnectSocket(i,Mtm->nodes[i].con.arbiterPort);
549539
if (sockets[i]<0) {
550540
MtmOnNodeDisconnect(i+1);
551541
}
@@ -560,7 +550,7 @@ static void MtmOpenConnections()
560550
}
561551

562552

563-
staticboolMtmSendToNode(intnode,voidconst*buf,intsize,time_treconnectTimeout)
553+
staticboolMtmSendToNode(intnode,voidconst*buf,intsize)
564554
{
565555
boolresult= true;
566556
nodemask_tsave_mask=busy_mask;
@@ -583,11 +573,11 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
583573
}
584574
if (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
585575
if (sockets[node] >=0) {
586-
MTM_ELOG(WARNING,"Arbiter fail to write to node %d: %d",node+1,errno);
576+
MTM_ELOG(WARNING,"Arbiter fail to write to node %d: %s",node+1,strerror(errno));
587577
close(sockets[node]);
588578
sockets[node]=-1;
589579
}
590-
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort,reconnectTimeout);
580+
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort);
591581
if (sockets[node]<0) {
592582
MtmOnNodeDisconnect(node+1);
593583
result= false;
@@ -607,7 +597,7 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
607597
{
608598
intrc=MtmReadSocket(sockets[node],buf,buf_size);
609599
if (rc <=0) {
610-
MTM_ELOG(WARNING,"Arbiter failed to read from node=%d, rc=%d, errno=%d",node+1,rc,errno);
600+
MTM_ELOG(WARNING,"Arbiter failed to read from node=%d: %s",node+1,strerror(errno));
611601
MtmDisconnect(node);
612602
}
613603
returnrc;
@@ -617,17 +607,17 @@ static void MtmAcceptOneConnection()
617607
{
618608
intfd=accept(gateway,NULL,NULL);
619609
if (fd<0) {
620-
MTM_ELOG(WARNING,"Arbiter failed to accept socket: %d",errno);
610+
MTM_ELOG(WARNING,"Arbiter failed to accept socket: %s",strerror(errno));
621611
}else {
622612
MtmHandshakeMessagereq;
623613
MtmArbiterMessageresp;
624614
intrc=fcntl(fd,F_SETFL,O_NONBLOCK);
625615
if (rc<0) {
626-
MTM_ELOG(ERROR,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
616+
MTM_ELOG(ERROR,"Arbiter failed to switch socket to non-blocking mode: %s",strerror(errno));
627617
}
628618
rc=MtmReadSocket(fd,&req,sizeofreq);
629619
if (rc<sizeof(req)) {
630-
MTM_ELOG(WARNING,"Arbiter failed to handshake socket: %d, errno=%d",rc,errno);
620+
MTM_ELOG(WARNING,"Arbiter failed to handshake socket: %s",strerror(errno));
631621
close(fd);
632622
}elseif (req.hdr.code!=MSG_HANDSHAKE&&req.hdr.dxid!=HANDSHAKE_MAGIC) {
633623
MTM_ELOG(WARNING,"Arbiter get unexpected handshake message %d",req.hdr.code);
@@ -764,7 +754,7 @@ static void MtmSender(Datum arg)
764754

765755
for (i=0;i<Mtm->nAllNodes;i++) {
766756
if (txBuffer[i].used!=0) {
767-
MtmSendToNode(i,txBuffer[i].data,txBuffer[i].used*sizeof(MtmArbiterMessage),MtmReconnectTimeout);
757+
MtmSendToNode(i,txBuffer[i].data,txBuffer[i].used*sizeof(MtmArbiterMessage));
768758
txBuffer[i].used=0;
769759
}
770760
}
@@ -864,7 +854,7 @@ static void MtmReceiver(Datum arg)
864854
if (errno==EINTR) {
865855
continue;
866856
}
867-
MTM_ELOG(ERROR,"Arbiter failed to poll sockets: %d",errno);
857+
MTM_ELOG(ERROR,"Arbiter failed to poll sockets: %s",strerror(errno));
868858
}
869859
for (j=0;j<n;j++) {
870860
i=events[j].data.u32;
@@ -888,7 +878,7 @@ static void MtmReceiver(Datum arg)
888878
}while (n<0&&MtmRecovery());
889879

890880
if (n<0) {
891-
MTM_ELOG(ERROR,"Arbiter failed to select sockets: %d",errno);
881+
MTM_ELOG(ERROR,"Arbiter failed to select sockets: %s",strerror(errno));
892882
}
893883
for (i=0;i<nNodes;i++) {
894884
if (sockets[i] >=0&&FD_ISSET(sockets[i],&events))

‎contrib/mmts/multimaster.c

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ char const* const MtmNodeStatusMnem[] =
204204
{
205205
"Initialization",
206206
"Offline",
207-
"Connected",
207+
"Connecting",
208208
"Online",
209209
"Recovery",
210210
"Recovered",
@@ -229,8 +229,6 @@ int MtmNodes;
229229
intMtmNodeId;
230230
intMtmReplicationNodeId;
231231
intMtmArbiterPort;
232-
intMtmConnectTimeout;
233-
intMtmReconnectTimeout;
234232
intMtmNodeDisableDelay;
235233
intMtmTransSpillThreshold;
236234
intMtmMaxNodes;
@@ -3278,36 +3276,6 @@ _PG_init(void)
32783276
NULL
32793277
);
32803278

3281-
DefineCustomIntVariable(
3282-
"multimaster.connect_timeout",
3283-
"Multimaster nodes connect timeout",
3284-
"Interval in milliseconds for establishing connection with cluster node",
3285-
&MtmConnectTimeout,
3286-
10000,/* 10 seconds */
3287-
1,
3288-
INT_MAX,
3289-
PGC_BACKEND,
3290-
0,
3291-
NULL,
3292-
NULL,
3293-
NULL
3294-
);
3295-
3296-
DefineCustomIntVariable(
3297-
"multimaster.reconnect_timeout",
3298-
"Multimaster nodes reconnect timeout",
3299-
"Interval in milliseconds for reestablishing connection with cluster node",
3300-
&MtmReconnectTimeout,
3301-
5000,/* 5 seconds */
3302-
1,
3303-
INT_MAX,
3304-
PGC_BACKEND,
3305-
0,
3306-
NULL,
3307-
NULL,
3308-
NULL
3309-
);
3310-
33113279
if (!ConfigIsSane()) {
33123280
MTM_ELOG(ERROR,"Multimaster config is insane, refusing to work");
33133281
}
@@ -5182,7 +5150,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
51825150
if (relid!=InvalidOid) {
51835151
Oidconstraint_oid;
51845152
Bitmapset*pk=get_primary_key_attnos(relid, true,&constraint_oid);
5185-
if (pk==NULL) {
5153+
if (pk==NULL&& !MtmVolksWagenMode) {
51865154
elog(WARNING,
51875155
MtmIgnoreTablesWithoutPk
51885156
?"Table %s.%s without primary will not be replicated"

‎contrib/mmts/multimaster.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,6 @@ extern int MtmNodes;
343343
externintMtmArbiterPort;
344344
externchar*MtmDatabaseName;
345345
externchar*MtmDatabaseUser;
346-
externintMtmConnectTimeout;
347-
externintMtmReconnectTimeout;
348346
externintMtmNodeDisableDelay;
349347
externintMtmTransSpillThreshold;
350348
externintMtmHeartbeatSendTimeout;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp