Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 7834d20

Browse files
committed
Avoid slow shutdown of pg_basebackup.
pg_basebackup's child process did not pay any attention to the pipe from its parent while waiting for input from the source server. If no server data was arriving, it would only wake up and check the pipe every standby_message_timeout or so. This creates a problem since the parent process might determine and send the desired stop position only after the server has reached end-of-WAL and stopped sending data. In the src/test/recovery regression tests, the timing is repeatably such that it takes nearly 10 seconds for the child process to realize that it should shut down. It's not clear how often that would happen in real-world cases, but it sure seems like a bug --- and if the user turns off standby_message_timeout or sets it very large, the delay could be a lot worse. To fix, expand the StreamCtl API to allow the pipe input FD to be passed down to the low-level wait routine, and watch both sockets when sleeping. (Note: AFAICS this issue doesn't affect the Windows port, since it doesn't rely on a pipe to transfer the stop position to the child thread.) Discussion: https://postgr.es/m/6456.1493263884@sss.pgh.pa.us
1 parent 9f11fce commit 7834d20

File tree

4 files changed

+59
-28
lines changed

4 files changed

+59
-28
lines changed

‎src/bin/pg_basebackup/pg_basebackup.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,11 @@ LogStreamerMain(logstreamer_param *param)
480480
stream.timeline=param->timeline;
481481
stream.sysidentifier=param->sysidentifier;
482482
stream.stream_stop=reached_end_position;
483+
#ifndefWIN32
484+
stream.stop_socket=bgpipe[0];
485+
#else
486+
stream.stop_socket=PGINVALID_SOCKET;
487+
#endif
483488
stream.standby_message_timeout=standby_message_timeout;
484489
stream.synchronous= false;
485490
stream.do_sync=do_sync;

‎src/bin/pg_basebackup/pg_receivewal.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ StreamLog(void)
409409
stream.timeline);
410410

411411
stream.stream_stop=stop_streaming;
412+
stream.stop_socket=PGINVALID_SOCKET;
412413
stream.standby_message_timeout=standby_message_timeout;
413414
stream.synchronous=synchronous;
414415
stream.do_sync= true;

‎src/bin/pg_basebackup/receivelog.c

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ static bool still_sending = true;/* feedback still needs to be sent? */
3939

4040
staticPGresult*HandleCopyStream(PGconn*conn,StreamCtl*stream,
4141
XLogRecPtr*stoppos);
42-
staticintCopyStreamPoll(PGconn*conn,longtimeout_ms);
43-
staticintCopyStreamReceive(PGconn*conn,longtimeout,char**buffer);
42+
staticintCopyStreamPoll(PGconn*conn,longtimeout_ms,pgsocketstop_socket);
43+
staticintCopyStreamReceive(PGconn*conn,longtimeout,pgsocketstop_socket,
44+
char**buffer);
4445
staticboolProcessKeepaliveMsg(PGconn*conn,StreamCtl*stream,char*copybuf,
4546
intlen,XLogRecPtrblockpos,TimestampTz*last_status);
4647
staticboolProcessXLogDataMsg(PGconn*conn,StreamCtl*stream,char*copybuf,intlen,
@@ -417,8 +418,15 @@ CheckServerVersionForStreaming(PGconn *conn)
417418
* return. As long as it returns false, streaming will continue
418419
* indefinitely.
419420
*
421+
* If stream_stop() checks for external input, stop_socket should be set to
422+
* the FD it checks. This will allow such input to be detected promptly
423+
* rather than after standby_message_timeout (which might be indefinite).
424+
* Note that signals will interrupt waits for input as well, but that is
425+
* race-y since a signal received while busy won't interrupt the wait.
426+
*
420427
* standby_message_timeout controls how often we send a message
421428
* back to the master letting it know our progress, in milliseconds.
429+
* Zero means no messages are sent.
422430
* This message will only contain the write location, and never
423431
* flush or replay.
424432
*
@@ -825,7 +833,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
825833
sleeptime=CalculateCopyStreamSleeptime(now,stream->standby_message_timeout,
826834
last_status);
827835

828-
r=CopyStreamReceive(conn,sleeptime,&copybuf);
836+
r=CopyStreamReceive(conn,sleeptime,stream->stop_socket,&copybuf);
829837
while (r!=0)
830838
{
831839
if (r==-1)
@@ -870,7 +878,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
870878
* Process the received data, and any subsequent data we can read
871879
* without blocking.
872880
*/
873-
r=CopyStreamReceive(conn,0,&copybuf);
881+
r=CopyStreamReceive(conn,0,stream->stop_socket,&copybuf);
874882
}
875883
}
876884

@@ -881,28 +889,39 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
881889
}
882890

883891
/*
884-
* Wait until we can read CopyData message, or timeout.
892+
* Wait until we can read a CopyData message,
893+
* or timeout, or occurrence of a signal or input on the stop_socket.
894+
* (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
885895
*
886896
* Returns 1 if data has become available for reading, 0 if timed out
887-
* or interrupted by signal, and -1 on an error.
897+
* or interrupted by signal or stop_socket input, and -1 on an error.
888898
*/
889899
staticint
890-
CopyStreamPoll(PGconn*conn,longtimeout_ms)
900+
CopyStreamPoll(PGconn*conn,longtimeout_ms,pgsocketstop_socket)
891901
{
892902
intret;
893903
fd_setinput_mask;
904+
intconnsocket;
905+
intmaxfd;
894906
structtimevaltimeout;
895907
structtimeval*timeoutptr;
896908

897-
if (PQsocket(conn)<0)
909+
connsocket=PQsocket(conn);
910+
if (connsocket<0)
898911
{
899912
fprintf(stderr,_("%s: invalid socket: %s"),progname,
900913
PQerrorMessage(conn));
901914
return-1;
902915
}
903916

904917
FD_ZERO(&input_mask);
905-
FD_SET(PQsocket(conn),&input_mask);
918+
FD_SET(connsocket,&input_mask);
919+
maxfd=connsocket;
920+
if (stop_socket!=PGINVALID_SOCKET)
921+
{
922+
FD_SET(stop_socket,&input_mask);
923+
maxfd=Max(maxfd,stop_socket);
924+
}
906925

907926
if (timeout_ms<0)
908927
timeoutptr=NULL;
@@ -913,17 +932,20 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
913932
timeoutptr=&timeout;
914933
}
915934

916-
ret=select(PQsocket(conn)+1,&input_mask,NULL,NULL,timeoutptr);
917-
if (ret==0|| (ret<0&&errno==EINTR))
918-
return0;/* Got a timeout or signal */
919-
elseif (ret<0)
935+
ret=select(maxfd+1,&input_mask,NULL,NULL,timeoutptr);
936+
937+
if (ret<0)
920938
{
939+
if (errno==EINTR)
940+
return0;/* Got a signal, so not an error */
921941
fprintf(stderr,_("%s: select() failed: %s\n"),
922942
progname,strerror(errno));
923943
return-1;
924944
}
945+
if (ret>0&&FD_ISSET(connsocket,&input_mask))
946+
return1;/* Got input on connection socket */
925947

926-
return1;
948+
return0;/* Got timeout or input on stop_socket */
927949
}
928950

929951
/*
@@ -934,11 +956,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
934956
* point to a buffer holding the received message. The buffer is only valid
935957
* until the next CopyStreamReceive call.
936958
*
937-
* 0 if no data was available within timeout, or wait was interrupted
938-
* by signal. -1 on error. -2 if the server ended the COPY.
959+
* Returns 0 if no data was available within timeout, or if wait was
960+
* interrupted by signal or stop_socket input.
961+
* -1 on error. -2 if the server ended the COPY.
939962
*/
940963
staticint
941-
CopyStreamReceive(PGconn*conn,longtimeout,char**buffer)
964+
CopyStreamReceive(PGconn*conn,longtimeout,pgsocketstop_socket,
965+
char**buffer)
942966
{
943967
char*copybuf=NULL;
944968
intrawlen;
@@ -951,20 +975,18 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
951975
rawlen=PQgetCopyData(conn,&copybuf,1);
952976
if (rawlen==0)
953977
{
978+
intret;
979+
954980
/*
955-
* No data available. Wait for some to appear, but not longer than the
956-
* specified timeout, so that we can ping the server.
981+
* No data available. Wait for some to appear, but not longer than
982+
* the specified timeout, so that we can ping the server. Also stop
983+
* waiting if input appears on stop_socket.
957984
*/
958-
if (timeout!=0)
959-
{
960-
intret;
961-
962-
ret=CopyStreamPoll(conn,timeout);
963-
if (ret <=0)
964-
returnret;
965-
}
985+
ret=CopyStreamPoll(conn,timeout,stop_socket);
986+
if (ret <=0)
987+
returnret;
966988

967-
/*Else there is actually data on the socket */
989+
/*Now there is actually data on the socket */
968990
if (PQconsumeInput(conn)==0)
969991
{
970992
fprintf(stderr,

‎src/bin/pg_basebackup/receivelog.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ typedef struct StreamCtl
4242

4343
stream_stop_callbackstream_stop;/* Stop streaming when returns true */
4444

45+
pgsocketstop_socket;/* if valid, watch for input on this socket
46+
* and check stream_stop() when there is any */
47+
4548
WalWriteMethod*walmethod;/* How to write the WAL */
4649
char*partial_suffix;/* Suffix appended to partially received files */
4750
char*replication_slot;/* Replication slot to use, or NULL */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp