Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 33f01b4

Browse files
knizhnik authored and kelvich committed
Support RDMA for multimaster
1 parent e7a5da3, commit 33f01b4

File tree

4 files changed

+64
-38
lines changed

4 files changed

+64
-38
lines changed

‎arbiter.c

Lines changed: 43 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,14 @@
1818
#include<time.h>
1919
#include<fcntl.h>
2020

21+
#ifdefWITH_RSOCKET
22+
#include<rdma/rsocket.h>
23+
#endif
24+
2125
#include"postgres.h"
2226
#include"fmgr.h"
2327
#include"miscadmin.h"
28+
#include"pg_socket.h"
2429
#include"postmaster/postmaster.h"
2530
#include"postmaster/bgworker.h"
2631
#include"storage/s_lock.h"
@@ -58,6 +63,7 @@
5863
#include"tcop/utility.h"
5964
#include"libpq/ip.h"
6065

66+
6167
#ifndefUSE_EPOLL
6268
#ifdef__linux__
6369
#defineUSE_EPOLL 0
@@ -185,7 +191,7 @@ static void MtmUnregisterSocket(int fd)
185191
staticvoidMtmDisconnect(intnode)
186192
{
187193
MtmUnregisterSocket(sockets[node]);
188-
close(sockets[node]);
194+
pg_closesocket(sockets[node],MtmUseRDMA);
189195
sockets[node]=-1;
190196
MtmOnNodeDisconnect(node+1);
191197
}
@@ -208,7 +214,7 @@ static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
208214
FD_SET(sd,&set);
209215
tv.tv_sec= (deadline-now)/USECS_PER_SEC;
210216
tv.tv_usec= (deadline-now)%USECS_PER_SEC;
211-
}while ((rc=select(sd+1,forWrite ?NULL :&set,forWrite ?&set :NULL,NULL,&tv))<0&&errno==EINTR);
217+
}while ((rc=pg_select([sd+1,forWrite ?NULL :&set,forWrite ?&set :NULL,NULL,&tv,MtmUseRDMA))<0&&errno==EINTR);
212218

213219
returnrc;
214220
}
@@ -219,7 +225,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
219225
while (size!=0) {
220226
intrc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
221227
if (rc==1) {
222-
while ((rc=send(sd,src,size,0))<0&&errno==EINTR);
228+
while ((rc=pg_send(sd,src,size,0,MtmUseRDMA))<0&&errno==EINTR);
223229
if (rc<0) {
224230
if (errno==EINPROGRESS) {
225231
continue;
@@ -238,11 +244,11 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
238244
staticintMtmReadSocket(intsd,void*buf,intbuf_size)
239245
{
240246
intrc;
241-
while ((rc=recv(sd,buf,buf_size,0))<0&&errno==EINTR);
247+
while ((rc=pg_recv(sd,buf,buf_size,0,MtmUseRDMA))<0&&errno==EINTR);
242248
if (rc <=0&& (errno==EAGAIN||errno==EINPROGRESS)) {
243249
rc=MtmWaitSocket(sd, false,MtmHeartbeatSendTimeout);
244250
if (rc==1) {
245-
while ((rc=recv(sd,buf,buf_size,0))<0&&errno==EINTR);
251+
while ((rc=pg_recv(sd,buf,buf_size,0,MtmUseRDMA))<0&&errno==EINTR);
246252
}
247253
}
248254
returnrc;
@@ -254,25 +260,25 @@ static void MtmSetSocketOptions(int sd)
254260
{
255261
#ifdefTCP_NODELAY
256262
inton=1;
257-
if (setsockopt(sd,IPPROTO_TCP,TCP_NODELAY, (charconst*)&on,sizeof(on))<0) {
263+
if (pg_setsockopt(sd,IPPROTO_TCP,TCP_NODELAY, (charconst*)&on,sizeof(on),MtmUseRDMA)<0) {
258264
MTM_ELOG(WARNING,"Failed to set TCP_NODELAY: %m");
259265
}
260266
#endif
261-
if (setsockopt(sd,SOL_SOCKET,SO_KEEPALIVE, (charconst*)&on,sizeof(on))<0) {
267+
if (pg_setsockopt(sd,SOL_SOCKET,SO_KEEPALIVE, (charconst*)&on,sizeof(on),MtmUseRDMA)<0) {
262268
MTM_ELOG(WARNING,"Failed to set SO_KEEPALIVE: %m");
263269
}
264270

265271
if (tcp_keepalives_idle) {
266272
#ifdefTCP_KEEPIDLE
267-
if (setsockopt(sd,IPPROTO_TCP,TCP_KEEPIDLE,
268-
(char*)&tcp_keepalives_idle,sizeof(tcp_keepalives_idle))<0)
273+
if (pg_setsockopt(sd,IPPROTO_TCP,TCP_KEEPIDLE,
274+
(char*)&tcp_keepalives_idle,sizeof(tcp_keepalives_idle),MtmUseRDMA)<0)
269275
{
270276
MTM_ELOG(WARNING,"Failed to set TCP_KEEPIDLE: %m");
271277
}
272278
#else
273279
#ifdefTCP_KEEPALIVE
274-
if (setsockopt(sd,IPPROTO_TCP,TCP_KEEPALIVE,
275-
(char*)&tcp_keepalives_idle,sizeof(tcp_keepalives_idle))<0)
280+
if (pg_setsockopt(sd,IPPROTO_TCP,TCP_KEEPALIVE,
281+
(char*)&tcp_keepalives_idle,sizeof(tcp_keepalives_idle),MtmUseRDMA)<0)
276282
{
277283
MTM_ELOG(WARNING,"Failed to set TCP_KEEPALIVE: %m");
278284
}
@@ -281,17 +287,17 @@ static void MtmSetSocketOptions(int sd)
281287
}
282288
#ifdefTCP_KEEPINTVL
283289
if (tcp_keepalives_interval) {
284-
if (setsockopt(sd,IPPROTO_TCP,TCP_KEEPINTVL,
285-
(char*)&tcp_keepalives_interval,sizeof(tcp_keepalives_interval))<0)
290+
if (pg_setsockopt(sd,IPPROTO_TCP,TCP_KEEPINTVL,
291+
(char*)&tcp_keepalives_interval,sizeof(tcp_keepalives_interval),MtmUseRDMA)<0)
286292
{
287293
MTM_ELOG(WARNING,"Failed to set TCP_KEEPINTVL: %m");
288294
}
289295
}
290296
#endif
291297
#ifdefTCP_KEEPCNT
292298
if (tcp_keepalives_count) {
293-
if (setsockopt(sd,IPPROTO_TCP,TCP_KEEPCNT,
294-
(char*)&tcp_keepalives_count,sizeof(tcp_keepalives_count))<0)
299+
if (pg_setsockopt(sd,IPPROTO_TCP,TCP_KEEPCNT,
300+
(char*)&tcp_keepalives_count,sizeof(tcp_keepalives_count),MtmUseRDMA)<0)
295301
{
296302
MTM_ELOG(WARNING,"Failed to set TCP_KEEPCNT: %m");
297303
}
@@ -375,7 +381,7 @@ static void MtmSendHeartbeat()
375381
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
376382
if (BIT_CHECK(SELF_CONNECTIVITY_MASK,i)) {
377383
MTM_LOG1("Force reconnect to node %d",i+1);
378-
close(sockets[i]);
384+
pg_closesocket(sockets[i],MtmUseRDMA);
379385
sockets[i]=-1;
380386
MtmReconnectNode(i+1);/* set reconnect mask to force node reconnent */
381387
}
@@ -436,20 +442,20 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
436442
Retry:
437443
while (1) {
438444
intrc=-1;
439-
sd=socket(AF_INET,SOCK_STREAM,0);
445+
sd=pg_socket(AF_INET,SOCK_STREAM,0,MtmUseRDMA);
440446
if (sd<0) {
441447
MTM_ELOG(LOG,"Arbiter failed to create socket: %d",errno);
442448
gotoError;
443449
}
444-
rc=fcntl(sd,F_SETFL,O_NONBLOCK);
450+
rc=pg_fcntl(sd,F_SETFL,O_NONBLOCK,MtmUseRDMA);
445451
if (rc<0) {
446452
MTM_ELOG(LOG,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
447453
gotoError;
448454
}
449455
for (addr=addrs;addr!=NULL;addr=addr->ai_next)
450456
{
451457
do {
452-
rc=connect(sd,addr->ai_addr,addr->ai_addrlen);
458+
rc=pg_connect(sd,addr->ai_addr,addr->ai_addrlen,MtmUseRDMA);
453459
}while (rc<0&&errno==EINTR);
454460

455461
if (rc >=0||errno==EINPROGRESS) {
@@ -479,7 +485,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
479485
}else {
480486
MTM_ELOG(WARNING,"Arbiter waiting socket to %s:%d: rc=%d, error=%d",host,port,rc,errno);
481487
}
482-
close(sd);
488+
pg_closesocket(sd,MtmUseRDMA);
483489
afterWait=MtmGetSystemTime();
484490
if (afterWait<beforeWait+MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
485491
MtmSleep(beforeWait+MSEC_TO_USEC(MtmHeartbeatSendTimeout)-afterWait);
@@ -495,17 +501,17 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
495501
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
496502
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
497503
MTM_ELOG(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
498-
close(sd);
504+
pg_closesocket(sd,MtmUseRDMA);
499505
gotoRetry;
500506
}
501507
if (MtmReadSocket(sd,&resp,sizeofresp)!=sizeof(resp)) {
502508
MTM_ELOG(WARNING,"Arbiter failed to receive response for handshake message from %s:%d: errno=%d",host,port,errno);
503-
close(sd);
509+
pg_closesocket(sd,MtmUseRDMA);
504510
gotoRetry;
505511
}
506512
if (resp.code!=MSG_STATUS||resp.dxid!=HANDSHAKE_MAGIC) {
507513
MTM_ELOG(WARNING,"Arbiter get unexpected response %d for handshake message from %s:%d",resp.code,host,port);
508-
close(sd);
514+
pg_closesocket(sd,MtmUseRDMA);
509515
gotoRetry;
510516
}
511517
if (addrs)
@@ -524,7 +530,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
524530
Error:
525531
busy_mask=save_mask;
526532
if (sd >=0) {
527-
close(sd);
533+
pg_closesocket(sd,MtmUseRDMA);
528534
}
529535
if (addrs) {
530536
pg_freeaddrinfo_all(hint.ai_family,addrs);
@@ -572,7 +578,7 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
572578
*/
573579
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
574580
MTM_ELOG(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
575-
close(sockets[node]);
581+
pg_closesocket(sockets[node],MtmUseRDMA);
576582
sockets[node]=-1;
577583
}
578584
#endif
@@ -584,7 +590,7 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
584590
if (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
585591
if (sockets[node] >=0) {
586592
MTM_ELOG(WARNING,"Arbiter fail to write to node %d: %d",node+1,errno);
587-
close(sockets[node]);
593+
pg_closesocket(sockets[node],MtmUseRDMA);
588594
sockets[node]=-1;
589595
}
590596
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort,reconnectTimeout);
@@ -615,23 +621,23 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
615621

616622
staticvoidMtmAcceptOneConnection()
617623
{
618-
intfd=accept(gateway,NULL,NULL);
624+
intfd=pg_accept(gateway,NULL,NULL,MtmUseRDMA);
619625
if (fd<0) {
620626
MTM_ELOG(WARNING,"Arbiter failed to accept socket: %d",errno);
621627
}else {
622628
MtmHandshakeMessagereq;
623629
MtmArbiterMessageresp;
624-
intrc=fcntl(fd,F_SETFL,O_NONBLOCK);
630+
intrc=pg_fcntl(fd,F_SETFL,O_NONBLOCK,MtmUseRDMA);
625631
if (rc<0) {
626632
MTM_ELOG(ERROR,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
627633
}
628634
rc=MtmReadSocket(fd,&req,sizeofreq);
629635
if (rc<sizeof(req)) {
630636
MTM_ELOG(WARNING,"Arbiter failed to handshake socket: %d, errno=%d",rc,errno);
631-
close(fd);
637+
pg_closesocket(fd,MtmUseRDMA);
632638
}elseif (req.hdr.code!=MSG_HANDSHAKE&&req.hdr.dxid!=HANDSHAKE_MAGIC) {
633639
MTM_ELOG(WARNING,"Arbiter get unexpected handshake message %d",req.hdr.code);
634-
close(fd);
640+
pg_closesocket(fd,MtmUseRDMA);
635641
}else {
636642
intnode=req.hdr.node-1;
637643
Assert(node >=0&&node<Mtm->nAllNodes&&node+1!=MtmNodeId);
@@ -648,7 +654,7 @@ static void MtmAcceptOneConnection()
648654
MtmUpdateNodeConnectionInfo(&Mtm->nodes[node].con,req.connStr);
649655
if (!MtmWriteSocket(fd,&resp,sizeofresp)) {
650656
MTM_ELOG(WARNING,"Arbiter failed to write response for handshake message to node %d",node+1);
651-
close(fd);
657+
pg_closesocket(fd,MtmUseRDMA);
652658
}else {
653659
MTM_LOG1("Arbiter established connection with node %d",node+1);
654660
if (sockets[node] >=0) {
@@ -678,18 +684,18 @@ static void MtmAcceptIncomingConnections()
678684
sock_inet.sin_addr.s_addr=htonl(INADDR_ANY);
679685
sock_inet.sin_port=htons(MtmArbiterPort);
680686

681-
gateway=socket(sock_inet.sin_family,SOCK_STREAM,0);
687+
gateway=pg_socket(sock_inet.sin_family,SOCK_STREAM,0,MtmUseRDMA);
682688
if (gateway<0) {
683689
MTM_ELOG(ERROR,"Arbiter failed to create socket: %s",strerror(errno));
684690
}
685-
if (setsockopt(gateway,SOL_SOCKET,SO_REUSEADDR, (char*)&on,sizeofon)<0) {
691+
if (pg_setsockopt(gateway,SOL_SOCKET,SO_REUSEADDR, (char*)&on,sizeofon)<0) {
686692
MTM_ELOG(ERROR,"Arbiter failed to set options for socket: %s",strerror(errno));
687693
}
688694

689-
if (bind(gateway, (structsockaddr*)&sock_inet,sizeof(sock_inet))<0) {
695+
if (pg_bind(gateway, (structsockaddr*)&sock_inet,sizeof(sock_inet),MtmUseRDMA)<0) {
690696
MTM_ELOG(ERROR,"Arbiter failed to bind socket: %s",strerror(errno));
691697
}
692-
if (listen(gateway,nNodes)<0) {
698+
if (pg_listen(gateway,nNodes,MtmUseRDMA)<0) {
693699
MTM_ELOG(ERROR,"Arbiter failed to listen socket: %s",strerror(errno));
694700
}
695701

@@ -790,7 +796,7 @@ static bool MtmRecovery()
790796
fd_settryset;
791797
FD_ZERO(&tryset);
792798
FD_SET(sd,&tryset);
793-
if (select(sd+1,&tryset,NULL,NULL,&tm)<0) {
799+
if (pg_select(sd+1,&tryset,NULL,NULL,&tm,MtmUseRDMA)<0) {
794800
MTM_ELOG(WARNING,"Arbiter lost connection with node %d",i+1);
795801
MtmDisconnect(i);
796802
recovered= true;
@@ -883,7 +889,7 @@ static void MtmReceiver(Datum arg)
883889
tv.tv_sec=selectTimeout/1000;
884890
tv.tv_usec=selectTimeout%1000*1000;
885891
do {
886-
n=select(max_fd+1,&events,NULL,NULL,&tv);
892+
n=pg_select(max_fd+1,&events,NULL,NULL,&tv,MtmUseRDMA);
887893
}while (n<0&&errno==EINTR);
888894
}while (n<0&&MtmRecovery());
889895

‎multimaster.c

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -237,6 +237,7 @@ int MtmHeartbeatRecvTimeout;
237237
intMtmMin2PCTimeout;
238238
intMtmMax2PCRatio;
239239
boolMtmUseDtm;
240+
boolMtmUseRDMA;
240241
boolMtmPreserveCommitOrder;
241242
boolMtmVolksWagenMode;/* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
242243

@@ -3091,6 +3092,19 @@ _PG_init(void)
30913092
NULL
30923093
);
30933094

3095+
DefineCustomBoolVariable(
3096+
"multimaster.use_rdma",
3097+
"Use RDMA sockets",
3098+
NULL,
3099+
&MtmUseRDMA,
3100+
false,
3101+
PGC_POSTMASTER,
3102+
0,
3103+
NULL,
3104+
NULL,
3105+
NULL
3106+
);
3107+
30943108
DefineCustomBoolVariable(
30953109
"multimaster.preserve_commit_order",
30963110
"Transactions from one node will be committed in same order on all nodes",

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -349,6 +349,7 @@ extern int MtmNodeDisableDelay;
349349
externintMtmTransSpillThreshold;
350350
externintMtmHeartbeatSendTimeout;
351351
externintMtmHeartbeatRecvTimeout;
352+
externboolMtmUseRDMA;
352353
externboolMtmUseDtm;
353354
externboolMtmPreserveCommitOrder;
354355
externHTAB*MtmXid2State;

‎pglogical_receiver.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414
*/
1515

1616
/* Some general headers for custom bgworker facility */
17+
#ifdefWITH_RSOCKET
18+
#include<rdma/rsocket.h>
19+
#endif
20+
1721
#include<unistd.h>
1822
#include"postgres.h"
1923
#include"fmgr.h"
2024
#include"miscadmin.h"
25+
#include"pg_socket.h"
2126
#include"libpq-fe.h"
2227
#include"pqexpbuffer.h"
2328
#include"access/xact.h"
@@ -635,7 +640,7 @@ pglogical_receiver_main(Datum main_arg)
635640
timeout.tv_usec=usecs;
636641
timeoutptr=&timeout;
637642

638-
r=select(PQsocket(conn)+1,&input_mask,NULL,NULL,timeoutptr);
643+
r=pg_select(PQsocket(conn)+1,&input_mask,NULL,NULL,timeoutptr,conn->isRsocket);
639644
if (r==0)
640645
{
641646
int64now=feGetCurrentTimestamp();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp