@@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
108
108
static void SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,
109
109
XLogRecPtr * flushPtr ,
110
110
XLogRecPtr * applyPtr ,
111
- List * sync_standbys );
111
+ SyncRepStandbyData * sync_standbys ,
112
+ int num_standbys );
112
113
static void SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,
113
114
XLogRecPtr * flushPtr ,
114
115
XLogRecPtr * applyPtr ,
115
- List * sync_standbys ,uint8 nth );
116
+ SyncRepStandbyData * sync_standbys ,
117
+ int num_standbys ,
118
+ uint8 nth );
116
119
static int SyncRepGetStandbyPriority (void );
117
120
static List * SyncRepGetSyncStandbysPriority (bool * am_sync );
118
121
static List * SyncRepGetSyncStandbysQuorum (bool * am_sync );
122
+ static int standby_priority_comparator (const void * a ,const void * b );
119
123
static int cmp_lsn (const void * a ,const void * b );
120
124
121
125
#ifdef USE_ASSERT_CHECKING
@@ -400,9 +404,10 @@ SyncRepInitConfig(void)
400
404
priority = SyncRepGetStandbyPriority ();
401
405
if (MyWalSnd -> sync_standby_priority != priority )
402
406
{
403
- LWLockAcquire ( SyncRepLock , LW_EXCLUSIVE );
407
+ SpinLockAcquire ( & MyWalSnd -> mutex );
404
408
MyWalSnd -> sync_standby_priority = priority ;
405
- LWLockRelease (SyncRepLock );
409
+ SpinLockRelease (& MyWalSnd -> mutex );
410
+
406
411
ereport (DEBUG1 ,
407
412
(errmsg ("standby \"%s\" now has synchronous standby priority %u" ,
408
413
application_name ,priority )));
@@ -453,7 +458,11 @@ SyncRepReleaseWaiters(void)
453
458
454
459
/*
455
460
* Check whether we are a sync standby or not, and calculate the synced
456
- * positions among all sync standbys.
461
+ * positions among all sync standbys. (Note: although this step does not
462
+ * of itself require holding SyncRepLock, it seems like a good idea to do
463
+ * it after acquiring the lock. This ensures that the WAL pointers we use
464
+ * to release waiters are newer than any previous execution of this
465
+ * routine used.)
457
466
*/
458
467
got_recptr = SyncRepGetSyncRecPtr (& writePtr ,& flushPtr ,& applyPtr ,& am_sync );
459
468
@@ -528,25 +537,41 @@ static bool
528
537
SyncRepGetSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
529
538
XLogRecPtr * applyPtr ,bool * am_sync )
530
539
{
531
- List * sync_standbys ;
540
+ SyncRepStandbyData * sync_standbys ;
541
+ int num_standbys ;
542
+ int i ;
532
543
544
+ /* Initialize default results */
533
545
* writePtr = InvalidXLogRecPtr ;
534
546
* flushPtr = InvalidXLogRecPtr ;
535
547
* applyPtr = InvalidXLogRecPtr ;
536
548
* am_sync = false;
537
549
550
+ /* Quick out if not even configured to be synchronous */
551
+ if (SyncRepConfig == NULL )
552
+ return false;
553
+
538
554
/* Get standbys that are considered as synchronous at this moment */
539
- sync_standbys = SyncRepGetSyncStandbys (am_sync );
555
+ num_standbys = SyncRepGetCandidateStandbys (& sync_standbys );
556
+
557
+ /* Am I among the candidate sync standbys? */
558
+ for (i = 0 ;i < num_standbys ;i ++ )
559
+ {
560
+ if (sync_standbys [i ].is_me )
561
+ {
562
+ * am_sync = true;
563
+ break ;
564
+ }
565
+ }
540
566
541
567
/*
542
- *Quick exit if we are not managing a sync standby or there are not
543
- * enough synchronous standbys.
568
+ *Nothing more to do if we are not managing a sync standby or there are
569
+ *not enough synchronous standbys.
544
570
*/
545
571
if (!(* am_sync )||
546
- SyncRepConfig == NULL ||
547
- list_length (sync_standbys )< SyncRepConfig -> num_sync )
572
+ num_standbys < SyncRepConfig -> num_sync )
548
573
{
549
- list_free (sync_standbys );
574
+ pfree (sync_standbys );
550
575
return false;
551
576
}
552
577
@@ -566,43 +591,41 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
566
591
if (SyncRepConfig -> syncrep_method == SYNC_REP_PRIORITY )
567
592
{
568
593
SyncRepGetOldestSyncRecPtr (writePtr ,flushPtr ,applyPtr ,
569
- sync_standbys );
594
+ sync_standbys , num_standbys );
570
595
}
571
596
else
572
597
{
573
598
SyncRepGetNthLatestSyncRecPtr (writePtr ,flushPtr ,applyPtr ,
574
- sync_standbys ,SyncRepConfig -> num_sync );
599
+ sync_standbys ,num_standbys ,
600
+ SyncRepConfig -> num_sync );
575
601
}
576
602
577
- list_free (sync_standbys );
603
+ pfree (sync_standbys );
578
604
return true;
579
605
}
580
606
581
607
/*
582
608
* Calculate the oldest Write, Flush and Apply positions among sync standbys.
583
609
*/
584
610
static void
585
- SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
586
- XLogRecPtr * applyPtr ,List * sync_standbys )
611
+ SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,
612
+ XLogRecPtr * flushPtr ,
613
+ XLogRecPtr * applyPtr ,
614
+ SyncRepStandbyData * sync_standbys ,
615
+ int num_standbys )
587
616
{
588
- ListCell * cell ;
617
+ int i ;
589
618
590
619
/*
591
620
* Scan through all sync standbys and calculate the oldest Write, Flush
592
- * and Apply positions.
621
+ * and Apply positions. We assume *writePtr et al were initialized to
622
+ * InvalidXLogRecPtr.
593
623
*/
594
- foreach ( cell , sync_standbys )
624
+ for ( i = 0 ; i < num_standbys ; i ++ )
595
625
{
596
- WalSnd * walsnd = & WalSndCtl -> walsnds [lfirst_int (cell )];
597
- XLogRecPtr write ;
598
- XLogRecPtr flush ;
599
- XLogRecPtr apply ;
600
-
601
- SpinLockAcquire (& walsnd -> mutex );
602
- write = walsnd -> write ;
603
- flush = walsnd -> flush ;
604
- apply = walsnd -> apply ;
605
- SpinLockRelease (& walsnd -> mutex );
626
+ XLogRecPtr write = sync_standbys [i ].write ;
627
+ XLogRecPtr flush = sync_standbys [i ].flush ;
628
+ XLogRecPtr apply = sync_standbys [i ].apply ;
606
629
607
630
if (XLogRecPtrIsInvalid (* writePtr )|| * writePtr > write )
608
631
* writePtr = write ;
@@ -618,38 +641,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
618
641
* standbys.
619
642
*/
620
643
static void
621
- SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,XLogRecPtr * flushPtr ,
622
- XLogRecPtr * applyPtr ,List * sync_standbys ,uint8 nth )
644
+ SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,
645
+ XLogRecPtr * flushPtr ,
646
+ XLogRecPtr * applyPtr ,
647
+ SyncRepStandbyData * sync_standbys ,
648
+ int num_standbys ,
649
+ uint8 nth )
623
650
{
624
- ListCell * cell ;
625
651
XLogRecPtr * write_array ;
626
652
XLogRecPtr * flush_array ;
627
653
XLogRecPtr * apply_array ;
628
- int len ;
629
- int i = 0 ;
654
+ int i ;
630
655
631
- len = list_length (sync_standbys );
632
- write_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
633
- flush_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
634
- apply_array = (XLogRecPtr * )palloc (sizeof (XLogRecPtr )* len );
656
+ /* Should have enough candidates, or somebody messed up */
657
+ Assert (nth > 0 && nth <=num_standbys );
635
658
636
- foreach ( cell , sync_standbys )
637
- {
638
- WalSnd * walsnd = & WalSndCtl -> walsnds [ lfirst_int ( cell )] ;
659
+ write_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * num_standbys );
660
+ flush_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * num_standbys );
661
+ apply_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * num_standbys ) ;
639
662
640
- SpinLockAcquire (& walsnd -> mutex );
641
- write_array [i ]= walsnd -> write ;
642
- flush_array [i ]= walsnd -> flush ;
643
- apply_array [i ]= walsnd -> apply ;
644
- SpinLockRelease (& walsnd -> mutex );
645
-
646
- i ++ ;
663
+ for (i = 0 ;i < num_standbys ;i ++ )
664
+ {
665
+ write_array [i ]= sync_standbys [i ].write ;
666
+ flush_array [i ]= sync_standbys [i ].flush ;
667
+ apply_array [i ]= sync_standbys [i ].apply ;
647
668
}
648
669
649
670
/* Sort each array in descending order */
650
- qsort (write_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
651
- qsort (flush_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
652
- qsort (apply_array ,len ,sizeof (XLogRecPtr ),cmp_lsn );
671
+ qsort (write_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
672
+ qsort (flush_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
673
+ qsort (apply_array ,num_standbys ,sizeof (XLogRecPtr ),cmp_lsn );
653
674
654
675
/* Get Nth latest Write, Flush, Apply positions */
655
676
* writePtr = write_array [nth - 1 ];
@@ -678,13 +699,122 @@ cmp_lsn(const void *a, const void *b)
678
699
return 1 ;
679
700
}
680
701
702
+ /*
703
+ * Return data about walsenders that are candidates to be sync standbys.
704
+ *
705
+ * *standbys is set to a palloc'd array of structs of per-walsender data,
706
+ * and the number of valid entries (candidate sync senders) is returned.
707
+ * (This might be more or fewer than num_sync; caller must check.)
708
+ */
709
+ int
710
+ SyncRepGetCandidateStandbys (SyncRepStandbyData * * standbys )
711
+ {
712
+ int i ;
713
+ int n ;
714
+
715
+ /* Create result array */
716
+ * standbys = (SyncRepStandbyData * )
717
+ palloc (max_wal_senders * sizeof (SyncRepStandbyData ));
718
+
719
+ /* Quick exit if sync replication is not requested */
720
+ if (SyncRepConfig == NULL )
721
+ return 0 ;
722
+
723
+ /* Collect raw data from shared memory */
724
+ n = 0 ;
725
+ for (i = 0 ;i < max_wal_senders ;i ++ )
726
+ {
727
+ volatile WalSnd * walsnd ;/* Use volatile pointer to prevent code
728
+ * rearrangement */
729
+ SyncRepStandbyData * stby ;
730
+ WalSndState state ;/* not included in SyncRepStandbyData */
731
+
732
+ walsnd = & WalSndCtl -> walsnds [i ];
733
+ stby = * standbys + n ;
734
+
735
+ SpinLockAcquire (& walsnd -> mutex );
736
+ stby -> pid = walsnd -> pid ;
737
+ state = walsnd -> state ;
738
+ stby -> write = walsnd -> write ;
739
+ stby -> flush = walsnd -> flush ;
740
+ stby -> apply = walsnd -> apply ;
741
+ stby -> sync_standby_priority = walsnd -> sync_standby_priority ;
742
+ SpinLockRelease (& walsnd -> mutex );
743
+
744
+ /* Must be active */
745
+ if (stby -> pid == 0 )
746
+ continue ;
747
+
748
+ /* Must be streaming or stopping */
749
+ if (state != WALSNDSTATE_STREAMING &&
750
+ state != WALSNDSTATE_STOPPING )
751
+ continue ;
752
+
753
+ /* Must be synchronous */
754
+ if (stby -> sync_standby_priority == 0 )
755
+ continue ;
756
+
757
+ /* Must have a valid flush position */
758
+ if (XLogRecPtrIsInvalid (stby -> flush ))
759
+ continue ;
760
+
761
+ /* OK, it's a candidate */
762
+ stby -> walsnd_index = i ;
763
+ stby -> is_me = (walsnd == MyWalSnd );
764
+ n ++ ;
765
+ }
766
+
767
+ /*
768
+ * In quorum mode, we return all the candidates. In priority mode, if we
769
+ * have too many candidates then return only the num_sync ones of highest
770
+ * priority.
771
+ */
772
+ if (SyncRepConfig -> syncrep_method == SYNC_REP_PRIORITY &&
773
+ n > SyncRepConfig -> num_sync )
774
+ {
775
+ /* Sort by priority ... */
776
+ qsort (* standbys ,n ,sizeof (SyncRepStandbyData ),
777
+ standby_priority_comparator );
778
+ /* ... then report just the first num_sync ones */
779
+ n = SyncRepConfig -> num_sync ;
780
+ }
781
+
782
+ return n ;
783
+ }
784
+
785
+ /*
786
+ * qsort comparator to sort SyncRepStandbyData entries by priority
787
+ */
788
+ static int
789
+ standby_priority_comparator (const void * a ,const void * b )
790
+ {
791
+ const SyncRepStandbyData * sa = (const SyncRepStandbyData * )a ;
792
+ const SyncRepStandbyData * sb = (const SyncRepStandbyData * )b ;
793
+
794
+ /* First, sort by increasing priority value */
795
+ if (sa -> sync_standby_priority != sb -> sync_standby_priority )
796
+ return sa -> sync_standby_priority - sb -> sync_standby_priority ;
797
+
798
+ /*
799
+ * We might have equal priority values; arbitrarily break ties by position
800
+ * in the WALSnd array. (This is utterly bogus, since that is arrival
801
+ * order dependent, but there are regression tests that rely on it.)
802
+ */
803
+ return sa -> walsnd_index - sb -> walsnd_index ;
804
+ }
805
+
806
+
681
807
/*
682
808
* Return the list of sync standbys, or NIL if no sync standby is connected.
683
809
*
684
810
* The caller must hold SyncRepLock.
685
811
*
686
812
* On return, *am_sync is set to true if this walsender is connecting to
687
813
* sync standby. Otherwise it's set to false.
814
+ *
815
+ * XXX This function is BROKEN and should not be used in new code. It has
816
+ * an inherent race condition, since the returned list of integer indexes
817
+ * might no longer correspond to reality.
688
818
*/
689
819
List *
690
820
SyncRepGetSyncStandbys (bool * am_sync )
@@ -944,8 +1074,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
944
1074
priority = next_highest_priority ;
945
1075
}
946
1076
947
- /* never reached, but keep compiler quiet */
948
- Assert (false);
1077
+ /*
1078
+ * We might get here if the set of sync_standby_priority values in shared
1079
+ * memory is inconsistent, as can happen transiently after a change in the
1080
+ * synchronous_standby_names setting. In that case, just return the
1081
+ * incomplete list we have so far. That will cause the caller to decide
1082
+ * there aren't enough synchronous candidates, which should be a safe
1083
+ * choice until the priority values become consistent again.
1084
+ */
1085
+ list_free (pending );
949
1086
return result ;
950
1087
}
951
1088