Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit572d6ee

Browse files
committed
Fix locking in WAL receiver/sender shmem state structs
In WAL receiver and WAL server, some accesses to their correspondingshared memory control structs were done without holding any kind oflock, which could lead to inconsistent and possibly insecure results.In walsender, fix by clarifying the locking rules and following themcorrectly, as documented in the new comment in walsender_private.h;namely that some members can be read in walsender itself without a lock,because the only writes occur in the same process. The rest of thestruct requires spinlock for accesses, as usual.In walreceiver, fix by always holding spinlock while accessing thestruct.While there is potentially a problem in all branches, it is minor instable ones. This only became a real problem in pg10 because of quorumcommit in synchronous replication (commit3901fd7), and a potentialsecurity problem in walreceiver because a superuser() check was removedby default monitoring roles (commit25fff40). Thus, no backpatch.In passing, clean up some leftover braces which were used to createunconditional blocks. Once upon a time these were used forvolatile-izing accesses to those shmem structs, which is no longerrequired. Many other occurrences of this pattern remain.Author: Michaël PaquierReported-by: Michaël PaquierReviewed-by: Masahiko Sawada, Kyotaro Horiguchi, Thomas Munro,Robert HaasDiscussion:https://postgr.es/m/CAB7nPqTWYqtzD=LN_oDaf9r-hAjUEPAy0B9yRkhcsLdRN8fzrw@mail.gmail.com
1 parent898d24a commit572d6ee

File tree

5 files changed

+80
-55
lines changed

5 files changed

+80
-55
lines changed

‎src/backend/replication/syncrep.c

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -711,22 +711,32 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
711711

712712
for (i=0;i<max_wal_senders;i++)
713713
{
714+
XLogRecPtrflush;
715+
WalSndStatestate;
716+
intpid;
717+
714718
walsnd=&WalSndCtl->walsnds[i];
715719

720+
SpinLockAcquire(&walsnd->mutex);
721+
pid=walsnd->pid;
722+
flush=walsnd->flush;
723+
state=walsnd->state;
724+
SpinLockRelease(&walsnd->mutex);
725+
716726
/* Must be active */
717-
if (walsnd->pid==0)
727+
if (pid==0)
718728
continue;
719729

720730
/* Must be streaming */
721-
if (walsnd->state!=WALSNDSTATE_STREAMING)
731+
if (state!=WALSNDSTATE_STREAMING)
722732
continue;
723733

724734
/* Must be synchronous */
725735
if (walsnd->sync_standby_priority==0)
726736
continue;
727737

728738
/* Must have a valid flush position */
729-
if (XLogRecPtrIsInvalid(walsnd->flush))
739+
if (XLogRecPtrIsInvalid(flush))
730740
continue;
731741

732742
/*
@@ -780,14 +790,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
780790
*/
781791
for (i=0;i<max_wal_senders;i++)
782792
{
793+
XLogRecPtrflush;
794+
WalSndStatestate;
795+
intpid;
796+
783797
walsnd=&WalSndCtl->walsnds[i];
784798

799+
SpinLockAcquire(&walsnd->mutex);
800+
pid=walsnd->pid;
801+
flush=walsnd->flush;
802+
state=walsnd->state;
803+
SpinLockRelease(&walsnd->mutex);
804+
785805
/* Must be active */
786-
if (walsnd->pid==0)
806+
if (pid==0)
787807
continue;
788808

789809
/* Must be streaming */
790-
if (walsnd->state!=WALSNDSTATE_STREAMING)
810+
if (state!=WALSNDSTATE_STREAMING)
791811
continue;
792812

793813
/* Must be synchronous */
@@ -796,7 +816,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
796816
continue;
797817

798818
/* Must have a valid flush position */
799-
if (XLogRecPtrIsInvalid(walsnd->flush))
819+
if (XLogRecPtrIsInvalid(flush))
800820
continue;
801821

802822
/*

‎src/backend/replication/walreceiver.c

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1379,7 +1379,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
13791379
TupleDesctupdesc;
13801380
Datum*values;
13811381
bool*nulls;
1382-
WalRcvData*walrcv=WalRcv;
1382+
intpid;
1383+
boolready_to_display;
13831384
WalRcvStatestate;
13841385
XLogRecPtrreceive_start_lsn;
13851386
TimeLineIDreceive_start_tli;
@@ -1392,11 +1393,28 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
13921393
char*slotname;
13931394
char*conninfo;
13941395

1396+
/* Take a lock to ensure value consistency */
1397+
SpinLockAcquire(&WalRcv->mutex);
1398+
pid= (int)WalRcv->pid;
1399+
ready_to_display=WalRcv->ready_to_display;
1400+
state=WalRcv->walRcvState;
1401+
receive_start_lsn=WalRcv->receiveStart;
1402+
receive_start_tli=WalRcv->receiveStartTLI;
1403+
received_lsn=WalRcv->receivedUpto;
1404+
received_tli=WalRcv->receivedTLI;
1405+
last_send_time=WalRcv->lastMsgSendTime;
1406+
last_receipt_time=WalRcv->lastMsgReceiptTime;
1407+
latest_end_lsn=WalRcv->latestWalEnd;
1408+
latest_end_time=WalRcv->latestWalEndTime;
1409+
slotname=pstrdup(WalRcv->slotname);
1410+
conninfo=pstrdup(WalRcv->conninfo);
1411+
SpinLockRelease(&WalRcv->mutex);
1412+
13951413
/*
13961414
* No WAL receiver (or not ready yet), just return a tuple with NULL
13971415
* values
13981416
*/
1399-
if (walrcv->pid==0|| !walrcv->ready_to_display)
1417+
if (pid==0|| !ready_to_display)
14001418
PG_RETURN_NULL();
14011419

14021420
/* determine result type */
@@ -1406,23 +1424,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
14061424
values=palloc0(sizeof(Datum)*tupdesc->natts);
14071425
nulls=palloc0(sizeof(bool)*tupdesc->natts);
14081426

1409-
/* Take a lock to ensure value consistency */
1410-
SpinLockAcquire(&walrcv->mutex);
1411-
state=walrcv->walRcvState;
1412-
receive_start_lsn=walrcv->receiveStart;
1413-
receive_start_tli=walrcv->receiveStartTLI;
1414-
received_lsn=walrcv->receivedUpto;
1415-
received_tli=walrcv->receivedTLI;
1416-
last_send_time=walrcv->lastMsgSendTime;
1417-
last_receipt_time=walrcv->lastMsgReceiptTime;
1418-
latest_end_lsn=walrcv->latestWalEnd;
1419-
latest_end_time=walrcv->latestWalEndTime;
1420-
slotname=pstrdup(walrcv->slotname);
1421-
conninfo=pstrdup(walrcv->conninfo);
1422-
SpinLockRelease(&walrcv->mutex);
1423-
14241427
/* Fetch values */
1425-
values[0]=Int32GetDatum(walrcv->pid);
1428+
values[0]=Int32GetDatum(pid);
14261429

14271430
if (!is_member_of_role(GetUserId(),DEFAULT_ROLE_READ_ALL_STATS))
14281431
{
@@ -1473,6 +1476,5 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
14731476
}
14741477

14751478
/* Returns the record as Datum */
1476-
PG_RETURN_DATUM(HeapTupleGetDatum(
1477-
heap_form_tuple(tupdesc,values,nulls)));
1479+
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc,values,nulls)));
14781480
}

‎src/backend/replication/walsender.c

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -668,13 +668,9 @@ StartReplication(StartReplicationCmd *cmd)
668668
sentPtr=cmd->startpoint;
669669

670670
/* Initialize shared memory status, too */
671-
{
672-
WalSnd*walsnd=MyWalSnd;
673-
674-
SpinLockAcquire(&walsnd->mutex);
675-
walsnd->sentPtr=sentPtr;
676-
SpinLockRelease(&walsnd->mutex);
677-
}
671+
SpinLockAcquire(&MyWalSnd->mutex);
672+
MyWalSnd->sentPtr=sentPtr;
673+
SpinLockRelease(&MyWalSnd->mutex);
678674

679675
SyncRepInitConfig();
680676

@@ -1093,13 +1089,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
10931089
sentPtr=MyReplicationSlot->data.confirmed_flush;
10941090

10951091
/* Also update the sent position status in shared memory */
1096-
{
1097-
WalSnd*walsnd=MyWalSnd;
1098-
1099-
SpinLockAcquire(&walsnd->mutex);
1100-
walsnd->sentPtr=MyReplicationSlot->data.restart_lsn;
1101-
SpinLockRelease(&walsnd->mutex);
1102-
}
1092+
SpinLockAcquire(&MyWalSnd->mutex);
1093+
MyWalSnd->sentPtr=MyReplicationSlot->data.restart_lsn;
1094+
SpinLockRelease(&MyWalSnd->mutex);
11031095

11041096
replication_active= true;
11051097

@@ -2892,10 +2884,12 @@ WalSndRqstFileReload(void)
28922884
{
28932885
WalSnd*walsnd=&WalSndCtl->walsnds[i];
28942886

2887+
SpinLockAcquire(&walsnd->mutex);
28952888
if (walsnd->pid==0)
2889+
{
2890+
SpinLockRelease(&walsnd->mutex);
28962891
continue;
2897-
2898-
SpinLockAcquire(&walsnd->mutex);
2892+
}
28992893
walsnd->needreload= true;
29002894
SpinLockRelease(&walsnd->mutex);
29012895
}
@@ -3071,7 +3065,6 @@ WalSndWaitStopping(void)
30713065

30723066
for (i=0;i<max_wal_senders;i++)
30733067
{
3074-
WalSndStatestate;
30753068
WalSnd*walsnd=&WalSndCtl->walsnds[i];
30763069

30773070
SpinLockAcquire(&walsnd->mutex);
@@ -3082,14 +3075,13 @@ WalSndWaitStopping(void)
30823075
continue;
30833076
}
30843077

3085-
state=walsnd->state;
3086-
SpinLockRelease(&walsnd->mutex);
3087-
3088-
if (state!=WALSNDSTATE_STOPPING)
3078+
if (walsnd->state!=WALSNDSTATE_STOPPING)
30893079
{
30903080
all_stopped= false;
3081+
SpinLockRelease(&walsnd->mutex);
30913082
break;
30923083
}
3084+
SpinLockRelease(&walsnd->mutex);
30933085
}
30943086

30953087
/* safe to leave if confirmation is done for all WAL senders */
@@ -3210,14 +3202,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
32103202
TimeOffsetflushLag;
32113203
TimeOffsetapplyLag;
32123204
intpriority;
3205+
intpid;
32133206
WalSndStatestate;
32143207
Datumvalues[PG_STAT_GET_WAL_SENDERS_COLS];
32153208
boolnulls[PG_STAT_GET_WAL_SENDERS_COLS];
32163209

3210+
SpinLockAcquire(&walsnd->mutex);
32173211
if (walsnd->pid==0)
3212+
{
3213+
SpinLockRelease(&walsnd->mutex);
32183214
continue;
3219-
3220-
SpinLockAcquire(&walsnd->mutex);
3215+
}
3216+
pid=walsnd->pid;
32213217
sentPtr=walsnd->sentPtr;
32223218
state=walsnd->state;
32233219
write=walsnd->write;
@@ -3230,7 +3226,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
32303226
SpinLockRelease(&walsnd->mutex);
32313227

32323228
memset(nulls,0,sizeof(nulls));
3233-
values[0]=Int32GetDatum(walsnd->pid);
3229+
values[0]=Int32GetDatum(pid);
32343230

32353231
if (!superuser())
32363232
{
@@ -3265,7 +3261,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
32653261
* which always returns an invalid flush location, as an
32663262
* asynchronous standby.
32673263
*/
3268-
priority=XLogRecPtrIsInvalid(walsnd->flush) ?0 :priority;
3264+
priority=XLogRecPtrIsInvalid(flush) ?0 :priority;
32693265

32703266
if (writeLag<0)
32713267
nulls[6]= true;

‎src/include/replication/walreceiver.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ typedef struct
114114
*/
115115
charslotname[NAMEDATALEN];
116116

117+
/* set true once conninfo is ready to display (obfuscated pwds etc) */
118+
boolready_to_display;
119+
117120
slock_tmutex;/* locks shared variables shown above */
118121

119122
/*
@@ -122,9 +125,6 @@ typedef struct
122125
*/
123126
boolforce_reply;
124127

125-
/* set true once conninfo is ready to display (obfuscated pwds etc) */
126-
boolready_to_display;
127-
128128
/*
129129
* Latch used by startup process to wake up walreceiver after telling it
130130
* where to start streaming (after setting receiveStart and

‎src/include/replication/walsender_private.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,17 @@ typedef enum WalSndState
3030

3131
/*
3232
* Each walsender has a WalSnd struct in shared memory.
33+
*
34+
* This struct is protected by 'mutex', with two exceptions: one is
35+
* sync_standby_priority as noted below. The other exception is that some
36+
* members are only written by the walsender process itself, and thus that
37+
* process is free to read those members without holding spinlock. pid and
38+
* needreload always require the spinlock to be held for all accesses.
3339
*/
3440
typedefstructWalSnd
3541
{
36-
pid_tpid;/* this walsender's process id, or 0 */
42+
pid_tpid;/* this walsender's PID, or 0 if not active */
43+
3744
WalSndStatestate;/* this walsender's state */
3845
XLogRecPtrsentPtr;/* WAL has been sent up to this point */
3946
boolneedreload;/* does currently-open file need to be

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp