|
103 | 103 | * until we reach either a notification from an uncommitted transaction or
|
104 | 104 | * the head pointer's position.
|
105 | 105 | *
|
106 |
| - * 6. To avoid SLRU wraparound and limit disk space consumption, the tail |
107 |
| - * pointer needs to be advanced so that old pages can be truncated. |
108 |
| - * This is relatively expensive (notably, it requires an exclusive lock), |
109 |
| - * so we don't want to do it often. We make sending backends do this work |
110 |
| - * if they advanced the queue head into a new page, but only once every |
111 |
| - * QUEUE_CLEANUP_DELAY pages. |
| 106 | + * 6. To limit disk space consumption, the tail pointer needs to be advanced |
| 107 | + * so that old pages can be truncated. This is relatively expensive |
| 108 | + * (notably, it requires an exclusive lock), so we don't want to do it |
| 109 | + * often. We make sending backends do this work if they advanced the queue |
| 110 | + * head into a new page, but only once every QUEUE_CLEANUP_DELAY pages. |
112 | 111 | *
|
113 | 112 | * An application that listens on the same channel it notifies will get
|
114 | 113 | * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
|
|
120 | 119 | * The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
|
121 | 120 | * can be varied without affecting anything but performance. The maximum
|
122 | 121 | * amount of notification data that can be queued at one time is determined
|
123 |
| - * byslru.c's wraparound limit; see QUEUE_MAX_PAGE below. |
| 122 | + * bymax_notify_queue_pages GUC. |
124 | 123 | *-------------------------------------------------------------------------
|
125 | 124 | */
|
126 | 125 |
|
@@ -312,23 +311,8 @@ static SlruCtlData NotifyCtlData;
|
312 | 311 |
|
313 | 312 | #defineNotifyCtl(&NotifyCtlData)
|
314 | 313 | #defineQUEUE_PAGESIZEBLCKSZ
|
315 |
| -#defineQUEUE_FULL_WARN_INTERVAL5000/* warn at most once every 5s */ |
316 | 314 |
|
317 |
| -/* |
318 |
| - * Use segments 0000 through FFFF. Each contains SLRU_PAGES_PER_SEGMENT pages |
319 |
| - * which gives us the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1. |
320 |
| - * We could use as many segments as SlruScanDirectory() allows, but this gives |
321 |
| - * us so much space already that it doesn't seem worth the trouble. |
322 |
| - * |
323 |
| - * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2 |
324 |
| - * pages, because more than that would confuse slru.c into thinking there |
325 |
| - * was a wraparound condition. With the default BLCKSZ this means there |
326 |
| - * can be up to 8GB of queued-and-not-read data. |
327 |
| - * |
328 |
| - * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of |
329 |
| - * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour. |
330 |
| - */ |
331 |
| -#defineQUEUE_MAX_PAGE(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1) |
| 315 | +#defineQUEUE_FULL_WARN_INTERVAL5000/* warn at most once every 5s */ |
332 | 316 |
|
333 | 317 | /*
|
334 | 318 | * listenChannels identifies the channels we are actually listening to
|
@@ -439,12 +423,15 @@ static bool amRegisteredListener = false;
|
439 | 423 | /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
|
440 | 424 | staticbooltryAdvanceTail= false;
|
441 | 425 |
|
442 |
| -/* GUCparameter */ |
| 426 | +/* GUCparameters */ |
443 | 427 | boolTrace_notify= false;
|
444 | 428 |
|
| 429 | +/* For 8 KB pages this gives 8 GB of disk space */ |
| 430 | +intmax_notify_queue_pages=1048576; |
| 431 | + |
445 | 432 | /* local function prototypes */
|
446 |
| -staticint64asyncQueuePageDiff(int64p,int64q); |
447 |
| -staticboolasyncQueuePagePrecedes(int64p,int64q); |
| 433 | +staticinlineint64asyncQueuePageDiff(int64p,int64q); |
| 434 | +staticinlineboolasyncQueuePagePrecedes(int64p,int64q); |
448 | 435 | staticvoidqueue_listen(ListenActionKindaction,constchar*channel);
|
449 | 436 | staticvoidAsync_UnlistenOnExit(intcode,Datumarg);
|
450 | 437 | staticvoidExec_ListenPreCommit(void);
|
@@ -474,39 +461,23 @@ static intnotification_match(const void *key1, const void *key2, Size keysize);
|
474 | 461 | staticvoidClearPendingActionsAndNotifies(void);
|
475 | 462 |
|
476 | 463 | /*
|
477 |
| - * Compute the difference between two queue page numbers (i.e., p - q), |
478 |
| - *accountingfor wraparound. |
| 464 | + * Compute the difference between two queue page numbers. |
| 465 | + *Previously this function accountedfor a wraparound. |
479 | 466 | */
|
480 |
| -staticint64 |
| 467 | +staticinlineint64 |
481 | 468 | asyncQueuePageDiff(int64p,int64q)
|
482 | 469 | {
|
483 |
| -int64diff; |
484 |
| - |
485 |
| -/* |
486 |
| - * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be |
487 |
| - * in the range 0..QUEUE_MAX_PAGE. |
488 |
| - */ |
489 |
| -Assert(p >=0&&p <=QUEUE_MAX_PAGE); |
490 |
| -Assert(q >=0&&q <=QUEUE_MAX_PAGE); |
491 |
| - |
492 |
| -diff=p-q; |
493 |
| -if (diff >= ((QUEUE_MAX_PAGE+1) /2)) |
494 |
| -diff-=QUEUE_MAX_PAGE+1; |
495 |
| -elseif (diff<-((QUEUE_MAX_PAGE+1) /2)) |
496 |
| -diff+=QUEUE_MAX_PAGE+1; |
497 |
| -returndiff; |
| 470 | +returnp-q; |
498 | 471 | }
|
499 | 472 |
|
500 | 473 | /*
|
501 |
| - * Is p < q, accounting for wraparound? |
502 |
| - * |
503 |
| - * Since asyncQueueIsFull() blocks creation of a page that could precede any |
504 |
| - * extant page, we need not assess entries within a page. |
| 474 | + * Determines whether p precedes q. |
| 475 | + * Previously this function accounted for a wraparound. |
505 | 476 | */
|
506 |
| -staticbool |
| 477 | +staticinlinebool |
507 | 478 | asyncQueuePagePrecedes(int64p,int64q)
|
508 | 479 | {
|
509 |
| -returnasyncQueuePageDiff(p,q)<0; |
| 480 | +returnp<q; |
510 | 481 | }
|
511 | 482 |
|
512 | 483 | /*
|
@@ -566,12 +537,13 @@ AsyncShmemInit(void)
|
566 | 537 | }
|
567 | 538 |
|
568 | 539 | /*
|
569 |
| - * Set up SLRU management of the pg_notify data. |
| 540 | + * Set up SLRU management of the pg_notify data. Note that long segment |
| 541 | + * names are used in order to avoid wraparound. |
570 | 542 | */
|
571 | 543 | NotifyCtl->PagePrecedes=asyncQueuePagePrecedes;
|
572 | 544 | SimpleLruInit(NotifyCtl,"Notify",NUM_NOTIFY_BUFFERS,0,
|
573 | 545 | NotifySLRULock,"pg_notify",LWTRANCHE_NOTIFY_BUFFER,
|
574 |
| -SYNC_HANDLER_NONE,false); |
| 546 | +SYNC_HANDLER_NONE,true); |
575 | 547 |
|
576 | 548 | if (!found)
|
577 | 549 | {
|
@@ -1305,27 +1277,11 @@ asyncQueueUnregister(void)
|
1305 | 1277 | staticbool
|
1306 | 1278 | asyncQueueIsFull(void)
|
1307 | 1279 | {
|
1308 |
| -intnexthead; |
1309 |
| -intboundary; |
| 1280 | +intheadPage=QUEUE_POS_PAGE(QUEUE_HEAD); |
| 1281 | +inttailPage=QUEUE_POS_PAGE(QUEUE_TAIL); |
| 1282 | +intoccupied=headPage-tailPage; |
1310 | 1283 |
|
1311 |
| -/* |
1312 |
| - * The queue is full if creating a new head page would create a page that |
1313 |
| - * logically precedes the current global tail pointer, ie, the head |
1314 |
| - * pointer would wrap around compared to the tail. We cannot create such |
1315 |
| - * a head page for fear of confusing slru.c. For safety we round the tail |
1316 |
| - * pointer back to a segment boundary (truncation logic in |
1317 |
| - * asyncQueueAdvanceTail does not do this, so doing it here is optional). |
1318 |
| - * |
1319 |
| - * Note that this test is *not* dependent on how much space there is on |
1320 |
| - * the current head page. This is necessary because asyncQueueAddEntries |
1321 |
| - * might try to create the next head page in any case. |
1322 |
| - */ |
1323 |
| -nexthead=QUEUE_POS_PAGE(QUEUE_HEAD)+1; |
1324 |
| -if (nexthead>QUEUE_MAX_PAGE) |
1325 |
| -nexthead=0;/* wrap around */ |
1326 |
| -boundary=QUEUE_STOP_PAGE; |
1327 |
| -boundary-=boundary %SLRU_PAGES_PER_SEGMENT; |
1328 |
| -returnasyncQueuePagePrecedes(nexthead,boundary); |
| 1284 | +returnoccupied >=max_notify_queue_pages; |
1329 | 1285 | }
|
1330 | 1286 |
|
1331 | 1287 | /*
|
@@ -1355,8 +1311,6 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
|
1355 | 1311 | if (offset+QUEUEALIGN(AsyncQueueEntryEmptySize)>QUEUE_PAGESIZE)
|
1356 | 1312 | {
|
1357 | 1313 | pageno++;
|
1358 |
| -if (pageno>QUEUE_MAX_PAGE) |
1359 |
| -pageno=0;/* wrap around */ |
1360 | 1314 | offset=0;
|
1361 | 1315 | pageJump= true;
|
1362 | 1316 | }
|
@@ -1433,9 +1387,6 @@ asyncQueueAddEntries(ListCell *nextNotify)
|
1433 | 1387 | * If this is the first write since the postmaster started, we need to
|
1434 | 1388 | * initialize the first page of the async SLRU. Otherwise, the current
|
1435 | 1389 | * page should be initialized already, so just fetch it.
|
1436 |
| - * |
1437 |
| - * (We could also take the first path when the SLRU position has just |
1438 |
| - * wrapped around, but re-zeroing the page is harmless in that case.) |
1439 | 1390 | */
|
1440 | 1391 | pageno=QUEUE_POS_PAGE(queue_head);
|
1441 | 1392 | if (QUEUE_POS_IS_ZERO(queue_head))
|
@@ -1548,20 +1499,12 @@ asyncQueueUsage(void)
|
1548 | 1499 | {
|
1549 | 1500 | intheadPage=QUEUE_POS_PAGE(QUEUE_HEAD);
|
1550 | 1501 | inttailPage=QUEUE_POS_PAGE(QUEUE_TAIL);
|
1551 |
| -intoccupied; |
1552 |
| - |
1553 |
| -occupied=headPage-tailPage; |
| 1502 | +intoccupied=headPage-tailPage; |
1554 | 1503 |
|
1555 | 1504 | if (occupied==0)
|
1556 | 1505 | return (double)0;/* fast exit for common case */
|
1557 | 1506 |
|
1558 |
| -if (occupied<0) |
1559 |
| -{ |
1560 |
| -/* head has wrapped around, tail not yet */ |
1561 |
| -occupied+=QUEUE_MAX_PAGE+1; |
1562 |
| -} |
1563 |
| - |
1564 |
| -return (double)occupied / (double) ((QUEUE_MAX_PAGE+1) /2); |
| 1507 | +return (double)occupied / (double)max_notify_queue_pages; |
1565 | 1508 | }
|
1566 | 1509 |
|
1567 | 1510 | /*
|
@@ -2209,11 +2152,6 @@ asyncQueueAdvanceTail(void)
|
2209 | 2152 | */
|
2210 | 2153 | SimpleLruTruncate(NotifyCtl,newtailpage);
|
2211 | 2154 |
|
2212 |
| -/* |
2213 |
| - * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict |
2214 |
| - * for the segment immediately prior to the old tail, allowing fresh |
2215 |
| - * data into that segment. |
2216 |
| - */ |
2217 | 2155 | LWLockAcquire(NotifyQueueLock,LW_EXCLUSIVE);
|
2218 | 2156 | QUEUE_STOP_PAGE=newtailpage;
|
2219 | 2157 | LWLockRelease(NotifyQueueLock);
|
|