Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1260541

Browse files
committed
Use dlists instead of SHM_QUEUE for syncrep queue
Part of a series to remove SHM_QUEUE. ilist.h style lists are more widely usedand have an easier to use interface.Reviewed-by: Thomas Munro <thomas.munro@gmail.com> (in an older version)Discussion:https://postgr.es/m/20221120055930.t6kl3tyivzhlrzu2@awork3.anarazel.deDiscussion:https://postgr.es/m/20200211042229.msv23badgqljrdg2@alap3.anarazel.de
1 parent5764f61 commit1260541

File tree

5 files changed

+41
-57
lines changed

5 files changed

+41
-57
lines changed

‎src/backend/replication/syncrep.c

Lines changed: 36 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
182182
else
183183
mode=Min(SyncRepWaitMode,SYNC_REP_WAIT_FLUSH);
184184

185-
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
185+
Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
186186
Assert(WalSndCtl!=NULL);
187187

188188
LWLockAcquire(SyncRepLock,LW_EXCLUSIVE);
@@ -318,7 +318,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
318318
* assertions, but better safe than sorry).
319319
*/
320320
pg_read_barrier();
321-
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
321+
Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
322322
MyProc->syncRepState=SYNC_REP_NOT_WAITING;
323323
MyProc->waitLSN=0;
324324

@@ -339,31 +339,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
339339
staticvoid
340340
SyncRepQueueInsert(intmode)
341341
{
342-
PGPROC*proc;
342+
dlist_head*queue;
343+
dlist_iteriter;
343344

344345
Assert(mode >=0&&mode<NUM_SYNC_REP_WAIT_MODE);
345-
proc= (PGPROC*)SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
346-
&(WalSndCtl->SyncRepQueue[mode]),
347-
offsetof(PGPROC,syncRepLinks));
346+
queue=&WalSndCtl->SyncRepQueue[mode];
348347

349-
while (proc)
348+
dlist_reverse_foreach(iter,queue)
350349
{
350+
PGPROC*proc=dlist_container(PGPROC,syncRepLinks,iter.cur);
351+
351352
/*
352-
* Stop at the queue element that we should after to ensure the queue
353-
* is ordered by LSN.
353+
* Stop at the queue element that we shouldinsertafter to ensure the
354+
*queueis ordered by LSN.
354355
*/
355356
if (proc->waitLSN<MyProc->waitLSN)
356-
break;
357-
358-
proc= (PGPROC*)SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
359-
&(proc->syncRepLinks),
360-
offsetof(PGPROC,syncRepLinks));
357+
{
358+
dlist_insert_after(&proc->syncRepLinks,&MyProc->syncRepLinks);
359+
return;
360+
}
361361
}
362362

363-
if (proc)
364-
SHMQueueInsertAfter(&(proc->syncRepLinks),&(MyProc->syncRepLinks));
365-
else
366-
SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]),&(MyProc->syncRepLinks));
363+
/*
364+
* If we get here, the list was either empty, or this process needs to be
365+
* at the head.
366+
*/
367+
dlist_push_head(queue,&MyProc->syncRepLinks);
367368
}
368369

369370
/*
@@ -373,8 +374,8 @@ static void
373374
SyncRepCancelWait(void)
374375
{
375376
LWLockAcquire(SyncRepLock,LW_EXCLUSIVE);
376-
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
377-
SHMQueueDelete(&(MyProc->syncRepLinks));
377+
if (!dlist_node_is_detached(&MyProc->syncRepLinks))
378+
dlist_delete_thoroughly(&MyProc->syncRepLinks);
378379
MyProc->syncRepState=SYNC_REP_NOT_WAITING;
379380
LWLockRelease(SyncRepLock);
380381
}
@@ -386,13 +387,13 @@ SyncRepCleanupAtProcExit(void)
386387
* First check if we are removed from the queue without the lock to not
387388
* slow down backend exit.
388389
*/
389-
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
390+
if (!dlist_node_is_detached(&MyProc->syncRepLinks))
390391
{
391392
LWLockAcquire(SyncRepLock,LW_EXCLUSIVE);
392393

393394
/* maybe we have just been removed, so recheck */
394-
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
395-
SHMQueueDelete(&(MyProc->syncRepLinks));
395+
if (!dlist_node_is_detached(&MyProc->syncRepLinks))
396+
dlist_delete_thoroughly(&MyProc->syncRepLinks);
396397

397398
LWLockRelease(SyncRepLock);
398399
}
@@ -879,39 +880,27 @@ static int
879880
SyncRepWakeQueue(boolall,intmode)
880881
{
881882
volatileWalSndCtlData*walsndctl=WalSndCtl;
882-
PGPROC*proc=NULL;
883-
PGPROC*thisproc=NULL;
884883
intnumprocs=0;
884+
dlist_mutable_iteriter;
885885

886886
Assert(mode >=0&&mode<NUM_SYNC_REP_WAIT_MODE);
887887
Assert(LWLockHeldByMeInMode(SyncRepLock,LW_EXCLUSIVE));
888888
Assert(SyncRepQueueIsOrderedByLSN(mode));
889889

890-
proc= (PGPROC*)SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
891-
&(WalSndCtl->SyncRepQueue[mode]),
892-
offsetof(PGPROC,syncRepLinks));
893-
894-
while (proc)
890+
dlist_foreach_modify(iter,&WalSndCtl->SyncRepQueue[mode])
895891
{
892+
PGPROC*proc=dlist_container(PGPROC,syncRepLinks,iter.cur);
893+
896894
/*
897895
* Assume the queue is ordered by LSN
898896
*/
899897
if (!all&&walsndctl->lsn[mode]<proc->waitLSN)
900898
returnnumprocs;
901899

902900
/*
903-
* Move to next proc, so we can delete thisproc from the queue.
904-
* thisproc is valid, proc may be NULL after this.
905-
*/
906-
thisproc=proc;
907-
proc= (PGPROC*)SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
908-
&(proc->syncRepLinks),
909-
offsetof(PGPROC,syncRepLinks));
910-
911-
/*
912-
* Remove thisproc from queue.
901+
* Remove from queue.
913902
*/
914-
SHMQueueDelete(&(thisproc->syncRepLinks));
903+
dlist_delete_thoroughly(&proc->syncRepLinks);
915904

916905
/*
917906
* SyncRepWaitForLSN() reads syncRepState without holding the lock, so
@@ -924,12 +913,12 @@ SyncRepWakeQueue(bool all, int mode)
924913
* Set state to complete; see SyncRepWaitForLSN() for discussion of
925914
* the various states.
926915
*/
927-
thisproc->syncRepState=SYNC_REP_WAIT_COMPLETE;
916+
proc->syncRepState=SYNC_REP_WAIT_COMPLETE;
928917

929918
/*
930919
* Wake only when we have set state and removed from queue.
931920
*/
932-
SetLatch(&(thisproc->procLatch));
921+
SetLatch(&(proc->procLatch));
933922

934923
numprocs++;
935924
}
@@ -983,19 +972,17 @@ SyncRepUpdateSyncStandbysDefined(void)
983972
staticbool
984973
SyncRepQueueIsOrderedByLSN(intmode)
985974
{
986-
PGPROC*proc=NULL;
987975
XLogRecPtrlastLSN;
976+
dlist_iteriter;
988977

989978
Assert(mode >=0&&mode<NUM_SYNC_REP_WAIT_MODE);
990979

991980
lastLSN=0;
992981

993-
proc= (PGPROC*)SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
994-
&(WalSndCtl->SyncRepQueue[mode]),
995-
offsetof(PGPROC,syncRepLinks));
996-
997-
while (proc)
982+
dlist_foreach(iter,&WalSndCtl->SyncRepQueue[mode])
998983
{
984+
PGPROC*proc=dlist_container(PGPROC,syncRepLinks,iter.cur);
985+
999986
/*
1000987
* Check the queue is ordered by LSN and that multiple procs don't
1001988
* have matching LSNs
@@ -1004,10 +991,6 @@ SyncRepQueueIsOrderedByLSN(int mode)
1004991
return false;
1005992

1006993
lastLSN=proc->waitLSN;
1007-
1008-
proc= (PGPROC*)SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
1009-
&(proc->syncRepLinks),
1010-
offsetof(PGPROC,syncRepLinks));
1011994
}
1012995

1013996
return true;

‎src/backend/replication/walsender.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3275,7 +3275,7 @@ WalSndShmemInit(void)
32753275
MemSet(WalSndCtl,0,WalSndShmemSize());
32763276

32773277
for (i=0;i<NUM_SYNC_REP_WAIT_MODE;i++)
3278-
SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3278+
dlist_init(&(WalSndCtl->SyncRepQueue[i]));
32793279

32803280
for (i=0;i<max_wal_senders;i++)
32813281
{

‎src/backend/storage/lmgr/proc.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ InitProcess(void)
410410
/* Initialize fields for sync rep */
411411
MyProc->waitLSN=0;
412412
MyProc->syncRepState=SYNC_REP_NOT_WAITING;
413-
SHMQueueElemInit(&(MyProc->syncRepLinks));
413+
dlist_node_init(&MyProc->syncRepLinks);
414414

415415
/* Initialize fields for group XID clearing. */
416416
MyProc->procArrayGroupMember= false;

‎src/include/replication/walsender_private.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#define_WALSENDER_PRIVATE_H
1414

1515
#include"access/xlog.h"
16+
#include"lib/ilist.h"
1617
#include"nodes/nodes.h"
1718
#include"replication/syncrep.h"
1819
#include"storage/latch.h"
@@ -89,7 +90,7 @@ typedef struct
8990
* Synchronous replication queue with one queue per request type.
9091
* Protected by SyncRepLock.
9192
*/
92-
SHM_QUEUESyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
93+
dlist_headSyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
9394

9495
/*
9596
* Current location of the head of the queue. All waiters should have a

‎src/include/storage/proc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ struct PGPROC
248248
*/
249249
XLogRecPtrwaitLSN;/* waiting for this LSN or higher */
250250
intsyncRepState;/* wait state for sync rep */
251-
SHM_QUEUEsyncRepLinks;/* list link if process is in syncrep queue */
251+
dlist_nodesyncRepLinks;/* list link if process is in syncrep queue */
252252

253253
/*
254254
* All PROCLOCK objects for locks held or awaited by this backend are

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp