@@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
108
108
static void SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,
109
109
XLogRecPtr * flushPtr ,
110
110
XLogRecPtr * applyPtr ,
111
- List * sync_standbys );
111
+ SyncRepStandbyData * sync_standbys ,
112
+ int num_standbys );
112
113
static void SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,
113
114
XLogRecPtr * flushPtr ,
114
115
XLogRecPtr * applyPtr ,
115
- List * sync_standbys ,uint8 nth );
116
+ SyncRepStandbyData * sync_standbys ,
117
+ int num_standbys ,
118
+ uint8 nth );
116
119
static int SyncRepGetStandbyPriority (void );
117
120
static List * SyncRepGetSyncStandbysPriority (bool * am_sync );
118
121
static List * SyncRepGetSyncStandbysQuorum (bool * am_sync );
122
+ static int standby_priority_comparator (const void * a ,const void * b );
119
123
static int cmp_lsn (const void * a ,const void * b );
120
124
121
125
#ifdef USE_ASSERT_CHECKING
@@ -398,9 +402,10 @@ SyncRepInitConfig(void)
398
402
priority = SyncRepGetStandbyPriority ();
399
403
if (MyWalSnd -> sync_standby_priority != priority )
400
404
{
401
- LWLockAcquire ( SyncRepLock , LW_EXCLUSIVE );
405
+ SpinLockAcquire ( & MyWalSnd -> mutex );
402
406
MyWalSnd -> sync_standby_priority = priority ;
403
- LWLockRelease (SyncRepLock );
407
+ SpinLockRelease (& MyWalSnd -> mutex );
408
+
404
409
ereport (DEBUG1 ,
405
410
(errmsg ("standby \"%s\" now has synchronous standby priority %u" ,
406
411
application_name ,priority )));
@@ -451,7 +456,11 @@ SyncRepReleaseWaiters(void)
451
456
452
457
/*
453
458
* Check whether we are a sync standby or not, and calculate the synced
454
- * positions among all sync standbys.
459
+ * positions among all sync standbys. (Note: although this step does not
460
+ * of itself require holding SyncRepLock, it seems like a good idea to do
461
+ * it after acquiring the lock. This ensures that the WAL pointers we use
462
+ * to release waiters are newer than any previous execution of this
463
+ * routine used.)
455
464
*/
456
465
got_recptr = SyncRepGetSyncRecPtr (& writePtr ,& flushPtr ,& applyPtr ,& am_sync );
457
466
@@ -526,25 +535,41 @@ static bool
526
535
SyncRepGetSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
527
536
XLogRecPtr * applyPtr ,bool * am_sync )
528
537
{
529
- List * sync_standbys ;
538
+ SyncRepStandbyData * sync_standbys ;
539
+ int num_standbys ;
540
+ int i ;
530
541
542
+ /* Initialize default results */
531
543
* writePtr = InvalidXLogRecPtr ;
532
544
* flushPtr = InvalidXLogRecPtr ;
533
545
* applyPtr = InvalidXLogRecPtr ;
534
546
* am_sync = false;
535
547
548
+ /* Quick out if not even configured to be synchronous */
549
+ if (SyncRepConfig == NULL )
550
+ return false;
551
+
536
552
/* Get standbys that are considered as synchronous at this moment */
537
- sync_standbys = SyncRepGetSyncStandbys (am_sync );
553
+ num_standbys = SyncRepGetCandidateStandbys (& sync_standbys );
554
+
555
+ /* Am I among the candidate sync standbys? */
556
+ for (i = 0 ;i < num_standbys ;i ++ )
557
+ {
558
+ if (sync_standbys [i ].is_me )
559
+ {
560
+ * am_sync = true;
561
+ break ;
562
+ }
563
+ }
538
564
539
565
/*
540
- *Quick exit if we are not managing a sync standby or there are not
541
- * enough synchronous standbys.
566
+ *Nothing more to do if we are not managing a sync standby or there are
567
+ *not enough synchronous standbys.
542
568
*/
543
569
if (!(* am_sync )||
544
- SyncRepConfig == NULL ||
545
- list_length (sync_standbys )< SyncRepConfig -> num_sync )
570
+ num_standbys < SyncRepConfig -> num_sync )
546
571
{
547
- list_free (sync_standbys );
572
+ pfree (sync_standbys );
548
573
return false;
549
574
}
550
575
@@ -564,43 +589,41 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
564
589
if (SyncRepConfig -> syncrep_method == SYNC_REP_PRIORITY )
565
590
{
566
591
SyncRepGetOldestSyncRecPtr (writePtr ,flushPtr ,applyPtr ,
567
- sync_standbys );
592
+ sync_standbys , num_standbys );
568
593
}
569
594
else
570
595
{
571
596
SyncRepGetNthLatestSyncRecPtr (writePtr ,flushPtr ,applyPtr ,
572
- sync_standbys ,SyncRepConfig -> num_sync );
597
+ sync_standbys ,num_standbys ,
598
+ SyncRepConfig -> num_sync );
573
599
}
574
600
575
- list_free (sync_standbys );
601
+ pfree (sync_standbys );
576
602
return true;
577
603
}
578
604
579
605
/*
580
606
* Calculate the oldest Write, Flush and Apply positions among sync standbys.
581
607
*/
582
608
static void
583
- SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
584
- XLogRecPtr * applyPtr ,List * sync_standbys )
609
+ SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,
610
+ XLogRecPtr * flushPtr ,
611
+ XLogRecPtr * applyPtr ,
612
+ SyncRepStandbyData * sync_standbys ,
613
+ int num_standbys )
585
614
{
586
- ListCell * cell ;
615
+ int i ;
587
616
588
617
/*
589
618
* Scan through all sync standbys and calculate the oldest Write, Flush
590
- * and Apply positions.
619
+ * and Apply positions. We assume *writePtr et al were initialized to
620
+ * InvalidXLogRecPtr.
591
621
*/
592
- foreach ( cell , sync_standbys )
622
+ for ( i = 0 ; i < num_standbys ; i ++ )
593
623
{
594
- WalSnd * walsnd = & WalSndCtl -> walsnds [lfirst_int (cell )];
595
- XLogRecPtr write ;
596
- XLogRecPtr flush ;
597
- XLogRecPtr apply ;
598
-
599
- SpinLockAcquire (& walsnd -> mutex );
600
- write = walsnd -> write ;
601
- flush = walsnd -> flush ;
602
- apply = walsnd -> apply ;
603
- SpinLockRelease (& walsnd -> mutex );
624
+ XLogRecPtr write = sync_standbys [i ].write ;
625
+ XLogRecPtr flush = sync_standbys [i ].flush ;
626
+ XLogRecPtr apply = sync_standbys [i ].apply ;
604
627
605
628
if (XLogRecPtrIsInvalid (* writePtr )|| * writePtr > write )
606
629
* writePtr = write ;
@@ -616,38 +639,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
616
639
* standbys.
617
640
*/
618
641
static void
619
- SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
620
- XLogRecPtr * applyPtr ,List * sync_standbys ,uint8 nth )
642
+ SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,
643
+ XLogRecPtr * flushPtr ,
644
+ XLogRecPtr * applyPtr ,
645
+ SyncRepStandbyData * sync_standbys ,
646
+ int num_standbys ,
647
+ uint8 nth )
621
648
{
622
- ListCell * cell ;
623
649
XLogRecPtr * write_array ;
624
650
XLogRecPtr * flush_array ;
625
651
XLogRecPtr * apply_array ;
626
- int len ;
627
- int i = 0 ;
652
+ int i ;
628
653
629
- len = list_length (sync_standbys );
630
- write_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
631
- flush_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
632
- apply_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
654
+ /* Should have enough candidates, or somebody messed up */
655
+ Assert (nth > 0 && nth <=num_standbys );
633
656
634
- foreach ( cell , sync_standbys )
635
- {
636
- WalSnd * walsnd = & WalSndCtl -> walsnds [ lfirst_int ( cell )] ;
657
+ write_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * num_standbys );
658
+ flush_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * num_standbys );
659
+ apply_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * num_standbys ) ;
637
660
638
- SpinLockAcquire (& walsnd -> mutex );
639
- write_array [i ]= walsnd -> write ;
640
- flush_array [i ]= walsnd -> flush ;
641
- apply_array [i ]= walsnd -> apply ;
642
- SpinLockRelease (& walsnd -> mutex );
643
-
644
- i ++ ;
661
+ for (i = 0 ;i < num_standbys ;i ++ )
662
+ {
663
+ write_array [i ]= sync_standbys [i ].write ;
664
+ flush_array [i ]= sync_standbys [i ].flush ;
665
+ apply_array [i ]= sync_standbys [i ].apply ;
645
666
}
646
667
647
668
/* Sort each array in descending order */
648
- qsort (write_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
649
- qsort (flush_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
650
- qsort (apply_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
669
+ qsort (write_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
670
+ qsort (flush_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
671
+ qsort (apply_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
651
672
652
673
/* Get Nth latest Write, Flush, Apply positions */
653
674
* writePtr = write_array [nth - 1 ];
@@ -676,13 +697,122 @@ cmp_lsn(const void *a, const void *b)
676
697
return 1 ;
677
698
}
678
699
700
+ /*
701
+ * Return data about walsenders that are candidates to be sync standbys.
702
+ *
703
+ * *standbys is set to a palloc'd array of structs of per-walsender data,
704
+ * and the number of valid entries (candidate sync senders) is returned.
705
+ * (This might be more or fewer than num_sync; caller must check.)
706
+ */
707
+ int
708
+ SyncRepGetCandidateStandbys (SyncRepStandbyData * * standbys )
709
+ {
710
+ int i ;
711
+ int n ;
712
+
713
+ /* Create result array */
714
+ * standbys = (SyncRepStandbyData * )
715
+ palloc (max_wal_senders * sizeof (SyncRepStandbyData ));
716
+
717
+ /* Quick exit if sync replication is not requested */
718
+ if (SyncRepConfig == NULL )
719
+ return 0 ;
720
+
721
+ /* Collect raw data from shared memory */
722
+ n = 0 ;
723
+ for (i = 0 ;i < max_wal_senders ;i ++ )
724
+ {
725
+ volatile WalSnd * walsnd ;/* Use volatile pointer to prevent code
726
+ * rearrangement */
727
+ SyncRepStandbyData * stby ;
728
+ WalSndState state ;/* not included in SyncRepStandbyData */
729
+
730
+ walsnd = & WalSndCtl -> walsnds [i ];
731
+ stby = * standbys + n ;
732
+
733
+ SpinLockAcquire (& walsnd -> mutex );
734
+ stby -> pid = walsnd -> pid ;
735
+ state = walsnd -> state ;
736
+ stby -> write = walsnd -> write ;
737
+ stby -> flush = walsnd -> flush ;
738
+ stby -> apply = walsnd -> apply ;
739
+ stby -> sync_standby_priority = walsnd -> sync_standby_priority ;
740
+ SpinLockRelease (& walsnd -> mutex );
741
+
742
+ /* Must be active */
743
+ if (stby -> pid == 0 )
744
+ continue ;
745
+
746
+ /* Must be streaming or stopping */
747
+ if (state != WALSNDSTATE_STREAMING &&
748
+ state != WALSNDSTATE_STOPPING )
749
+ continue ;
750
+
751
+ /* Must be synchronous */
752
+ if (stby -> sync_standby_priority == 0 )
753
+ continue ;
754
+
755
+ /* Must have a valid flush position */
756
+ if (XLogRecPtrIsInvalid (stby -> flush ))
757
+ continue ;
758
+
759
+ /* OK, it's a candidate */
760
+ stby -> walsnd_index = i ;
761
+ stby -> is_me = (walsnd == MyWalSnd );
762
+ n ++ ;
763
+ }
764
+
765
+ /*
766
+ * In quorum mode, we return all the candidates. In priority mode, if we
767
+ * have too many candidates then return only the num_sync ones of highest
768
+ * priority.
769
+ */
770
+ if (SyncRepConfig -> syncrep_method == SYNC_REP_PRIORITY &&
771
+ n > SyncRepConfig -> num_sync )
772
+ {
773
+ /* Sort by priority ... */
774
+ qsort (* standbys ,n ,sizeof (SyncRepStandbyData ),
775
+ standby_priority_comparator );
776
+ /* ... then report just the first num_sync ones */
777
+ n = SyncRepConfig -> num_sync ;
778
+ }
779
+
780
+ return n ;
781
+ }
782
+
783
+ /*
784
+ * qsort comparator to sort SyncRepStandbyData entries by priority
785
+ */
786
+ static int
787
+ standby_priority_comparator (const void * a ,const void * b )
788
+ {
789
+ const SyncRepStandbyData * sa = (const SyncRepStandbyData * )a ;
790
+ const SyncRepStandbyData * sb = (const SyncRepStandbyData * )b ;
791
+
792
+ /* First, sort by increasing priority value */
793
+ if (sa -> sync_standby_priority != sb -> sync_standby_priority )
794
+ return sa -> sync_standby_priority - sb -> sync_standby_priority ;
795
+
796
+ /*
797
+ * We might have equal priority values; arbitrarily break ties by position
798
+ * in the WALSnd array. (This is utterly bogus, since that is arrival
799
+ * order dependent, but there are regression tests that rely on it.)
800
+ */
801
+ return sa -> walsnd_index - sb -> walsnd_index ;
802
+ }
803
+
804
+
679
805
/*
680
806
* Return the list of sync standbys, or NIL if no sync standby is connected.
681
807
*
682
808
* The caller must hold SyncRepLock.
683
809
*
684
810
* On return, *am_sync is set to true if this walsender is connected to a
685
811
* sync standby. Otherwise it's set to false.
812
+ *
813
+ * XXX This function is BROKEN and should not be used in new code. It has
814
+ * an inherent race condition, since the returned list of integer indexes
815
+ * might no longer correspond to reality.
686
816
*/
687
817
List *
688
818
SyncRepGetSyncStandbys (bool * am_sync )
@@ -942,8 +1072,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
942
1072
priority = next_highest_priority ;
943
1073
}
944
1074
945
- /* never reached, but keep compiler quiet */
946
- Assert (false);
1075
+ /*
1076
+ * We might get here if the set of sync_standby_priority values in shared
1077
+ * memory is inconsistent, as can happen transiently after a change in the
1078
+ * synchronous_standby_names setting. In that case, just return the
1079
+ * incomplete list we have so far. That will cause the caller to decide
1080
+ * there aren't enough synchronous candidates, which should be a safe
1081
+ * choice until the priority values become consistent again.
1082
+ */
1083
+ list_free (pending );
947
1084
return result ;
948
1085
}
949
1086