17
17
#include "access/gin_private.h"
18
18
#include "access/xloginsert.h"
19
19
#include "miscadmin.h"
20
+ #include "utils/memutils.h"
20
21
#include "utils/rel.h"
21
22
22
23
static void ginFindParents (GinBtree btree ,GinBtreeStack * stack );
@@ -310,27 +311,45 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
310
311
* Insert a new item to a page.
311
312
*
312
313
* Returns true if the insertion was finished. On false, the page was split and
313
- * the parent needs to be updated. (a root split returns true as it doesn't
314
- * need any further action by the caller to complete)
314
+ * the parent needs to be updated. (A root split returns true as it doesn't
315
+ * need any further action by the caller to complete.)
315
316
*
316
317
* When inserting a downlink to an internal page, 'childbuf' contains the
317
318
* child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
318
- * atomically with the insert. Also, the existing item at the given location
319
- * is updated to point to 'updateblkno'.
319
+ * atomically with the insert. Also, the existing item at offset stack->off
320
+ * in the target page is updated to point to updateblkno.
320
321
*
321
322
* stack->buffer is locked on entry, and is kept locked.
323
+ * Likewise for childbuf, if given.
322
324
*/
323
325
static bool
324
326
ginPlaceToPage (GinBtree btree ,GinBtreeStack * stack ,
325
327
void * insertdata ,BlockNumber updateblkno ,
326
328
Buffer childbuf ,GinStatsData * buildStats )
327
329
{
328
330
Page page = BufferGetPage (stack -> buffer );
331
+ bool result ;
329
332
GinPlaceToPageRC rc ;
330
333
uint16 xlflags = 0 ;
331
334
Page childpage = NULL ;
332
335
Page newlpage = NULL ,
333
336
newrpage = NULL ;
337
+ void * ptp_workspace = NULL ;
338
+ MemoryContext tmpCxt ;
339
+ MemoryContext oldCxt ;
340
+
341
+ /*
342
+ * We do all the work of this function and its subfunctions in a temporary
343
+ * memory context. This avoids leakages and simplifies APIs, since some
344
+ * subfunctions allocate storage that has to survive until we've finished
345
+ * the WAL insertion.
346
+ */
347
+ tmpCxt = AllocSetContextCreate (CurrentMemoryContext ,
348
+ "ginPlaceToPage temporary context" ,
349
+ ALLOCSET_DEFAULT_MINSIZE ,
350
+ ALLOCSET_DEFAULT_INITSIZE ,
351
+ ALLOCSET_DEFAULT_MAXSIZE );
352
+ oldCxt = MemoryContextSwitchTo (tmpCxt );
334
353
335
354
if (GinPageIsData (page ))
336
355
xlflags |=GIN_INSERT_ISDATA ;
@@ -348,40 +367,42 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
348
367
}
349
368
350
369
/*
351
- * Try to put the incoming tuple on the page. placeToPage will decide if
352
- * the page needs to be split.
353
- *
354
- * WAL-logging this operation is a bit funny:
355
- *
356
- * We're responsible for calling XLogBeginInsert() and XLogInsert().
357
- * XLogBeginInsert() must be called before placeToPage, because
358
- * placeToPage can register some data to the WAL record.
359
- *
360
- * If placeToPage returns INSERTED, placeToPage has already called
361
- * START_CRIT_SECTION() and XLogBeginInsert(), and registered any data
362
- * required to replay the operation, in block index 0. We're responsible
363
- * for filling in the main data portion of the WAL record, calling
364
- * XLogInsert(), and END_CRIT_SECTION.
365
- *
366
- * If placeToPage returns SPLIT, we're wholly responsible for WAL logging.
367
- * Splits happen infrequently, so we just make a full-page image of all
368
- * the pages involved.
370
+ * See if the incoming tuple will fit on the page. beginPlaceToPage will
371
+ * decide if the page needs to be split, and will compute the split
372
+ * contents if so. See comments for beginPlaceToPage and execPlaceToPage
373
+ * functions for more details of the API here.
369
374
*/
370
- rc = btree -> placeToPage (btree ,stack -> buffer ,stack ,
371
- insertdata ,updateblkno ,
372
- & newlpage ,& newrpage );
373
- if (rc == UNMODIFIED )
375
+ rc = btree -> beginPlaceToPage (btree ,stack -> buffer ,stack ,
376
+ insertdata ,updateblkno ,
377
+ & ptp_workspace ,
378
+ & newlpage ,& newrpage );
379
+
380
+ if (rc == GPTP_NO_WORK )
374
381
{
375
- XLogResetInsertion ();
376
- return true;
382
+ /* Nothing to do */
383
+ result = true;
377
384
}
378
- else if (rc == INSERTED )
385
+ else if (rc == GPTP_INSERT )
379
386
{
380
- /* placeToPage did START_CRIT_SECTION() */
387
+ /* It will fit, perform the insertion */
388
+ START_CRIT_SECTION ();
389
+
390
+ if (RelationNeedsWAL (btree -> index ))
391
+ {
392
+ XLogBeginInsert ();
393
+ XLogRegisterBuffer (0 ,stack -> buffer ,REGBUF_STANDARD );
394
+ if (BufferIsValid (childbuf ))
395
+ XLogRegisterBuffer (1 ,childbuf ,REGBUF_STANDARD );
396
+ }
397
+
398
+ /* Perform the page update, and register any extra WAL data */
399
+ btree -> execPlaceToPage (btree ,stack -> buffer ,stack ,
400
+ insertdata ,updateblkno ,ptp_workspace );
401
+
381
402
MarkBufferDirty (stack -> buffer );
382
403
383
404
/* An insert to an internal page finishes the split of the child. */
384
- if (childbuf != InvalidBuffer )
405
+ if (BufferIsValid ( childbuf ) )
385
406
{
386
407
GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
387
408
MarkBufferDirty (childbuf );
@@ -393,21 +414,15 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
393
414
ginxlogInsert xlrec ;
394
415
BlockIdData childblknos [2 ];
395
416
396
- /*
397
- * placetopage already registered stack->buffer as block 0.
398
- */
399
417
xlrec .flags = xlflags ;
400
418
401
- if (childbuf != InvalidBuffer )
402
- XLogRegisterBuffer (1 ,childbuf ,REGBUF_STANDARD );
403
-
404
419
XLogRegisterData ((char * )& xlrec ,sizeof (ginxlogInsert ));
405
420
406
421
/*
407
422
* Log information about child if this was an insertion of a
408
423
* downlink.
409
424
*/
410
- if (childbuf != InvalidBuffer )
425
+ if (BufferIsValid ( childbuf ) )
411
426
{
412
427
BlockIdSet (& childblknos [0 ],BufferGetBlockNumber (childbuf ));
413
428
BlockIdSet (& childblknos [1 ],GinPageGetOpaque (childpage )-> rightlink );
@@ -417,23 +432,29 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
417
432
418
433
recptr = XLogInsert (RM_GIN_ID ,XLOG_GIN_INSERT );
419
434
PageSetLSN (page ,recptr );
420
- if (childbuf != InvalidBuffer )
435
+ if (BufferIsValid ( childbuf ) )
421
436
PageSetLSN (childpage ,recptr );
422
437
}
423
438
424
439
END_CRIT_SECTION ();
425
440
426
- return true;
441
+ /* Insertion is complete. */
442
+ result = true;
427
443
}
428
- else if (rc == SPLIT )
444
+ else if (rc == GPTP_SPLIT )
429
445
{
430
- /* Didn't fit, had to split */
446
+ /*
447
+ * Didn't fit, need to split. The split has been computed in newlpage
448
+ * and newrpage, which are pointers to palloc'd pages, not associated
449
+ * with buffers. stack->buffer is not touched yet.
450
+ */
431
451
Buffer rbuffer ;
432
452
BlockNumber savedRightLink ;
433
453
ginxlogSplit data ;
434
454
Buffer lbuffer = InvalidBuffer ;
435
455
Page newrootpg = NULL ;
436
456
457
+ /* Get a new index page to become the right page */
437
458
rbuffer = GinNewBuffer (btree -> index );
438
459
439
460
/* During index build, count the new page */
@@ -447,19 +468,11 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
447
468
448
469
savedRightLink = GinPageGetOpaque (page )-> rightlink ;
449
470
450
- /*
451
- * newlpage and newrpage are pointers to memory pages, not associated
452
- * with buffers. stack->buffer is not touched yet.
453
- */
454
-
471
+ /* Begin setting up WAL record */
455
472
data .node = btree -> index -> rd_node ;
456
473
data .flags = xlflags ;
457
- if (childbuf != InvalidBuffer )
474
+ if (BufferIsValid ( childbuf ) )
458
475
{
459
- Page childpage = BufferGetPage (childbuf );
460
-
461
- GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
462
-
463
476
data .leftChildBlkno = BufferGetBlockNumber (childbuf );
464
477
data .rightChildBlkno = GinPageGetOpaque (childpage )-> rightlink ;
465
478
}
@@ -469,12 +482,12 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
469
482
if (stack -> parent == NULL )
470
483
{
471
484
/*
472
- * split root, so we need to allocate new left page and place
473
- * pointer on root to left and right page
485
+ * splitting the root, so we need to allocate new left page and
486
+ * place pointers to left and right page on root page.
474
487
*/
475
488
lbuffer = GinNewBuffer (btree -> index );
476
489
477
- /* During index build, count the newly-added root page */
490
+ /* During index build, count the new left page */
478
491
if (buildStats )
479
492
{
480
493
if (btree -> isData )
@@ -491,9 +504,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
491
504
492
505
/*
493
506
* Construct a new root page containing downlinks to the new left
494
- * and right pages. (do this in a temporary copy first rather than
495
- * overwriting the original page directly, so that we can still
496
- * abort gracefully if this fails.)
507
+ * and right pages. (Do this in a temporary copy rather than
508
+ * overwriting the original page directly, since we're not in the
509
+ * critical section yet.)
497
510
*/
498
511
newrootpg = PageGetTempPage (newrpage );
499
512
GinInitPage (newrootpg ,GinPageGetOpaque (newlpage )-> flags & ~(GIN_LEAF |GIN_COMPRESSED ),BLCKSZ );
@@ -504,7 +517,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
504
517
}
505
518
else
506
519
{
507
- /* split non-root page */
520
+ /* splitting a non-root page */
508
521
data .rrlink = savedRightLink ;
509
522
510
523
GinPageGetOpaque (newrpage )-> rightlink = savedRightLink ;
@@ -513,41 +526,44 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
513
526
}
514
527
515
528
/*
516
- * Ok, we have the new contents of the left page in a temporary copy
517
- * now (newlpage), and the newly-allocated right block has been filled
518
- * in. The original page is still unchanged.
529
+ * OK, we have the new contents of the left page in a temporary copy
530
+ * now (newlpage), and likewise for the new contents of the
531
+ * newly-allocated right block. The original page is still unchanged.
519
532
*
520
533
* If this is a root split, we also have a temporary page containing
521
- * the new contents of the root. Copy the new left page to a
522
- * newly-allocated block, and initialize the (original) root page the
523
- * new copy. Otherwise, copy over the temporary copy of the new left
524
- * page over the old left page.
534
+ * the new contents of the root.
525
535
*/
526
536
527
537
START_CRIT_SECTION ();
528
538
529
539
MarkBufferDirty (rbuffer );
530
540
MarkBufferDirty (stack -> buffer );
531
- if (BufferIsValid (childbuf ))
532
- MarkBufferDirty (childbuf );
533
541
534
542
/*
535
- * Restore the temporary copies over the real buffers. But don't free
536
- * the temporary copies yet, WAL record data points to them.
543
+ * Restore the temporary copies over the real buffers.
537
544
*/
538
545
if (stack -> parent == NULL )
539
546
{
547
+ /* Splitting the root, three pages to update */
540
548
MarkBufferDirty (lbuffer );
541
- memcpy (BufferGetPage ( stack -> buffer ) ,newrootpg ,BLCKSZ );
549
+ memcpy (page ,newrootpg ,BLCKSZ );
542
550
memcpy (BufferGetPage (lbuffer ),newlpage ,BLCKSZ );
543
551
memcpy (BufferGetPage (rbuffer ),newrpage ,BLCKSZ );
544
552
}
545
553
else
546
554
{
547
- memcpy (BufferGetPage (stack -> buffer ),newlpage ,BLCKSZ );
555
+ /* Normal split, only two pages to update */
556
+ memcpy (page ,newlpage ,BLCKSZ );
548
557
memcpy (BufferGetPage (rbuffer ),newrpage ,BLCKSZ );
549
558
}
550
559
560
+ /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
561
+ if (BufferIsValid (childbuf ))
562
+ {
563
+ GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
564
+ MarkBufferDirty (childbuf );
565
+ }
566
+
551
567
/* write WAL record */
552
568
if (RelationNeedsWAL (btree -> index ))
553
569
{
@@ -572,12 +588,13 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
572
588
XLogRegisterBuffer (1 ,rbuffer ,REGBUF_FORCE_IMAGE |REGBUF_STANDARD );
573
589
}
574
590
if (BufferIsValid (childbuf ))
575
- XLogRegisterBuffer (3 ,childbuf ,0 );
591
+ XLogRegisterBuffer (3 ,childbuf ,REGBUF_STANDARD );
576
592
577
593
XLogRegisterData ((char * )& data ,sizeof (ginxlogSplit ));
578
594
579
595
recptr = XLogInsert (RM_GIN_ID ,XLOG_GIN_SPLIT );
580
- PageSetLSN (BufferGetPage (stack -> buffer ),recptr );
596
+
597
+ PageSetLSN (page ,recptr );
581
598
PageSetLSN (BufferGetPage (rbuffer ),recptr );
582
599
if (stack -> parent == NULL )
583
600
PageSetLSN (BufferGetPage (lbuffer ),recptr );
@@ -587,33 +604,31 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
587
604
END_CRIT_SECTION ();
588
605
589
606
/*
590
- * We can release the lock on the right page now, but keep the
591
- * original buffer locked.
607
+ * We can release the locks/pins on the new pages now, but keep
608
+ * stack->buffer locked. childbuf doesn't get unlocked either.
592
609
*/
593
610
UnlockReleaseBuffer (rbuffer );
594
611
if (stack -> parent == NULL )
595
612
UnlockReleaseBuffer (lbuffer );
596
613
597
- pfree (newlpage );
598
- pfree (newrpage );
599
- if (newrootpg )
600
- pfree (newrootpg );
601
-
602
614
/*
603
615
* If we split the root, we're done. Otherwise the split is not
604
616
* complete until the downlink for the new page has been inserted to
605
617
* the parent.
606
618
*/
607
- if (stack -> parent == NULL )
608
- return true;
609
- else
610
- return false;
619
+ result = (stack -> parent == NULL );
611
620
}
612
621
else
613
622
{
614
- elog (ERROR ,"unknown return code from GIN placeToPage method: %d" ,rc );
615
- return false;/* keep compiler quiet */
623
+ elog (ERROR ,"invalid return code from GIN placeToPage method: %d" ,rc );
624
+ result = false;/* keep compiler quiet */
616
625
}
626
+
627
+ /* Clean up temp context */
628
+ MemoryContextSwitchTo (oldCxt );
629
+ MemoryContextDelete (tmpCxt );
630
+
631
+ return result ;
617
632
}
618
633
619
634
/*