Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb1332e9

Browse files
committed
Put the logic to decide which synchronous standby is active into a function.
This avoids duplicating the code.Michael Paquier, reviewed by Simon Riggs and me
1 parent7afc233 commitb1332e9

File tree

3 files changed

+79
-65
lines changed

3 files changed

+79
-65
lines changed

‎src/backend/replication/syncrep.c

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* Synchronous replication is new as of PostgreSQL 9.1.
66
*
77
* If requested, transaction commits wait until their commit LSN is
8-
* acknowledged by thesync standby.
8+
* acknowledged by thesynchronous standby.
99
*
1010
* This module contains the code for waiting and release of backends.
1111
* All code in this module executes on the primary. The core streaming
@@ -357,6 +357,60 @@ SyncRepInitConfig(void)
357357
}
358358
}
359359

360+
/*
361+
* Find the WAL sender servicing the synchronous standby with the lowest
362+
* priority value, or NULL if no synchronous standby is connected. If there
363+
* are multiple standbys with the same lowest priority value, the first one
364+
* found is selected. The caller must hold SyncRepLock.
365+
*/
366+
WalSnd*
367+
SyncRepGetSynchronousStandby(void)
368+
{
369+
WalSnd*result=NULL;
370+
intresult_priority=0;
371+
inti;
372+
373+
for (i=0;i<max_wal_senders;i++)
374+
{
375+
/* Use volatile pointer to prevent code rearrangement */
376+
volatileWalSnd*walsnd=&WalSndCtl->walsnds[i];
377+
intthis_priority;
378+
379+
/* Must be active */
380+
if (walsnd->pid==0)
381+
continue;
382+
383+
/* Must be streaming */
384+
if (walsnd->state!=WALSNDSTATE_STREAMING)
385+
continue;
386+
387+
/* Must be synchronous */
388+
this_priority=walsnd->sync_standby_priority;
389+
if (this_priority==0)
390+
continue;
391+
392+
/* Must have a lower priority value than any previous ones */
393+
if (result!=NULL&&result_priority <=this_priority)
394+
continue;
395+
396+
/* Must have a valid flush position */
397+
if (XLogRecPtrIsInvalid(walsnd->flush))
398+
continue;
399+
400+
result= (WalSnd*)walsnd;
401+
result_priority=this_priority;
402+
403+
/*
404+
* If priority is equal to 1, there cannot be any other WAL senders
405+
* with a lower priority, so we're done.
406+
*/
407+
if (this_priority==1)
408+
returnresult;
409+
}
410+
411+
returnresult;
412+
}
413+
360414
/*
361415
* Update the LSNs on each queue based upon our latest state. This
362416
* implements a simple policy of first-valid-standby-releases-waiter.
@@ -368,11 +422,9 @@ void
368422
SyncRepReleaseWaiters(void)
369423
{
370424
volatileWalSndCtlData*walsndctl=WalSndCtl;
371-
volatileWalSnd*syncWalSnd=NULL;
425+
WalSnd*syncWalSnd;
372426
intnumwrite=0;
373427
intnumflush=0;
374-
intpriority=0;
375-
inti;
376428

377429
/*
378430
* If this WALSender is serving a standby that is not on the list of
@@ -387,33 +439,13 @@ SyncRepReleaseWaiters(void)
387439

388440
/*
389441
* We're a potential sync standby. Release waiters if we are the highest
390-
* priority standby. If there are multiple standbys with same priorities
391-
* then we use the first mentioned standby. If you change this, also
392-
* change pg_stat_get_wal_senders().
442+
* priority standby.
393443
*/
394444
LWLockAcquire(SyncRepLock,LW_EXCLUSIVE);
445+
syncWalSnd=SyncRepGetSynchronousStandby();
395446

396-
for (i=0;i<max_wal_senders;i++)
397-
{
398-
/* use volatile pointer to prevent code rearrangement */
399-
volatileWalSnd*walsnd=&walsndctl->walsnds[i];
400-
401-
if (walsnd->pid!=0&&
402-
walsnd->state==WALSNDSTATE_STREAMING&&
403-
walsnd->sync_standby_priority>0&&
404-
(priority==0||
405-
priority>walsnd->sync_standby_priority)&&
406-
!XLogRecPtrIsInvalid(walsnd->flush))
407-
{
408-
priority=walsnd->sync_standby_priority;
409-
syncWalSnd=walsnd;
410-
}
411-
}
412-
413-
/*
414-
* We should have found ourselves at least.
415-
*/
416-
Assert(syncWalSnd);
447+
/* We should have found ourselves at least */
448+
Assert(syncWalSnd!=NULL);
417449

418450
/*
419451
* If we aren't managing the highest priority standby then just leave.

‎src/backend/replication/walsender.c

Lines changed: 15 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2741,9 +2741,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
27412741
Tuplestorestate*tupstore;
27422742
MemoryContextper_query_ctx;
27432743
MemoryContextoldcontext;
2744-
int*sync_priority;
2745-
intpriority=0;
2746-
intsync_standby=-1;
2744+
WalSnd*sync_standby;
27472745
inti;
27482746

27492747
/* check to see if caller supports us returning a tuplestore */
@@ -2772,38 +2770,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
27722770
MemoryContextSwitchTo(oldcontext);
27732771

27742772
/*
2775-
* Get the priorities of sync standbys all in one go, to minimise lock
2776-
* acquisitions and to allow us to evaluate who is the current sync
2777-
* standby. This code must match the code in SyncRepReleaseWaiters().
2773+
* Get the currently active synchronous standby.
27782774
*/
2779-
sync_priority=palloc(sizeof(int)*max_wal_senders);
27802775
LWLockAcquire(SyncRepLock,LW_SHARED);
2781-
for (i=0;i<max_wal_senders;i++)
2782-
{
2783-
/* use volatile pointer to prevent code rearrangement */
2784-
volatileWalSnd*walsnd=&WalSndCtl->walsnds[i];
2785-
2786-
if (walsnd->pid!=0)
2787-
{
2788-
/*
2789-
* Treat a standby such as a pg_basebackup background process
2790-
* which always returns an invalid flush location, as an
2791-
* asynchronous standby.
2792-
*/
2793-
sync_priority[i]=XLogRecPtrIsInvalid(walsnd->flush) ?
2794-
0 :walsnd->sync_standby_priority;
2795-
2796-
if (walsnd->state==WALSNDSTATE_STREAMING&&
2797-
walsnd->sync_standby_priority>0&&
2798-
(priority==0||
2799-
priority>walsnd->sync_standby_priority)&&
2800-
!XLogRecPtrIsInvalid(walsnd->flush))
2801-
{
2802-
priority=walsnd->sync_standby_priority;
2803-
sync_standby=i;
2804-
}
2805-
}
2806-
}
2776+
sync_standby=SyncRepGetSynchronousStandby();
28072777
LWLockRelease(SyncRepLock);
28082778

28092779
for (i=0;i<max_wal_senders;i++)
@@ -2814,6 +2784,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
28142784
XLogRecPtrwrite;
28152785
XLogRecPtrflush;
28162786
XLogRecPtrapply;
2787+
intpriority;
28172788
WalSndStatestate;
28182789
Datumvalues[PG_STAT_GET_WAL_SENDERS_COLS];
28192790
boolnulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2827,6 +2798,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
28272798
write=walsnd->write;
28282799
flush=walsnd->flush;
28292800
apply=walsnd->apply;
2801+
priority=walsnd->sync_standby_priority;
28302802
SpinLockRelease(&walsnd->mutex);
28312803

28322804
memset(nulls,0,sizeof(nulls));
@@ -2857,23 +2829,29 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
28572829
nulls[5]= true;
28582830
values[5]=LSNGetDatum(apply);
28592831

2860-
values[6]=Int32GetDatum(sync_priority[i]);
2832+
/*
2833+
* Treat a standby such as a pg_basebackup background process
2834+
* which always returns an invalid flush location, as an
2835+
* asynchronous standby.
2836+
*/
2837+
priority=XLogRecPtrIsInvalid(walsnd->flush) ?0 :priority;
2838+
2839+
values[6]=Int32GetDatum(priority);
28612840

28622841
/*
28632842
* More easily understood version of standby state. This is purely
28642843
* informational, not different from priority.
28652844
*/
2866-
if (sync_priority[i]==0)
2845+
if (priority==0)
28672846
values[7]=CStringGetTextDatum("async");
2868-
elseif (i==sync_standby)
2847+
elseif (walsnd==sync_standby)
28692848
values[7]=CStringGetTextDatum("sync");
28702849
else
28712850
values[7]=CStringGetTextDatum("potential");
28722851
}
28732852

28742853
tuplestore_putvalues(tupstore,tupdesc,values,nulls);
28752854
}
2876-
pfree(sync_priority);
28772855

28782856
/* clean up and return the tuplestore */
28792857
tuplestore_donestoring(tupstore);

‎src/include/replication/syncrep.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
5050
/* called by various procs */
5151
externintSyncRepWakeQueue(boolall,intmode);
5252

53+
/* forward declaration to avoid pulling in walsender_private.h */
54+
structWalSnd;
55+
externstructWalSnd*SyncRepGetSynchronousStandby(void);
56+
5357
externboolcheck_synchronous_standby_names(char**newval,void**extra,GucSourcesource);
5458
externvoidassign_synchronous_commit(intnewval,void*extra);
5559

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp