Commit b3fa6d0

Fix race conditions in synchronous standby management.
We have repeatedly seen the buildfarm reach the Assert(false) in
SyncRepGetSyncStandbysPriority. This apparently is due to failing to
consider the possibility that the sync_standby_priority values in
shared memory might be inconsistent; but they will be whenever only
some of the walsenders have updated their values after a change in
the synchronous_standby_names setting. That function is vastly too
complex for what it does, anyway, so rewriting it seems better than
trying to apply a band-aid fix.

Furthermore, the API of SyncRepGetSyncStandbys is broken by design:
it returns a list of WalSnd array indexes, but there is nothing
guaranteeing that the contents of the WalSnd array remain stable.
Thus, if some walsender exits and then a new walsender process
takes over that WalSnd array slot, a caller might make use of
WAL position data that it should not, potentially leading to
incorrect decisions about whether to release transactions that
are waiting for synchronous commit.

To fix, replace SyncRepGetSyncStandbys with a new function
SyncRepGetCandidateStandbys that copies all the required data
from shared memory while holding the relevant mutexes. If the
associated walsender process then exits, this data is still safe to
make release decisions with, since we know that that much WAL *was*
sent to a valid standby server. This incidentally means that we no
longer need to treat sync_standby_priority as protected by the
SyncRepLock rather than the per-walsender mutex.

SyncRepGetSyncStandbys is no longer used by the core code, so remove
it entirely in HEAD. However, it seems possible that external code is
relying on that function, so do not remove it from the back branches.
Instead, just remove the known-incorrect Assert. When the bug occurs,
the function will return a too-short list, which callers should treat
as meaning there are not enough sync standbys, which seems like a
reasonably safe fallback until the inconsistent state is resolved.
Moreover it's bug-compatible with what has been happening in non-assert
builds. We cannot do anything about the walsender-replacement race
condition without an API/ABI break.

The bogus assertion exists back to 9.6, but 9.6 is sufficiently
different from the later branches that the patch doesn't apply at all.
I chose to just remove the bogus assertion in 9.6, feeling that the
probability of a bad outcome from the walsender-replacement race
condition is too low to justify rewriting the whole patch for 9.6.

Discussion: https://postgr.es/m/21519.1585272409@sss.pgh.pa.us
1 parent 7588142 commit b3fa6d0

File tree

4 files changed

+243
-66
lines changed


‎src/backend/replication/syncrep.c

Lines changed: 191 additions & 54 deletions
@@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
 									   XLogRecPtr *flushPtr,
 									   XLogRecPtr *applyPtr,
-									   List *sync_standbys);
+									   SyncRepStandbyData *sync_standbys,
+									   int num_standbys);
 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
 										  XLogRecPtr *flushPtr,
 										  XLogRecPtr *applyPtr,
-										  List *sync_standbys, uint8 nth);
+										  SyncRepStandbyData *sync_standbys,
+										  int num_standbys,
+										  uint8 nth);
 static int	SyncRepGetStandbyPriority(void);
 static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
 static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int	standby_priority_comparator(const void *a, const void *b);
 static int	cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
@@ -398,9 +402,10 @@ SyncRepInitConfig(void)
 	priority = SyncRepGetStandbyPriority();
 	if (MyWalSnd->sync_standby_priority != priority)
 	{
-		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+		SpinLockAcquire(&MyWalSnd->mutex);
 		MyWalSnd->sync_standby_priority = priority;
-		LWLockRelease(SyncRepLock);
+		SpinLockRelease(&MyWalSnd->mutex);
+
 		ereport(DEBUG1,
 				(errmsg("standby \"%s\" now has synchronous standby priority %u",
 						application_name, priority)));
@@ -451,7 +456,11 @@ SyncRepReleaseWaiters(void)
 
 	/*
 	 * Check whether we are a sync standby or not, and calculate the synced
-	 * positions among all sync standbys.
+	 * positions among all sync standbys.  (Note: although this step does not
+	 * of itself require holding SyncRepLock, it seems like a good idea to do
+	 * it after acquiring the lock.  This ensures that the WAL pointers we use
+	 * to release waiters are newer than any previous execution of this
+	 * routine used.)
 	 */
 	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
 
@@ -526,25 +535,41 @@ static bool
 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 					 XLogRecPtr *applyPtr, bool *am_sync)
 {
-	List	   *sync_standbys;
+	SyncRepStandbyData *sync_standbys;
+	int			num_standbys;
+	int			i;
 
+	/* Initialize default results */
 	*writePtr = InvalidXLogRecPtr;
 	*flushPtr = InvalidXLogRecPtr;
 	*applyPtr = InvalidXLogRecPtr;
 	*am_sync = false;
 
+	/* Quick out if not even configured to be synchronous */
+	if (SyncRepConfig == NULL)
+		return false;
+
 	/* Get standbys that are considered as synchronous at this moment */
-	sync_standbys = SyncRepGetSyncStandbys(am_sync);
+	num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
+
+	/* Am I among the candidate sync standbys? */
+	for (i = 0; i < num_standbys; i++)
+	{
+		if (sync_standbys[i].is_me)
+		{
+			*am_sync = true;
+			break;
+		}
+	}
 
 	/*
-	 * Quick exit if we are not managing a sync standby or there are not
-	 * enough synchronous standbys.
+	 * Nothing more to do if we are not managing a sync standby or there are
+	 * not enough synchronous standbys.
 	 */
 	if (!(*am_sync) ||
-		SyncRepConfig == NULL ||
-		list_length(sync_standbys) < SyncRepConfig->num_sync)
+		num_standbys < SyncRepConfig->num_sync)
 	{
-		list_free(sync_standbys);
+		pfree(sync_standbys);
 		return false;
 	}
 
@@ -564,43 +589,41 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
 	{
 		SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-								   sync_standbys);
+								   sync_standbys, num_standbys);
 	}
 	else
 	{
 		SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-									  sync_standbys, SyncRepConfig->num_sync);
+									  sync_standbys, num_standbys,
+									  SyncRepConfig->num_sync);
 	}
 
-	list_free(sync_standbys);
+	pfree(sync_standbys);
 	return true;
 }
 
 /*
  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
  */
 static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-						   XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+						   XLogRecPtr *flushPtr,
+						   XLogRecPtr *applyPtr,
+						   SyncRepStandbyData *sync_standbys,
+						   int num_standbys)
 {
-	ListCell   *cell;
+	int			i;
 
 	/*
 	 * Scan through all sync standbys and calculate the oldest Write, Flush
-	 * and Apply positions.
+	 * and Apply positions.  We assume *writePtr et al were initialized to
+	 * InvalidXLogRecPtr.
 	 */
-	foreach(cell, sync_standbys)
+	for (i = 0; i < num_standbys; i++)
 	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-		XLogRecPtr	write;
-		XLogRecPtr	flush;
-		XLogRecPtr	apply;
-
-		SpinLockAcquire(&walsnd->mutex);
-		write = walsnd->write;
-		flush = walsnd->flush;
-		apply = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
+		XLogRecPtr	write = sync_standbys[i].write;
+		XLogRecPtr	flush = sync_standbys[i].flush;
+		XLogRecPtr	apply = sync_standbys[i].apply;
 
 		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
 			*writePtr = write;
@@ -616,38 +639,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * standbys.
  */
 static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-							  XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+							  XLogRecPtr *flushPtr,
+							  XLogRecPtr *applyPtr,
+							  SyncRepStandbyData *sync_standbys,
+							  int num_standbys,
+							  uint8 nth)
 {
-	ListCell   *cell;
 	XLogRecPtr *write_array;
 	XLogRecPtr *flush_array;
 	XLogRecPtr *apply_array;
-	int			len;
-	int			i = 0;
+	int			i;
 
-	len = list_length(sync_standbys);
-	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+	/* Should have enough candidates, or somebody messed up */
+	Assert(nth > 0 && nth <= num_standbys);
 
-	foreach(cell, sync_standbys)
-	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
 
-		SpinLockAcquire(&walsnd->mutex);
-		write_array[i] = walsnd->write;
-		flush_array[i] = walsnd->flush;
-		apply_array[i] = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
-
-		i++;
+	for (i = 0; i < num_standbys; i++)
+	{
+		write_array[i] = sync_standbys[i].write;
+		flush_array[i] = sync_standbys[i].flush;
+		apply_array[i] = sync_standbys[i].apply;
 	}
 
 	/* Sort each array in descending order */
-	qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
-	qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
-	qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
 
 	/* Get Nth latest Write, Flush, Apply positions */
 	*writePtr = write_array[nth - 1];
@@ -676,13 +697,122 @@ cmp_lsn(const void *a, const void *b)
 		return 1;
 }
 
+/*
+ * Return data about walsenders that are candidates to be sync standbys.
+ *
+ * *standbys is set to a palloc'd array of structs of per-walsender data,
+ * and the number of valid entries (candidate sync senders) is returned.
+ * (This might be more or fewer than num_sync; caller must check.)
+ */
+int
+SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
+{
+	int			i;
+	int			n;
+
+	/* Create result array */
+	*standbys = (SyncRepStandbyData *)
+		palloc(max_wal_senders * sizeof(SyncRepStandbyData));
+
+	/* Quick exit if sync replication is not requested */
+	if (SyncRepConfig == NULL)
+		return 0;
+
+	/* Collect raw data from shared memory */
+	n = 0;
+	for (i = 0; i < max_wal_senders; i++)
+	{
+		volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
+									 * rearrangement */
+		SyncRepStandbyData *stby;
+		WalSndState state;		/* not included in SyncRepStandbyData */
+
+		walsnd = &WalSndCtl->walsnds[i];
+		stby = *standbys + n;
+
+		SpinLockAcquire(&walsnd->mutex);
+		stby->pid = walsnd->pid;
+		state = walsnd->state;
+		stby->write = walsnd->write;
+		stby->flush = walsnd->flush;
+		stby->apply = walsnd->apply;
+		stby->sync_standby_priority = walsnd->sync_standby_priority;
+		SpinLockRelease(&walsnd->mutex);
+
+		/* Must be active */
+		if (stby->pid == 0)
+			continue;
+
+		/* Must be streaming or stopping */
+		if (state != WALSNDSTATE_STREAMING &&
+			state != WALSNDSTATE_STOPPING)
+			continue;
+
+		/* Must be synchronous */
+		if (stby->sync_standby_priority == 0)
+			continue;
+
+		/* Must have a valid flush position */
+		if (XLogRecPtrIsInvalid(stby->flush))
+			continue;
+
+		/* OK, it's a candidate */
+		stby->walsnd_index = i;
+		stby->is_me = (walsnd == MyWalSnd);
+		n++;
+	}
+
+	/*
+	 * In quorum mode, we return all the candidates.  In priority mode, if we
+	 * have too many candidates then return only the num_sync ones of highest
+	 * priority.
+	 */
+	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
+		n > SyncRepConfig->num_sync)
+	{
+		/* Sort by priority ... */
+		qsort(*standbys, n, sizeof(SyncRepStandbyData),
+			  standby_priority_comparator);
+		/* ... then report just the first num_sync ones */
+		n = SyncRepConfig->num_sync;
+	}
+
+	return n;
+}
+
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+	const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+	const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
+
+	/* First, sort by increasing priority value */
+	if (sa->sync_standby_priority != sb->sync_standby_priority)
+		return sa->sync_standby_priority - sb->sync_standby_priority;
+
+	/*
+	 * We might have equal priority values; arbitrarily break ties by position
+	 * in the WalSnd array.  (This is utterly bogus, since that is arrival
+	 * order dependent, but there are regression tests that rely on it.)
+	 */
+	return sa->walsnd_index - sb->walsnd_index;
+}
+
 
 /*
  * Return the list of sync standbys, or NIL if no sync standby is connected.
  *
  * The caller must hold SyncRepLock.
  *
  * On return, *am_sync is set to true if this walsender is connecting to
  * sync standby. Otherwise it's set to false.
+ *
+ * XXX This function is BROKEN and should not be used in new code.  It has
+ * an inherent race condition, since the returned list of integer indexes
+ * might no longer correspond to reality.
  */
 List *
 SyncRepGetSyncStandbys(bool *am_sync)
@@ -942,8 +1072,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
 		priority = next_highest_priority;
 	}
 
-	/* never reached, but keep compiler quiet */
-	Assert(false);
+	/*
+	 * We might get here if the set of sync_standby_priority values in shared
+	 * memory is inconsistent, as can happen transiently after a change in the
+	 * synchronous_standby_names setting.  In that case, just return the
+	 * incomplete list we have so far.  That will cause the caller to decide
+	 * there aren't enough synchronous candidates, which should be a safe
+	 * choice until the priority values become consistent again.
+	 */
+	list_free(pending);
 	return result;
 }
