Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit597a87c

Browse files
committed
Use latch instead of select() in walreceiver
Replace use of poll()/select() by WaitLatchOrSocket(), which is moreportable and flexible.Also change walreceiver to use its procLatch instead of a custom latch.From: Petr Jelinek <petr@2ndquadrant.com>
1 parentb999c24 commit597a87c

File tree

6 files changed

+43
-89
lines changed

6 files changed

+43
-89
lines changed

‎src/backend/postmaster/pgstat.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3338,6 +3338,9 @@ pgstat_get_wait_client(WaitEventClient w)
33383338
caseWAIT_EVENT_WAL_RECEIVER_WAIT_START:
33393339
event_name="WalReceiverWaitStart";
33403340
break;
3341+
caseWAIT_EVENT_LIBPQWALRECEIVER_READ:
3342+
event_name="LibPQWalReceiverRead";
3343+
break;
33413344
caseWAIT_EVENT_WAL_SENDER_WAIT_WAL:
33423345
event_name="WalSenderWaitForWAL";
33433346
break;

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 24 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,11 @@
2323
#include"pqexpbuffer.h"
2424
#include"access/xlog.h"
2525
#include"miscadmin.h"
26+
#include"pgstat.h"
2627
#include"replication/walreceiver.h"
28+
#include"storage/proc.h"
2729
#include"utils/builtins.h"
2830

29-
#ifdefHAVE_POLL_H
30-
#include<poll.h>
31-
#endif
32-
#ifdefHAVE_SYS_POLL_H
33-
#include<sys/poll.h>
34-
#endif
35-
#ifdefHAVE_SYS_SELECT_H
36-
#include<sys/select.h>
37-
#endif
38-
3931
PG_MODULE_MAGIC;
4032

4133
void_PG_init(void);
@@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
5951
staticvoidlibpqrcv_disconnect(void);
6052

6153
/* Prototypes for private functions */
62-
staticboollibpq_select(inttimeout_ms);
6354
staticPGresult*libpqrcv_PQexec(constchar*query);
6455

6556
/*
@@ -366,67 +357,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
366357
PQclear(res);
367358
}
368359

369-
/*
370-
* Wait until we can read WAL stream, or timeout.
371-
*
372-
* Returns true if data has become available for reading, false if timed out
373-
* or interrupted by signal.
374-
*
375-
* This is based on pqSocketCheck.
376-
*/
377-
staticbool
378-
libpq_select(inttimeout_ms)
379-
{
380-
intret;
381-
382-
Assert(streamConn!=NULL);
383-
if (PQsocket(streamConn)<0)
384-
ereport(ERROR,
385-
(errcode_for_socket_access(),
386-
errmsg("invalid socket: %s",PQerrorMessage(streamConn))));
387-
388-
/* We use poll(2) if available, otherwise select(2) */
389-
{
390-
#ifdefHAVE_POLL
391-
structpollfdinput_fd;
392-
393-
input_fd.fd=PQsocket(streamConn);
394-
input_fd.events=POLLIN |POLLERR;
395-
input_fd.revents=0;
396-
397-
ret=poll(&input_fd,1,timeout_ms);
398-
#else/* !HAVE_POLL */
399-
400-
fd_setinput_mask;
401-
structtimevaltimeout;
402-
structtimeval*ptr_timeout;
403-
404-
FD_ZERO(&input_mask);
405-
FD_SET(PQsocket(streamConn),&input_mask);
406-
407-
if (timeout_ms<0)
408-
ptr_timeout=NULL;
409-
else
410-
{
411-
timeout.tv_sec=timeout_ms /1000;
412-
timeout.tv_usec= (timeout_ms %1000)*1000;
413-
ptr_timeout=&timeout;
414-
}
415-
416-
ret=select(PQsocket(streamConn)+1,&input_mask,
417-
NULL,NULL,ptr_timeout);
418-
#endif/* HAVE_POLL */
419-
}
420-
421-
if (ret==0|| (ret<0&&errno==EINTR))
422-
return false;
423-
if (ret<0)
424-
ereport(ERROR,
425-
(errcode_for_socket_access(),
426-
errmsg("select() failed: %m")));
427-
return true;
428-
}
429-
430360
/*
431361
* Send a query and wait for the results by using the asynchronous libpq
432362
* functions and the backend version of select().
@@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
470400
*/
471401
while (PQisBusy(streamConn))
472402
{
403+
intrc;
404+
473405
/*
474406
* We don't need to break down the sleep into smaller increments,
475-
* and check for interrupts after each nap, since we can just
476-
* elog(FATAL) within SIGTERM signal handler if the signal arrives
477-
* in the middle of establishment of replication connection.
407+
* since we'll get interrupted by signals and can either handle
408+
* interrupts here or elog(FATAL) within SIGTERM signal handler if
409+
* the signal arrives in the middle of establishment of
410+
* replication connection.
478411
*/
479-
if (!libpq_select(-1))
480-
continue;/* interrupted */
412+
ResetLatch(&MyProc->procLatch);
413+
rc=WaitLatchOrSocket(&MyProc->procLatch,
414+
WL_POSTMASTER_DEATH |WL_SOCKET_READABLE |
415+
WL_LATCH_SET,
416+
PQsocket(streamConn),
417+
0,
418+
WAIT_EVENT_LIBPQWALRECEIVER_READ);
419+
if (rc&WL_POSTMASTER_DEATH)
420+
exit(1);
421+
422+
/* interrupted */
423+
if (rc&WL_LATCH_SET)
424+
{
425+
CHECK_FOR_INTERRUPTS();
426+
continue;
427+
}
481428
if (PQconsumeInput(streamConn)==0)
482429
returnNULL;/* trouble */
483430
}

‎src/backend/replication/walreceiver.c

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ WalReceiverMain(void)
261261
/* Arrange to clean up at walreceiver exit */
262262
on_shmem_exit(WalRcvDie,0);
263263

264-
OwnLatch(&walrcv->latch);
264+
walrcv->latch=&MyProc->procLatch;
265265

266266
/* Properly accept or ignore signals the postmaster might send us */
267267
pqsignal(SIGHUP,WalRcvSigHupHandler);/* set flag to read config
@@ -483,15 +483,15 @@ WalReceiverMain(void)
483483
* avoiding some system calls.
484484
*/
485485
Assert(wait_fd!=PGINVALID_SOCKET);
486-
rc=WaitLatchOrSocket(&walrcv->latch,
486+
rc=WaitLatchOrSocket(walrcv->latch,
487487
WL_POSTMASTER_DEATH |WL_SOCKET_READABLE |
488488
WL_TIMEOUT |WL_LATCH_SET,
489489
wait_fd,
490490
NAPTIME_PER_CYCLE,
491491
WAIT_EVENT_WAL_RECEIVER_MAIN);
492492
if (rc&WL_LATCH_SET)
493493
{
494-
ResetLatch(&walrcv->latch);
494+
ResetLatch(walrcv->latch);
495495
if (walrcv->force_reply)
496496
{
497497
/*
@@ -652,7 +652,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
652652
WakeupRecovery();
653653
for (;;)
654654
{
655-
ResetLatch(&walrcv->latch);
655+
ResetLatch(walrcv->latch);
656656

657657
/*
658658
* Emergency bailout if postmaster has died. This is to avoid the
@@ -687,7 +687,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
687687
}
688688
SpinLockRelease(&walrcv->mutex);
689689

690-
WaitLatch(&walrcv->latch,WL_LATCH_SET |WL_POSTMASTER_DEATH,0,
690+
WaitLatch(walrcv->latch,WL_LATCH_SET |WL_POSTMASTER_DEATH,0,
691691
WAIT_EVENT_WAL_RECEIVER_WAIT_START);
692692
}
693693

@@ -763,7 +763,7 @@ WalRcvDie(int code, Datum arg)
763763
/* Ensure that all WAL records received are flushed to disk */
764764
XLogWalRcvFlush(true);
765765

766-
DisownLatch(&walrcv->latch);
766+
walrcv->latch=NULL;
767767

768768
SpinLockAcquire(&walrcv->mutex);
769769
Assert(walrcv->walRcvState==WALRCV_STREAMING||
@@ -812,7 +812,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
812812

813813
got_SIGTERM= true;
814814

815-
SetLatch(&WalRcv->latch);
815+
if (WalRcv->latch)
816+
SetLatch(WalRcv->latch);
816817

817818
/* Don't joggle the elbow of proc_exit */
818819
if (!proc_exit_inprogress&&WalRcvImmediateInterruptOK)
@@ -1297,7 +1298,8 @@ void
12971298
WalRcvForceReply(void)
12981299
{
12991300
WalRcv->force_reply= true;
1300-
SetLatch(&WalRcv->latch);
1301+
if (WalRcv->latch)
1302+
SetLatch(WalRcv->latch);
13011303
}
13021304

13031305
/*

‎src/backend/replication/walreceiverfuncs.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ WalRcvShmemInit(void)
6464
MemSet(WalRcv,0,WalRcvShmemSize());
6565
WalRcv->walRcvState=WALRCV_STOPPED;
6666
SpinLockInit(&WalRcv->mutex);
67-
InitSharedLatch(&WalRcv->latch);
67+
WalRcv->latch=NULL;
6868
}
6969
}
7070

@@ -279,8 +279,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
279279

280280
if (launch)
281281
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
282-
else
283-
SetLatch(&walrcv->latch);
282+
elseif (walrcv->latch)
283+
SetLatch(walrcv->latch);
284284
}
285285

286286
/*

‎src/include/pgstat.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,7 @@ typedef enum
763763
WAIT_EVENT_CLIENT_WRITE,
764764
WAIT_EVENT_SSL_OPEN_SERVER,
765765
WAIT_EVENT_WAL_RECEIVER_WAIT_START,
766+
WAIT_EVENT_LIBPQWALRECEIVER_READ,
766767
WAIT_EVENT_WAL_SENDER_WAIT_WAL,
767768
WAIT_EVENT_WAL_SENDER_WRITE_DATA
768769
}WaitEventClient;

‎src/include/replication/walreceiver.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,9 @@ typedef struct
127127
* where to start streaming (after setting receiveStart and
128128
* receiveStartTLI), and also to tell it to send apply feedback to the
129129
* primary whenever specially marked commit records are applied.
130+
* This is normally mapped to procLatch when walreceiver is running.
130131
*/
131-
Latchlatch;
132+
Latch*latch;
132133
}WalRcvData;
133134

134135
externWalRcvData*WalRcv;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp