@@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
108108static void SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,
109109XLogRecPtr * flushPtr ,
110110XLogRecPtr * applyPtr ,
111- List * sync_standbys );
111+ SyncRepStandbyData * sync_standbys ,
112+ int num_standbys );
112113static void SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,
113114XLogRecPtr * flushPtr ,
114115XLogRecPtr * applyPtr ,
115- List * sync_standbys ,uint8 nth );
116+ SyncRepStandbyData * sync_standbys ,
117+ int num_standbys ,
118+ uint8 nth );
116119static int SyncRepGetStandbyPriority (void );
117120static List * SyncRepGetSyncStandbysPriority (bool * am_sync );
118121static List * SyncRepGetSyncStandbysQuorum (bool * am_sync );
122+ static int standby_priority_comparator (const void * a ,const void * b );
119123static int cmp_lsn (const void * a ,const void * b );
120124
121125#ifdef USE_ASSERT_CHECKING
@@ -399,9 +403,10 @@ SyncRepInitConfig(void)
399403priority = SyncRepGetStandbyPriority ();
400404if (MyWalSnd -> sync_standby_priority != priority )
401405{
402- LWLockAcquire ( SyncRepLock , LW_EXCLUSIVE );
406+ SpinLockAcquire ( & MyWalSnd -> mutex );
403407MyWalSnd -> sync_standby_priority = priority ;
404- LWLockRelease (SyncRepLock );
408+ SpinLockRelease (& MyWalSnd -> mutex );
409+
405410ereport (DEBUG1 ,
406411(errmsg ("standby \"%s\" now has synchronous standby priority %u" ,
407412application_name ,priority )));
@@ -452,7 +457,11 @@ SyncRepReleaseWaiters(void)
452457
453458/*
454459 * Check whether we are a sync standby or not, and calculate the synced
455- * positions among all sync standbys.
460+ * positions among all sync standbys. (Note: although this step does not
461+ * of itself require holding SyncRepLock, it seems like a good idea to do
462+ * it after acquiring the lock. This ensures that the WAL pointers we use
463+ * to release waiters are newer than any previous execution of this
464+ * routine used.)
456465 */
457466got_recptr = SyncRepGetSyncRecPtr (& writePtr ,& flushPtr ,& applyPtr ,& am_sync );
458467
@@ -527,25 +536,41 @@ static bool
527536SyncRepGetSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
528537XLogRecPtr * applyPtr ,bool * am_sync )
529538{
530- List * sync_standbys ;
539+ SyncRepStandbyData * sync_standbys ;
540+ int num_standbys ;
541+ int i ;
531542
543+ /* Initialize default results */
532544* writePtr = InvalidXLogRecPtr ;
533545* flushPtr = InvalidXLogRecPtr ;
534546* applyPtr = InvalidXLogRecPtr ;
535547* am_sync = false;
536548
549+ /* Quick out if not even configured to be synchronous */
550+ if (SyncRepConfig == NULL )
551+ return false;
552+
537553/* Get standbys that are considered as synchronous at this moment */
538- sync_standbys = SyncRepGetSyncStandbys (am_sync );
554+ num_standbys = SyncRepGetCandidateStandbys (& sync_standbys );
555+
556+ /* Am I among the candidate sync standbys? */
557+ for (i = 0 ;i < num_standbys ;i ++ )
558+ {
559+ if (sync_standbys [i ].is_me )
560+ {
561+ * am_sync = true;
562+ break ;
563+ }
564+ }
539565
540566/*
541- *Quick exit if we are not managing a sync standby or there are not
542- * enough synchronous standbys.
567+ *Nothing more to do if we are not managing a sync standby or there are
568+ *not enough synchronous standbys.
543569 */
544570if (!(* am_sync )||
545- SyncRepConfig == NULL ||
546- list_length (sync_standbys )< SyncRepConfig -> num_sync )
571+ num_standbys < SyncRepConfig -> num_sync )
547572{
548- list_free (sync_standbys );
573+ pfree (sync_standbys );
549574return false;
550575}
551576
@@ -565,43 +590,41 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
565590if (SyncRepConfig -> syncrep_method == SYNC_REP_PRIORITY )
566591{
567592SyncRepGetOldestSyncRecPtr (writePtr ,flushPtr ,applyPtr ,
568- sync_standbys );
593+ sync_standbys , num_standbys );
569594}
570595else
571596{
572597SyncRepGetNthLatestSyncRecPtr (writePtr ,flushPtr ,applyPtr ,
573- sync_standbys ,SyncRepConfig -> num_sync );
598+ sync_standbys ,num_standbys ,
599+ SyncRepConfig -> num_sync );
574600}
575601
576- list_free (sync_standbys );
602+ pfree (sync_standbys );
577603return true;
578604}
579605
580606/*
581607 * Calculate the oldest Write, Flush and Apply positions among sync standbys.
582608 */
583609static void
584- SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
585- XLogRecPtr * applyPtr ,List * sync_standbys )
610+ SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,
611+ XLogRecPtr * flushPtr ,
612+ XLogRecPtr * applyPtr ,
613+ SyncRepStandbyData * sync_standbys ,
614+ int num_standbys )
586615{
587- ListCell * cell ;
616+ int i ;
588617
589618/*
590619 * Scan through all sync standbys and calculate the oldest Write, Flush
591- * and Apply positions.
620+ * and Apply positions. We assume *writePtr et al were initialized to
621+ * InvalidXLogRecPtr.
592622 */
593- foreach ( cell , sync_standbys )
623+ for ( i = 0 ; i < num_standbys ; i ++ )
594624{
595- WalSnd * walsnd = & WalSndCtl -> walsnds [lfirst_int (cell )];
596- XLogRecPtr write ;
597- XLogRecPtr flush ;
598- XLogRecPtr apply ;
599-
600- SpinLockAcquire (& walsnd -> mutex );
601- write = walsnd -> write ;
602- flush = walsnd -> flush ;
603- apply = walsnd -> apply ;
604- SpinLockRelease (& walsnd -> mutex );
625+ XLogRecPtr write = sync_standbys [i ].write ;
626+ XLogRecPtr flush = sync_standbys [i ].flush ;
627+ XLogRecPtr apply = sync_standbys [i ].apply ;
605628
606629if (XLogRecPtrIsInvalid (* writePtr )|| * writePtr > write )
607630* writePtr = write ;
@@ -617,38 +640,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
617640 * standbys.
618641 */
619642static void
620- SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
621- XLogRecPtr * applyPtr ,List * sync_standbys ,uint8 nth )
643+ SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,
644+ XLogRecPtr * flushPtr ,
645+ XLogRecPtr * applyPtr ,
646+ SyncRepStandbyData * sync_standbys ,
647+ int num_standbys ,
648+ uint8 nth )
622649{
623- ListCell * cell ;
624650XLogRecPtr * write_array ;
625651XLogRecPtr * flush_array ;
626652XLogRecPtr * apply_array ;
627- int len ;
628- int i = 0 ;
629-
630- len = list_length (sync_standbys );
631- write_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
632- flush_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
633- apply_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
653+ int i ;
634654
635- foreach (cell ,sync_standbys )
636- {
637- WalSnd * walsnd = & WalSndCtl -> walsnds [lfirst_int (cell )];
655+ /* Should have enough candidates, or somebody messed up */
656+ Assert (nth > 0 && nth <=num_standbys );
638657
639- SpinLockAcquire (& walsnd -> mutex );
640- write_array [i ]= walsnd -> write ;
641- flush_array [i ]= walsnd -> flush ;
642- apply_array [i ]= walsnd -> apply ;
643- SpinLockRelease (& walsnd -> mutex );
658+ write_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* num_standbys );
659+ flush_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* num_standbys );
660+ apply_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* num_standbys );
644661
645- i ++ ;
662+ for (i = 0 ;i < num_standbys ;i ++ )
663+ {
664+ write_array [i ]= sync_standbys [i ].write ;
665+ flush_array [i ]= sync_standbys [i ].flush ;
666+ apply_array [i ]= sync_standbys [i ].apply ;
646667}
647668
648669/* Sort each array in descending order */
649- qsort (write_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
650- qsort (flush_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
651- qsort (apply_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
670+ qsort (write_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
671+ qsort (flush_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
672+ qsort (apply_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
652673
653674/* Get Nth latest Write, Flush, Apply positions */
654675* writePtr = write_array [nth - 1 ];
@@ -677,13 +698,122 @@ cmp_lsn(const void *a, const void *b)
677698return 1 ;
678699}
679700
701+ /*
702+ * Return data about walsenders that are candidates to be sync standbys.
703+ *
704+ * *standbys is set to a palloc'd array of structs of per-walsender data,
705+ * and the number of valid entries (candidate sync senders) is returned.
706+ * (This might be more or fewer than num_sync; caller must check.)
707+ */
708+ int
709+ SyncRepGetCandidateStandbys (SyncRepStandbyData * * standbys )
710+ {
711+ int i ;
712+ int n ;
713+
714+ /* Create result array */
715+ * standbys = (SyncRepStandbyData * )
716+ palloc (max_wal_senders * sizeof (SyncRepStandbyData ));
717+
718+ /* Quick exit if sync replication is not requested */
719+ if (SyncRepConfig == NULL )
720+ return 0 ;
721+
722+ /* Collect raw data from shared memory */
723+ n = 0 ;
724+ for (i = 0 ;i < max_wal_senders ;i ++ )
725+ {
726+ volatile WalSnd * walsnd ;/* Use volatile pointer to prevent code
727+ * rearrangement */
728+ SyncRepStandbyData * stby ;
729+ WalSndState state ;/* not included in SyncRepStandbyData */
730+
731+ walsnd = & WalSndCtl -> walsnds [i ];
732+ stby = * standbys + n ;
733+
734+ SpinLockAcquire (& walsnd -> mutex );
735+ stby -> pid = walsnd -> pid ;
736+ state = walsnd -> state ;
737+ stby -> write = walsnd -> write ;
738+ stby -> flush = walsnd -> flush ;
739+ stby -> apply = walsnd -> apply ;
740+ stby -> sync_standby_priority = walsnd -> sync_standby_priority ;
741+ SpinLockRelease (& walsnd -> mutex );
742+
743+ /* Must be active */
744+ if (stby -> pid == 0 )
745+ continue ;
746+
747+ /* Must be streaming or stopping */
748+ if (state != WALSNDSTATE_STREAMING &&
749+ state != WALSNDSTATE_STOPPING )
750+ continue ;
751+
752+ /* Must be synchronous */
753+ if (stby -> sync_standby_priority == 0 )
754+ continue ;
755+
756+ /* Must have a valid flush position */
757+ if (XLogRecPtrIsInvalid (stby -> flush ))
758+ continue ;
759+
760+ /* OK, it's a candidate */
761+ stby -> walsnd_index = i ;
762+ stby -> is_me = (walsnd == MyWalSnd );
763+ n ++ ;
764+ }
765+
766+ /*
767+ * In quorum mode, we return all the candidates. In priority mode, if we
768+ * have too many candidates then return only the num_sync ones of highest
769+ * priority.
770+ */
771+ if (SyncRepConfig -> syncrep_method == SYNC_REP_PRIORITY &&
772+ n > SyncRepConfig -> num_sync )
773+ {
774+ /* Sort by priority ... */
775+ qsort (* standbys ,n ,sizeof (SyncRepStandbyData ),
776+ standby_priority_comparator );
777+ /* ... then report just the first num_sync ones */
778+ n = SyncRepConfig -> num_sync ;
779+ }
780+
781+ return n ;
782+ }
783+
784+ /*
785+ * qsort comparator to sort SyncRepStandbyData entries by priority
786+ */
787+ static int
788+ standby_priority_comparator (const void * a ,const void * b )
789+ {
790+ const SyncRepStandbyData * sa = (const SyncRepStandbyData * )a ;
791+ const SyncRepStandbyData * sb = (const SyncRepStandbyData * )b ;
792+
793+ /* First, sort by increasing priority value */
794+ if (sa -> sync_standby_priority != sb -> sync_standby_priority )
795+ return sa -> sync_standby_priority - sb -> sync_standby_priority ;
796+
797+ /*
798+ * We might have equal priority values; arbitrarily break ties by position
799+ * in the WALSnd array. (This is utterly bogus, since that is arrival
800+ * order dependent, but there are regression tests that rely on it.)
801+ */
802+ return sa -> walsnd_index - sb -> walsnd_index ;
803+ }
804+
805+
680806/*
681807 * Return the list of sync standbys, or NIL if no sync standby is connected.
682808 *
683809 * The caller must hold SyncRepLock.
684810 *
685811 * On return, *am_sync is set to true if this walsender is connected to
686812 * a sync standby. Otherwise it's set to false.
813+ *
814+ * XXX This function is BROKEN and should not be used in new code. It has
815+ * an inherent race condition, since the returned list of integer indexes
816+ * might no longer correspond to reality.
687817 */
688818List *
689819SyncRepGetSyncStandbys (bool * am_sync )
@@ -943,8 +1073,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
9431073priority = next_highest_priority ;
9441074}
9451075
946- /* never reached, but keep compiler quiet */
947- Assert (false);
1076+ /*
1077+ * We might get here if the set of sync_standby_priority values in shared
1078+ * memory is inconsistent, as can happen transiently after a change in the
1079+ * synchronous_standby_names setting. In that case, just return the
1080+ * incomplete list we have so far. That will cause the caller to decide
1081+ * there aren't enough synchronous candidates, which should be a safe
1082+ * choice until the priority values become consistent again.
1083+ */
1084+ list_free (pending );
9481085return result ;
9491086}
9501087