Commit ac8f2e1
In walreceiver, don't try to do ereport() in a signal handler.
This is quite unsafe, even for the case of ereport(FATAL) where we won't return control to the interrupted code, and despite this code's use of a flag to restrict the areas where we'd try to do it. It's possible for example that we interrupt malloc or free while that's holding a lock that's meant to protect against cross-thread interference. Then, any attempt to do malloc or free within ereport() will result in a deadlock, preventing the walreceiver process from exiting in response to SIGTERM. We hypothesize that this explains some hard-to-reproduce failures seen in the buildfarm.

Hence, get rid of the immediate-exit code in WalRcvShutdownHandler, as well as the logic associated with WalRcvImmediateInterruptOK. Instead, we need to take care that potentially-blocking operations in the walreceiver's data transmission logic (libpqwalreceiver.c) will respond reasonably promptly to the process's latch becoming set and then call ProcessWalRcvInterrupts. Much of the needed code for that was already present in libpqwalreceiver.c. I refactored things a bit so that all the uses of PQgetResult use latch-aware waiting, but didn't need to do much more.

These changes should be enough to ensure that libpqwalreceiver.c will respond promptly to SIGTERM whenever it's waiting to receive data. In principle, it could block for a long time while waiting to send data too, and this patch does nothing to guard against that. I think that that hazard is mostly theoretical though: such blocking should occur only if we fill the kernel's data transmission buffers, and we don't generally send enough data to make that happen without waiting for input. If we find out that the hazard isn't just theoretical, we could fix it by using PQsetnonblocking, but that would require more ticklish changes than I care to make now.

Back-patch of commit a1a789e. This problem goes all the way back to the origins of walreceiver; but given the substantial reworking the module received during the v10 cycle, it seems unsafe to assume that our testing on HEAD validates this patch for pre-v10 branches. And we'd need to back-patch some prerequisite patches (at least 597a87c and its followups, maybe other things), increasing the risk of problems. Given the dearth of field reports matching this problem, it's not worth much risk. Hence back-patch to v10 and v11 only.

Patch by me; thanks to Thomas Munro for review.

Discussion: https://postgr.es/m/20190416070119.GK2673@paquier.xyz

1 parent 2981e5a, commit ac8f2e1
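
The approach the commit message describes (the signal handler only records the request and wakes the process, and the actual exit happens later from ordinary code) can be illustrated outside PostgreSQL. The following is a minimal sketch in plain POSIX C, not the walreceiver code: a self-pipe stands in for the process latch, and every name in it (`shutdown_requested`, `wake_pipe`, `on_sigterm`, `install_shutdown_handler`, `wait_for_data_or_shutdown`) is hypothetical.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <stdbool.h>
#include <unistd.h>

/* Hypothetical names throughout; none of these exist in PostgreSQL. */
static volatile sig_atomic_t shutdown_requested = 0;
static int  wake_pipe[2] = {-1, -1};

/* Signal handler: async-signal-safe work only (set a flag, write a byte). */
static void
on_sigterm(int signo)
{
    int         save_errno = errno;
    ssize_t     rc;

    (void) signo;
    shutdown_requested = 1;
    rc = write(wake_pipe[1], "x", 1);   /* wake any poll() in progress */
    (void) rc;                  /* if the pipe is full, a wakeup is pending */
    errno = save_errno;
}

/* Create the wakeup pipe and install the handler; returns 0 on success. */
static int
install_shutdown_handler(void)
{
    struct sigaction sa;

    if (pipe(wake_pipe) != 0)
        return -1;
    if (fcntl(wake_pipe[0], F_SETFL, O_NONBLOCK) != 0 ||
        fcntl(wake_pipe[1], F_SETFL, O_NONBLOCK) != 0)
        return -1;

    sa.sa_handler = on_sigterm;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    return sigaction(SIGTERM, &sa, NULL);
}

/*
 * Wait until data_fd is readable, the handler wakes us, or timeout_ms
 * elapses. Returns true if shutdown was requested; the caller then exits
 * from normal (non-handler) context, where logging and malloc are safe.
 */
static bool
wait_for_data_or_shutdown(int data_fd, int timeout_ms)
{
    struct pollfd fds[2];
    char        buf[16];

    assert(wake_pipe[0] >= 0);
    fds[0].fd = wake_pipe[0];
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = data_fd;        /* a negative fd is ignored by poll() */
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    (void) poll(fds, 2, timeout_ms);

    /* Drain the pipe so the next wait blocks again (like resetting a latch). */
    if (fds[0].revents & POLLIN)
        while (read(wake_pipe[0], buf, sizeof(buf)) > 0)
            ;

    return shutdown_requested != 0;
}
```

A caller would invoke `install_shutdown_handler()` once at startup and then loop on `wait_for_data_or_shutdown()`, doing its error reporting and exit only after that function returns true. The handler itself never allocates memory or takes locks, which is the property the commit restores for walreceiver.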

3 files changed: +89 -105 lines changed

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 71 additions & 56 deletions
@@ -95,6 +95,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {

 /* Prototypes for private functions */
 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
+static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);

 /*
@@ -196,7 +197,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
         if (rc & WL_LATCH_SET)
         {
             ResetLatch(MyLatch);
-            CHECK_FOR_INTERRUPTS();
+            ProcessWalRcvInterrupts();
         }

         /* If socket is ready, advance the libpq state machine */
@@ -427,6 +428,10 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 {
     PGresult *res;

+    /*
+     * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
+     * block, but the risk seems small.
+     */
     if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
         PQflush(conn->streamConn))
         ereport(ERROR,
@@ -443,7 +448,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
      * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
      * also possible in case we aborted the copy in mid-stream.
      */
-    res = PQgetResult(conn->streamConn);
+    res = libpqrcv_PQgetResult(conn->streamConn);
     if (PQresultStatus(res) == PGRES_TUPLES_OK)
     {
         /*
@@ -457,7 +462,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
         PQclear(res);

         /* the result set should be followed by CommandComplete */
-        res = PQgetResult(conn->streamConn);
+        res = libpqrcv_PQgetResult(conn->streamConn);
     }
     else if (PQresultStatus(res) == PGRES_COPY_OUT)
     {
@@ -470,7 +475,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
                             pchomp(PQerrorMessage(conn->streamConn)))));

         /* CommandComplete should follow */
-        res = PQgetResult(conn->streamConn);
+        res = libpqrcv_PQgetResult(conn->streamConn);
     }

     if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -480,7 +485,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
     PQclear(res);

     /* Verify that there are no more results */
-    res = PQgetResult(conn->streamConn);
+    res = libpqrcv_PQgetResult(conn->streamConn);
     if (res != NULL)
         ereport(ERROR,
                 (errmsg("unexpected result after CommandComplete: %s",
@@ -543,12 +548,11 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * The function is modeled on PQexec() in libpq, but only implements
  * those parts that are in use in the walreceiver api.
  *
- * Queries are always executed on the connection in streamConn.
+ * May return NULL, rather than an error result, on failure.
  */
 static PGresult *
 libpqrcv_PQexec(PGconn *streamConn, const char *query)
 {
-    PGresult *result = NULL;
     PGresult *lastResult = NULL;

     /*
@@ -559,64 +563,26 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
      */

     /*
-     * Submit a query. Since we don't use non-blocking mode, this also can
-     * block. But its risk is relatively small, so we ignore that for now.
+     * Submit the query. Since we don't use non-blocking mode, this could
+     * theoretically block. In practice, since we don't send very long query
+     * strings, the risk seems negligible.
      */
     if (!PQsendQuery(streamConn, query))
         return NULL;

     for (;;)
     {
-        /*
-         * Receive data until PQgetResult is ready to get the result without
-         * blocking.
-         */
-        while (PQisBusy(streamConn))
-        {
-            int rc;
-
-            /*
-             * We don't need to break down the sleep into smaller increments,
-             * since we'll get interrupted by signals and can either handle
-             * interrupts here or elog(FATAL) within SIGTERM signal handler if
-             * the signal arrives in the middle of establishment of
-             * replication connection.
-             */
-            rc = WaitLatchOrSocket(MyLatch,
-                                   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
-                                   WL_LATCH_SET,
-                                   PQsocket(streamConn),
-                                   0,
-                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-            /* Emergency bailout? */
-            if (rc & WL_POSTMASTER_DEATH)
-                exit(1);
-
-            /* Interrupted? */
-            if (rc & WL_LATCH_SET)
-            {
-                ResetLatch(MyLatch);
-                CHECK_FOR_INTERRUPTS();
-            }
+        /* Wait for, and collect, the next PGresult. */
+        PGresult *result;

-            /* Consume whatever data is available from the socket */
-            if (PQconsumeInput(streamConn) == 0)
-            {
-                /* trouble; drop whatever we had and return NULL */
-                PQclear(lastResult);
-                return NULL;
-            }
-        }
+        result = libpqrcv_PQgetResult(streamConn);
+        if (result == NULL)
+            break;              /* query is complete, or failure */

         /*
          * Emulate PQexec()'s behavior of returning the last result when there
          * are many. We are fine with returning just last error message.
          */
-        result = PQgetResult(streamConn);
-        if (result == NULL)
-            break;              /* query is complete */
-
         PQclear(lastResult);
         lastResult = result;

@@ -630,6 +596,55 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
     return lastResult;
 }

+/*
+ * Perform the equivalent of PQgetResult(), but watch for interrupts.
+ */
+static PGresult *
+libpqrcv_PQgetResult(PGconn *streamConn)
+{
+    /*
+     * Collect data until PQgetResult is ready to get the result without
+     * blocking.
+     */
+    while (PQisBusy(streamConn))
+    {
+        int rc;
+
+        /*
+         * We don't need to break down the sleep into smaller increments,
+         * since we'll get interrupted by signals and can handle any
+         * interrupts here.
+         */
+        rc = WaitLatchOrSocket(MyLatch,
+                               WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+                               WL_LATCH_SET,
+                               PQsocket(streamConn),
+                               0,
+                               WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
+
+        /* Emergency bailout? */
+        if (rc & WL_POSTMASTER_DEATH)
+            exit(1);
+
+        /* Interrupted? */
+        if (rc & WL_LATCH_SET)
+        {
+            ResetLatch(MyLatch);
+            ProcessWalRcvInterrupts();
+        }
+
+        /* Consume whatever data is available from the socket */
+        if (PQconsumeInput(streamConn) == 0)
+        {
+            /* trouble; return NULL */
+            return NULL;
+        }
+    }
+
+    /* Now we can collect and return the next PGresult */
+    return PQgetResult(streamConn);
+}
+
 /*
  * Disconnect connection to primary, if any.
  */
@@ -691,13 +706,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
     {
         PGresult *res;

-        res = PQgetResult(conn->streamConn);
+        res = libpqrcv_PQgetResult(conn->streamConn);
         if (PQresultStatus(res) == PGRES_COMMAND_OK)
         {
             PQclear(res);

             /* Verify that there are no more results. */
-            res = PQgetResult(conn->streamConn);
+            res = libpqrcv_PQgetResult(conn->streamConn);
             if (res != NULL)
             {
                 PQclear(res);
@@ -861,7 +876,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
     {
         char *cstrs[MaxTupleAttributeNumber];

-        CHECK_FOR_INTERRUPTS();
+        ProcessWalRcvInterrupts();

         /* Do the allocations in temporary context. */
         oldcontext = MemoryContextSwitchTo(rowcontext);
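
The control flow that libpqrcv_PQgetResult adds (wait, check for a pending interrupt, consume input, and only then collect the result) can be sketched without libpq. This is an illustration under stated assumptions, not the patch's code: the callbacks in the hypothetical `ResultSource` struct stand in for PQisBusy, the latch wait, PQconsumeInput, and PQgetResult, and `fetch_result` returns a status where the real function would call ProcessWalRcvInterrupts, which does not return on SIGTERM. The `FakeConn` stub exists only to make the sketch runnable.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* All names here are hypothetical; the callbacks stand in for libpq calls. */
typedef struct ResultSource
{
    bool        (*is_busy) (void *conn);        /* like PQisBusy */
    bool        (*wait_for_input) (void *conn); /* true = shutdown requested */
    bool        (*consume_input) (void *conn);  /* false = I/O error */
    const char *(*get_result) (void *conn);     /* like PQgetResult */
    void       *conn;
} ResultSource;

typedef enum
{
    FETCH_OK,
    FETCH_IO_ERROR,
    FETCH_SHUTDOWN
} FetchStatus;

/* Collect the next result, checking for shutdown on every wakeup. */
static FetchStatus
fetch_result(const ResultSource *src, const char **result)
{
    assert(src != NULL && result != NULL);
    *result = NULL;

    while (src->is_busy(src->conn))
    {
        /* Interrupt check comes first, so a stalled peer cannot delay it. */
        if (src->wait_for_input(src->conn))
            return FETCH_SHUTDOWN;

        /* Consume whatever data is available. */
        if (!src->consume_input(src->conn))
            return FETCH_IO_ERROR;
    }

    /* No longer busy, so collecting the result will not block. */
    *result = src->get_result(src->conn);
    return FETCH_OK;
}

/* Demonstration stub: becomes ready after a fixed number of reads. */
typedef struct FakeConn
{
    int         reads_until_ready;
    bool        shutdown_pending;
} FakeConn;

static bool
fake_is_busy(void *c)
{
    return ((FakeConn *) c)->reads_until_ready > 0;
}

static bool
fake_wait(void *c)
{
    return ((FakeConn *) c)->shutdown_pending;
}

static bool
fake_consume(void *c)
{
    ((FakeConn *) c)->reads_until_ready--;
    return true;
}

static const char *
fake_get(void *c)
{
    (void) c;
    return "COMMAND_OK";
}

static ResultSource
fake_source(FakeConn *fc)
{
    ResultSource src = {fake_is_busy, fake_wait, fake_consume, fake_get, fc};

    return src;
}
```

With a stub that needs two reads and has no shutdown pending, `fetch_result` consumes input twice and then returns the result; with shutdown pending it returns before consuming anything. Routing every PQgetResult call through one such wrapper is what lets a single interrupt check cover all the blocking receive paths in the file.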

src/backend/replication/walreceiver.c

Lines changed: 17 additions & 49 deletions
@@ -111,28 +111,7 @@ static struct
 static StringInfoData reply_message;
 static StringInfoData incoming_message;

-/*
- * About SIGTERM handling:
- *
- * We can't just exit(1) within SIGTERM signal handler, because the signal
- * might arrive in the middle of some critical operation, like while we're
- * holding a spinlock. We also can't just set a flag in signal handler and
- * check it in the main loop, because we perform some blocking operations
- * like libpqrcv_PQexec(), which can take a long time to finish.
- *
- * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
- * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
- * sets got_SIGTERM flag, which is checked in the main loop when convenient.
- *
- * This is very much like what regular backends do with ImmediateInterruptOK,
- * ProcessInterrupts() etc.
- */
-static volatile bool WalRcvImmediateInterruptOK = false;
-
 /* Prototypes for private functions */
-static void ProcessWalRcvInterrupts(void);
-static void EnableWalRcvImmediateExit(void);
-static void DisableWalRcvImmediateExit(void);
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
 static void WalRcvDie(int code, Datum arg);
@@ -150,7 +129,20 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);


-static void
+/*
+ * Process any interrupts the walreceiver process may have received.
+ * This should be called any time the process's latch has become set.
+ *
+ * Currently, only SIGTERM is of interest. We can't just exit(1) within the
+ * SIGTERM signal handler, because the signal might arrive in the middle of
+ * some critical operation, like while we're holding a spinlock. Instead, the
+ * signal handler sets a flag variable as well as setting the process's latch.
+ * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
+ * latch has become set. Operations that could block for a long time, such as
+ * reading from a remote server, must pay attention to the latch too; see
+ * libpqrcv_PQgetResult for example.
+ */
+void
 ProcessWalRcvInterrupts(void)
 {
     /*
@@ -162,26 +154,12 @@ ProcessWalRcvInterrupts(void)

     if (got_SIGTERM)
     {
-        WalRcvImmediateInterruptOK = false;
         ereport(FATAL,
                 (errcode(ERRCODE_ADMIN_SHUTDOWN),
                  errmsg("terminating walreceiver process due to administrator command")));
     }
 }

-static void
-EnableWalRcvImmediateExit(void)
-{
-    WalRcvImmediateInterruptOK = true;
-    ProcessWalRcvInterrupts();
-}
-
-static void
-DisableWalRcvImmediateExit(void)
-{
-    WalRcvImmediateInterruptOK = false;
-    ProcessWalRcvInterrupts();
-}

 /* Main entry point for walreceiver process */
 void
@@ -299,12 +277,10 @@ WalReceiverMain(void)
     PG_SETMASK(&UnBlockSig);

     /* Establish the connection to the primary for XLOG streaming */
-    EnableWalRcvImmediateExit();
     wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
     if (!wrconn)
         ereport(ERROR,
                 (errmsg("could not connect to the primary server: %s", err)));
-    DisableWalRcvImmediateExit();

     /*
      * Save user-visible connection string. This clobbers the original
@@ -333,7 +309,6 @@ WalReceiverMain(void)
          * Check that we're connected to a valid server using the
          * IDENTIFY_SYSTEM replication command.
          */
-        EnableWalRcvImmediateExit();
         primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
                                                &server_version);

@@ -346,7 +321,6 @@ WalReceiverMain(void)
                      errdetail("The primary's identifier is %s, the standby's identifier is %s.",
                                primary_sysid, standby_sysid)));
         }
-        DisableWalRcvImmediateExit();

         /*
          * Confirm that the current timeline of the primary is the same or
@@ -507,6 +481,8 @@ WalReceiverMain(void)
                 if (rc & WL_LATCH_SET)
                 {
                     ResetLatch(walrcv->latch);
+                    ProcessWalRcvInterrupts();
+
                     if (walrcv->force_reply)
                     {
                         /*
@@ -584,9 +560,7 @@ WalReceiverMain(void)
              * The backend finished streaming. Exit streaming COPY-mode from
              * our side, too.
              */
-            EnableWalRcvImmediateExit();
             walrcv_endstreaming(wrconn, &primaryTLI);
-            DisableWalRcvImmediateExit();

             /*
              * If the server had switched to a new timeline that we didn't
@@ -740,9 +714,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
                     (errmsg("fetching timeline history file for timeline %u from primary server",
                             tli)));

-            EnableWalRcvImmediateExit();
             walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
-            DisableWalRcvImmediateExit();

             /*
              * Check that the filename on the master matches what we
@@ -819,7 +791,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS)
     errno = save_errno;
 }

-/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
+/* SIGTERM: set flag for ProcessWalRcvInterrupts */
 static void
 WalRcvShutdownHandler(SIGNAL_ARGS)
 {
@@ -830,10 +802,6 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
     if (WalRcv->latch)
         SetLatch(WalRcv->latch);

-    /* Don't joggle the elbow of proc_exit */
-    if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
-        ProcessWalRcvInterrupts();
-
     errno = save_errno;
 }

src/include/replication/walreceiver.h

Lines changed: 1 addition & 0 deletions
@@ -285,6 +285,7 @@ walrcv_clear_result(WalRcvExecResult *walres)

 /* prototypes for functions in walreceiver.c */
 extern void WalReceiverMain(void) pg_attribute_noreturn();
+extern void ProcessWalRcvInterrupts(void);

 /* prototypes for functions in walreceiverfuncs.c */
 extern Size WalRcvShmemSize(void);
