@@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
108108static void SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,
109109XLogRecPtr * flushPtr ,
110110XLogRecPtr * applyPtr ,
111- List * sync_standbys );
111+ SyncRepStandbyData * sync_standbys ,
112+ int num_standbys );
112113static void SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,
113114XLogRecPtr * flushPtr ,
114115XLogRecPtr * applyPtr ,
115- List * sync_standbys ,uint8 nth );
116+ SyncRepStandbyData * sync_standbys ,
117+ int num_standbys ,
118+ uint8 nth );
116119static int SyncRepGetStandbyPriority (void );
117120static List * SyncRepGetSyncStandbysPriority (bool * am_sync );
118121static List * SyncRepGetSyncStandbysQuorum (bool * am_sync );
122+ static int standby_priority_comparator (const void * a ,const void * b );
119123static int cmp_lsn (const void * a ,const void * b );
120124
121125#ifdef USE_ASSERT_CHECKING
@@ -400,9 +404,10 @@ SyncRepInitConfig(void)
400404priority = SyncRepGetStandbyPriority ();
401405if (MyWalSnd -> sync_standby_priority != priority )
402406{
403- LWLockAcquire ( SyncRepLock , LW_EXCLUSIVE );
407+ SpinLockAcquire ( & MyWalSnd -> mutex );
404408MyWalSnd -> sync_standby_priority = priority ;
405- LWLockRelease (SyncRepLock );
409+ SpinLockRelease (& MyWalSnd -> mutex );
410+
406411ereport (DEBUG1 ,
407412(errmsg ("standby \"%s\" now has synchronous standby priority %u" ,
408413application_name ,priority )));
@@ -453,7 +458,11 @@ SyncRepReleaseWaiters(void)
453458
454459/*
455460 * Check whether we are a sync standby or not, and calculate the synced
456- * positions among all sync standbys.
461+ * positions among all sync standbys. (Note: although this step does not
462+ * of itself require holding SyncRepLock, it seems like a good idea to do
463+ * it after acquiring the lock. This ensures that the WAL pointers we use
464+ * to release waiters are newer than any previous execution of this
465+ * routine used.)
457466 */
458467got_recptr = SyncRepGetSyncRecPtr (& writePtr ,& flushPtr ,& applyPtr ,& am_sync );
459468
@@ -528,25 +537,41 @@ static bool
528537SyncRepGetSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
529538XLogRecPtr * applyPtr ,bool * am_sync )
530539{
531- List * sync_standbys ;
540+ SyncRepStandbyData * sync_standbys ;
541+ int num_standbys ;
542+ int i ;
532543
544+ /* Initialize default results */
533545* writePtr = InvalidXLogRecPtr ;
534546* flushPtr = InvalidXLogRecPtr ;
535547* applyPtr = InvalidXLogRecPtr ;
536548* am_sync = false;
537549
550+ /* Quick out if not even configured to be synchronous */
551+ if (SyncRepConfig == NULL )
552+ return false;
553+
538554/* Get standbys that are considered as synchronous at this moment */
539- sync_standbys = SyncRepGetSyncStandbys (am_sync );
555+ num_standbys = SyncRepGetCandidateStandbys (& sync_standbys );
556+
557+ /* Am I among the candidate sync standbys? */
558+ for (i = 0 ;i < num_standbys ;i ++ )
559+ {
560+ if (sync_standbys [i ].is_me )
561+ {
562+ * am_sync = true;
563+ break ;
564+ }
565+ }
540566
541567/*
542- *Quick exit if we are not managing a sync standby or there are not
543- * enough synchronous standbys.
568+ *Nothing more to do if we are not managing a sync standby or there are
569+ *not enough synchronous standbys.
544570 */
545571if (!(* am_sync )||
546- SyncRepConfig == NULL ||
547- list_length (sync_standbys )< SyncRepConfig -> num_sync )
572+ num_standbys < SyncRepConfig -> num_sync )
548573{
549- list_free (sync_standbys );
574+ pfree (sync_standbys );
550575return false;
551576}
552577
@@ -566,43 +591,41 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
566591if (SyncRepConfig -> syncrep_method == SYNC_REP_PRIORITY )
567592{
568593SyncRepGetOldestSyncRecPtr (writePtr ,flushPtr ,applyPtr ,
569- sync_standbys );
594+ sync_standbys , num_standbys );
570595}
571596else
572597{
573598SyncRepGetNthLatestSyncRecPtr (writePtr ,flushPtr ,applyPtr ,
574- sync_standbys ,SyncRepConfig -> num_sync );
599+ sync_standbys ,num_standbys ,
600+ SyncRepConfig -> num_sync );
575601}
576602
577- list_free (sync_standbys );
603+ pfree (sync_standbys );
578604return true;
579605}
580606
581607/*
582608 * Calculate the oldest Write, Flush and Apply positions among sync standbys.
583609 */
584610static void
585- SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
586- XLogRecPtr * applyPtr ,List * sync_standbys )
611+ SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,
612+ XLogRecPtr * flushPtr ,
613+ XLogRecPtr * applyPtr ,
614+ SyncRepStandbyData * sync_standbys ,
615+ int num_standbys )
587616{
588- ListCell * cell ;
617+ int i ;
589618
590619/*
591620 * Scan through all sync standbys and calculate the oldest Write, Flush
592- * and Apply positions.
621+ * and Apply positions. We assume *writePtr et al were initialized to
622+ * InvalidXLogRecPtr.
593623 */
594- foreach ( cell , sync_standbys )
624+ for ( i = 0 ; i < num_standbys ; i ++ )
595625{
596- WalSnd * walsnd = & WalSndCtl -> walsnds [lfirst_int (cell )];
597- XLogRecPtr write ;
598- XLogRecPtr flush ;
599- XLogRecPtr apply ;
600-
601- SpinLockAcquire (& walsnd -> mutex );
602- write = walsnd -> write ;
603- flush = walsnd -> flush ;
604- apply = walsnd -> apply ;
605- SpinLockRelease (& walsnd -> mutex );
626+ XLogRecPtr write = sync_standbys [i ].write ;
627+ XLogRecPtr flush = sync_standbys [i ].flush ;
628+ XLogRecPtr apply = sync_standbys [i ].apply ;
606629
607630if (XLogRecPtrIsInvalid (* writePtr )|| * writePtr > write )
608631* writePtr = write ;
@@ -618,38 +641,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
618641 * standbys.
619642 */
620643static void
621- SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
622- XLogRecPtr * applyPtr ,List * sync_standbys ,uint8 nth )
644+ SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,
645+ XLogRecPtr * flushPtr ,
646+ XLogRecPtr * applyPtr ,
647+ SyncRepStandbyData * sync_standbys ,
648+ int num_standbys ,
649+ uint8 nth )
623650{
624- ListCell * cell ;
625651XLogRecPtr * write_array ;
626652XLogRecPtr * flush_array ;
627653XLogRecPtr * apply_array ;
628- int len ;
629- int i = 0 ;
654+ int i ;
630655
631- len = list_length (sync_standbys );
632- write_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
633- flush_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
634- apply_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
656+ /* Should have enough candidates, or somebody messed up */
657+ Assert (nth > 0 && nth <=num_standbys );
635658
636- foreach ( cell , sync_standbys )
637- {
638- WalSnd * walsnd = & WalSndCtl -> walsnds [ lfirst_int ( cell )] ;
659+ write_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * num_standbys );
660+ flush_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * num_standbys );
661+ apply_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * num_standbys ) ;
639662
640- SpinLockAcquire (& walsnd -> mutex );
641- write_array [i ]= walsnd -> write ;
642- flush_array [i ]= walsnd -> flush ;
643- apply_array [i ]= walsnd -> apply ;
644- SpinLockRelease (& walsnd -> mutex );
645-
646- i ++ ;
663+ for (i = 0 ;i < num_standbys ;i ++ )
664+ {
665+ write_array [i ]= sync_standbys [i ].write ;
666+ flush_array [i ]= sync_standbys [i ].flush ;
667+ apply_array [i ]= sync_standbys [i ].apply ;
647668}
648669
649670/* Sort each array in descending order */
650- qsort (write_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
651- qsort (flush_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
652- qsort (apply_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
671+ qsort (write_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
672+ qsort (flush_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
673+ qsort (apply_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
653674
654675/* Get Nth latest Write, Flush, Apply positions */
655676* writePtr = write_array [nth - 1 ];
@@ -678,13 +699,122 @@ cmp_lsn(const void *a, const void *b)
678699return 1 ;
679700}
680701
702+ /*
703+ * Return data about walsenders that are candidates to be sync standbys.
704+ *
705+ * *standbys is set to a palloc'd array of structs of per-walsender data,
706+ * and the number of valid entries (candidate sync senders) is returned.
707+ * (This might be more or fewer than num_sync; caller must check.)
708+ */
709+ int
710+ SyncRepGetCandidateStandbys (SyncRepStandbyData * * standbys )
711+ {
712+ int i ;
713+ int n ;
714+
715+ /* Create result array */
716+ * standbys = (SyncRepStandbyData * )
717+ palloc (max_wal_senders * sizeof (SyncRepStandbyData ));
718+
719+ /* Quick exit if sync replication is not requested */
720+ if (SyncRepConfig == NULL )
721+ return 0 ;
722+
723+ /* Collect raw data from shared memory */
724+ n = 0 ;
725+ for (i = 0 ;i < max_wal_senders ;i ++ )
726+ {
727+ volatile WalSnd * walsnd ;/* Use volatile pointer to prevent code
728+ * rearrangement */
729+ SyncRepStandbyData * stby ;
730+ WalSndState state ;/* not included in SyncRepStandbyData */
731+
732+ walsnd = & WalSndCtl -> walsnds [i ];
733+ stby = * standbys + n ;
734+
735+ SpinLockAcquire (& walsnd -> mutex );
736+ stby -> pid = walsnd -> pid ;
737+ state = walsnd -> state ;
738+ stby -> write = walsnd -> write ;
739+ stby -> flush = walsnd -> flush ;
740+ stby -> apply = walsnd -> apply ;
741+ stby -> sync_standby_priority = walsnd -> sync_standby_priority ;
742+ SpinLockRelease (& walsnd -> mutex );
743+
744+ /* Must be active */
745+ if (stby -> pid == 0 )
746+ continue ;
747+
748+ /* Must be streaming or stopping */
749+ if (state != WALSNDSTATE_STREAMING &&
750+ state != WALSNDSTATE_STOPPING )
751+ continue ;
752+
753+ /* Must be synchronous */
754+ if (stby -> sync_standby_priority == 0 )
755+ continue ;
756+
757+ /* Must have a valid flush position */
758+ if (XLogRecPtrIsInvalid (stby -> flush ))
759+ continue ;
760+
761+ /* OK, it's a candidate */
762+ stby -> walsnd_index = i ;
763+ stby -> is_me = (walsnd == MyWalSnd );
764+ n ++ ;
765+ }
766+
767+ /*
768+ * In quorum mode, we return all the candidates. In priority mode, if we
769+ * have too many candidates then return only the num_sync ones of highest
770+ * priority.
771+ */
772+ if (SyncRepConfig -> syncrep_method == SYNC_REP_PRIORITY &&
773+ n > SyncRepConfig -> num_sync )
774+ {
775+ /* Sort by priority ... */
776+ qsort (* standbys ,n ,sizeof (SyncRepStandbyData ),
777+ standby_priority_comparator );
778+ /* ... then report just the first num_sync ones */
779+ n = SyncRepConfig -> num_sync ;
780+ }
781+
782+ return n ;
783+ }
784+
785+ /*
786+ * qsort comparator to sort SyncRepStandbyData entries by priority
787+ */
788+ static int
789+ standby_priority_comparator (const void * a ,const void * b )
790+ {
791+ const SyncRepStandbyData * sa = (const SyncRepStandbyData * )a ;
792+ const SyncRepStandbyData * sb = (const SyncRepStandbyData * )b ;
793+
794+ /* First, sort by increasing priority value */
795+ if (sa -> sync_standby_priority != sb -> sync_standby_priority )
796+ return sa -> sync_standby_priority - sb -> sync_standby_priority ;
797+
798+ /*
799+ * We might have equal priority values; arbitrarily break ties by position
800+ * in the WALSnd array. (This is utterly bogus, since that is arrival
801+ * order dependent, but there are regression tests that rely on it.)
802+ */
803+ return sa -> walsnd_index - sb -> walsnd_index ;
804+ }
805+
806+
681807/*
682808 * Return the list of sync standbys, or NIL if no sync standby is connected.
683809 *
684810 * The caller must hold SyncRepLock.
685811 *
686812 * On return, *am_sync is set to true if this walsender is connecting to
687813 * sync standby. Otherwise it's set to false.
814+ *
815+ * XXX This function is BROKEN and should not be used in new code. It has
816+ * an inherent race condition, since the returned list of integer indexes
817+ * might no longer correspond to reality.
688818 */
689819List *
690820SyncRepGetSyncStandbys (bool * am_sync )
@@ -944,8 +1074,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
9441074priority = next_highest_priority ;
9451075}
9461076
947- /* never reached, but keep compiler quiet */
948- Assert (false);
1077+ /*
1078+ * We might get here if the set of sync_standby_priority values in shared
1079+ * memory is inconsistent, as can happen transiently after a change in the
1080+ * synchronous_standby_names setting. In that case, just return the
1081+ * incomplete list we have so far. That will cause the caller to decide
1082+ * there aren't enough synchronous candidates, which should be a safe
1083+ * choice until the priority values become consistent again.
1084+ */
1085+ list_free (pending );
9491086return result ;
9501087}
9511088