Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9fcf670

Browse files
committed
Fix signal handling in logical replication workers
The logical replication worker processes now use the normal die()handler for SIGTERM and CHECK_FOR_INTERRUPTS() instead of custom code.One problem before was that the apply worker would not exit promptlywhen a subscription was dropped, which could lead to deadlocks.Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
1 parentacbd837 commit9fcf670

File tree

6 files changed

+50
-21
lines changed

6 files changed

+50
-21
lines changed

‎src/backend/replication/logical/launcher.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void);
8080
staticvoidlogicalrep_worker_cleanup(LogicalRepWorker*worker);
8181

8282
/* Flags set by signal handlers */
83-
volatilesig_atomic_tgot_SIGHUP= false;
84-
volatilesig_atomic_tgot_SIGTERM= false;
83+
staticvolatilesig_atomic_tgot_SIGHUP= false;
84+
staticvolatilesig_atomic_tgot_SIGTERM= false;
8585

8686
staticboolon_commit_launcher_wakeup= false;
8787

@@ -624,8 +624,8 @@ logicalrep_worker_onexit(int code, Datum arg)
624624
}
625625

626626
/* SIGTERM: set flag to exit at next convenient time */
627-
void
628-
logicalrep_worker_sigterm(SIGNAL_ARGS)
627+
staticvoid
628+
logicalrep_launcher_sigterm(SIGNAL_ARGS)
629629
{
630630
intsave_errno=errno;
631631

@@ -638,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
638638
}
639639

640640
/* SIGHUP: set flag to reload configuration at next convenient time */
641-
void
642-
logicalrep_worker_sighup(SIGNAL_ARGS)
641+
staticvoid
642+
logicalrep_launcher_sighup(SIGNAL_ARGS)
643643
{
644644
intsave_errno=errno;
645645

@@ -799,8 +799,8 @@ ApplyLauncherMain(Datum main_arg)
799799
before_shmem_exit(logicalrep_launcher_onexit, (Datum)0);
800800

801801
/* Establish signal handlers. */
802-
pqsignal(SIGHUP,logicalrep_worker_sighup);
803-
pqsignal(SIGTERM,logicalrep_worker_sigterm);
802+
pqsignal(SIGHUP,logicalrep_launcher_sighup);
803+
pqsignal(SIGTERM,logicalrep_launcher_sigterm);
804804
BackgroundWorkerUnblockSignals();
805805

806806
/* Make it easy to identify our processes. */

‎src/backend/replication/logical/tablesync.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,12 @@ wait_for_sync_status_change(Oid relid, char origstate)
154154
intrc;
155155
charstate=origstate;
156156

157-
while (!got_SIGTERM)
157+
for (;;)
158158
{
159159
LogicalRepWorker*worker;
160160

161+
CHECK_FOR_INTERRUPTS();
162+
161163
LWLockAcquire(LogicalRepWorkerLock,LW_SHARED);
162164
worker=logicalrep_worker_find(MyLogicalRepWorker->subid,
163165
relid, false);
@@ -525,7 +527,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
525527
bytesread+=avail;
526528
}
527529

528-
while (!got_SIGTERM&&maxread>0&&bytesread<minread)
530+
while (maxread>0&&bytesread<minread)
529531
{
530532
pgsocketfd=PGINVALID_SOCKET;
531533
intrc;
@@ -579,10 +581,6 @@ copy_read_data(void *outbuf, int minread, int maxread)
579581
ResetLatch(&MyProc->procLatch);
580582
}
581583

582-
/* Check for exit condition. */
583-
if (got_SIGTERM)
584-
proc_exit(0);
585-
586584
returnbytesread;
587585
}
588586

‎src/backend/replication/logical/worker.c

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@
7272
#include"storage/proc.h"
7373
#include"storage/procarray.h"
7474

75+
#include"tcop/tcopprot.h"
76+
7577
#include"utils/builtins.h"
7678
#include"utils/catcache.h"
7779
#include"utils/datum.h"
@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn);
118120

119121
staticvoidreread_subscription(void);
120122

123+
/* Flags set by signal handlers */
124+
staticvolatilesig_atomic_tgot_SIGHUP= false;
125+
121126
/*
122127
* Should this worker apply changes for given relation.
123128
*
@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10051010
/* mark as idle, before starting to loop */
10061011
pgstat_report_activity(STATE_IDLE,NULL);
10071012

1008-
while (!got_SIGTERM)
1013+
for (;;)
10091014
{
10101015
pgsocketfd=PGINVALID_SOCKET;
10111016
intrc;
@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10151020
TimestampTzlast_recv_timestamp=GetCurrentTimestamp();
10161021
boolping_sent= false;
10171022

1023+
CHECK_FOR_INTERRUPTS();
1024+
10181025
MemoryContextSwitchTo(ApplyMessageContext);
10191026

10201027
len=walrcv_receive(wrconn,&buf,&fd);
@@ -1437,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
14371444
MySubscriptionValid= false;
14381445
}
14391446

1447+
/* SIGHUP: set flag to reload configuration at next convenient time */
1448+
staticvoid
1449+
logicalrep_worker_sighup(SIGNAL_ARGS)
1450+
{
1451+
intsave_errno=errno;
1452+
1453+
got_SIGHUP= true;
1454+
1455+
/* Waken anything waiting on the process latch */
1456+
SetLatch(MyLatch);
1457+
1458+
errno=save_errno;
1459+
}
14401460

14411461
/* Logical Replication Apply worker entry point */
14421462
void
@@ -1454,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg)
14541474

14551475
/* Setup signal handling */
14561476
pqsignal(SIGHUP,logicalrep_worker_sighup);
1457-
pqsignal(SIGTERM,logicalrep_worker_sigterm);
1477+
pqsignal(SIGTERM,die);
14581478
BackgroundWorkerUnblockSignals();
14591479

14601480
/* Initialise stats to a sanish value */
@@ -1604,6 +1624,14 @@ ApplyWorkerMain(Datum main_arg)
16041624
/* Run the main loop. */
16051625
LogicalRepApplyLoop(origin_startpos);
16061626

1607-
/* We should only get here if we received SIGTERM */
16081627
proc_exit(0);
16091628
}
1629+
1630+
/*
1631+
* Is current process a logical replication worker?
1632+
*/
1633+
bool
1634+
IsLogicalWorker(void)
1635+
{
1636+
returnMyLogicalRepWorker!=NULL;
1637+
}

‎src/backend/tcop/postgres.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
#include"pg_getopt.h"
5656
#include"postmaster/autovacuum.h"
5757
#include"postmaster/postmaster.h"
58+
#include"replication/logicalworker.h"
5859
#include"replication/slot.h"
5960
#include"replication/walsender.h"
6061
#include"rewrite/rewriteHandler.h"
@@ -2845,6 +2846,10 @@ ProcessInterrupts(void)
28452846
ereport(FATAL,
28462847
(errcode(ERRCODE_ADMIN_SHUTDOWN),
28472848
errmsg("terminating autovacuum process due to administrator command")));
2849+
elseif (IsLogicalWorker())
2850+
ereport(FATAL,
2851+
(errcode(ERRCODE_ADMIN_SHUTDOWN),
2852+
errmsg("terminating logical replication worker due to administrator command")));
28482853
elseif (RecoveryConflictPending&&RecoveryConflictRetryable)
28492854
{
28502855
pgstat_report_recovery_conflict(RecoveryConflictReason);

‎src/include/replication/logicalworker.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,6 @@
1414

1515
externvoidApplyWorkerMain(Datummain_arg);
1616

17+
externboolIsLogicalWorker(void);
18+
1719
#endif/* LOGICALWORKER_H */

‎src/include/replication/worker_internal.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ extern Subscription *MySubscription;
6767
externLogicalRepWorker*MyLogicalRepWorker;
6868

6969
externboolin_remote_transaction;
70-
externvolatilesig_atomic_tgot_SIGHUP;
71-
externvolatilesig_atomic_tgot_SIGTERM;
7270

7371
externvoidlogicalrep_worker_attach(intslot);
7472
externLogicalRepWorker*logicalrep_worker_find(Oidsubid,Oidrelid,
@@ -81,8 +79,6 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
8179

8280
externintlogicalrep_sync_worker_count(Oidsubid);
8381

84-
externvoidlogicalrep_worker_sighup(SIGNAL_ARGS);
85-
externvoidlogicalrep_worker_sigterm(SIGNAL_ARGS);
8682
externchar*LogicalRepSyncTableStart(XLogRecPtr*origin_startpos);
8783
voidprocess_syncing_tables(XLogRecPtrcurrent_lsn);
8884
voidinvalidate_syncing_table_states(Datumarg,intcacheid,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp