1717#include "access/gin_private.h"
1818#include "access/xloginsert.h"
1919#include "miscadmin.h"
20+ #include "utils/memutils.h"
2021#include "utils/rel.h"
2122
2223static void ginFindParents (GinBtree btree ,GinBtreeStack * stack );
@@ -310,27 +311,45 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
310311 * Insert a new item to a page.
311312 *
312313 * Returns true if the insertion was finished. On false, the page was split and
313- * the parent needs to be updated. (a root split returns true as it doesn't
314- * need any further action by the caller to complete)
314+ * the parent needs to be updated. (A root split returns true as it doesn't
315+ * need any further action by the caller to complete.)
315316 *
316317 * When inserting a downlink to an internal page, 'childbuf' contains the
317318 * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
318- * atomically with the insert. Also, the existing item at the given location
319- * is updated to point to 'updateblkno'.
319+ * atomically with the insert. Also, the existing item at offset stack->off
320+ * in the target page is updated to point to updateblkno.
320321 *
321322 * stack->buffer is locked on entry, and is kept locked.
323+ * Likewise for childbuf, if given.
322324 */
323325static bool
324326ginPlaceToPage (GinBtree btree ,GinBtreeStack * stack ,
325327void * insertdata ,BlockNumber updateblkno ,
326328Buffer childbuf ,GinStatsData * buildStats )
327329{
328330Page page = BufferGetPage (stack -> buffer );
331+ bool result ;
329332GinPlaceToPageRC rc ;
330333uint16 xlflags = 0 ;
331334Page childpage = NULL ;
332335Page newlpage = NULL ,
333336newrpage = NULL ;
337+ void * ptp_workspace = NULL ;
338+ MemoryContext tmpCxt ;
339+ MemoryContext oldCxt ;
340+
341+ /*
342+ * We do all the work of this function and its subfunctions in a temporary
343+ * memory context. This avoids leakages and simplifies APIs, since some
344+ * subfunctions allocate storage that has to survive until we've finished
345+ * the WAL insertion.
346+ */
347+ tmpCxt = AllocSetContextCreate (CurrentMemoryContext ,
348+ "ginPlaceToPage temporary context" ,
349+ ALLOCSET_DEFAULT_MINSIZE ,
350+ ALLOCSET_DEFAULT_INITSIZE ,
351+ ALLOCSET_DEFAULT_MAXSIZE );
352+ oldCxt = MemoryContextSwitchTo (tmpCxt );
334353
335354if (GinPageIsData (page ))
336355xlflags |=GIN_INSERT_ISDATA ;
@@ -348,40 +367,42 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
348367}
349368
350369/*
351- * Try to put the incoming tuple on the page. placeToPage will decide if
352- * the page needs to be split.
353- *
354- * WAL-logging this operation is a bit funny:
355- *
356- * We're responsible for calling XLogBeginInsert() and XLogInsert().
357- * XLogBeginInsert() must be called before placeToPage, because
358- * placeToPage can register some data to the WAL record.
359- *
360- * If placeToPage returns INSERTED, placeToPage has already called
361- * START_CRIT_SECTION() and XLogBeginInsert(), and registered any data
362- * required to replay the operation, in block index 0. We're responsible
363- * for filling in the main data portion of the WAL record, calling
364- * XLogInsert(), and END_CRIT_SECTION.
365- *
366- * If placeToPage returns SPLIT, we're wholly responsible for WAL logging.
367- * Splits happen infrequently, so we just make a full-page image of all
368- * the pages involved.
370+ * See if the incoming tuple will fit on the page. beginPlaceToPage will
371+ * decide if the page needs to be split, and will compute the split
372+ * contents if so. See comments for beginPlaceToPage and execPlaceToPage
373+ * functions for more details of the API here.
369374 */
370- rc = btree -> placeToPage (btree ,stack -> buffer ,stack ,
371- insertdata ,updateblkno ,
372- & newlpage ,& newrpage );
373- if (rc == UNMODIFIED )
375+ rc = btree -> beginPlaceToPage (btree ,stack -> buffer ,stack ,
376+ insertdata ,updateblkno ,
377+ & ptp_workspace ,
378+ & newlpage ,& newrpage );
379+
380+ if (rc == GPTP_NO_WORK )
374381{
375- XLogResetInsertion ();
376- return true;
382+ /* Nothing to do */
383+ result = true;
377384}
378- else if (rc == INSERTED )
385+ else if (rc == GPTP_INSERT )
379386{
380- /* placeToPage did START_CRIT_SECTION() */
387+ /* It will fit, perform the insertion */
388+ START_CRIT_SECTION ();
389+
390+ if (RelationNeedsWAL (btree -> index ))
391+ {
392+ XLogBeginInsert ();
393+ XLogRegisterBuffer (0 ,stack -> buffer ,REGBUF_STANDARD );
394+ if (BufferIsValid (childbuf ))
395+ XLogRegisterBuffer (1 ,childbuf ,REGBUF_STANDARD );
396+ }
397+
398+ /* Perform the page update, and register any extra WAL data */
399+ btree -> execPlaceToPage (btree ,stack -> buffer ,stack ,
400+ insertdata ,updateblkno ,ptp_workspace );
401+
381402MarkBufferDirty (stack -> buffer );
382403
383404/* An insert to an internal page finishes the split of the child. */
384- if (childbuf != InvalidBuffer )
405+ if (BufferIsValid ( childbuf ) )
385406{
386407GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
387408MarkBufferDirty (childbuf );
@@ -393,21 +414,15 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
393414ginxlogInsert xlrec ;
394415BlockIdData childblknos [2 ];
395416
396- /*
397- * placetopage already registered stack->buffer as block 0.
398- */
399417xlrec .flags = xlflags ;
400418
401- if (childbuf != InvalidBuffer )
402- XLogRegisterBuffer (1 ,childbuf ,REGBUF_STANDARD );
403-
404419XLogRegisterData ((char * )& xlrec ,sizeof (ginxlogInsert ));
405420
406421/*
407422 * Log information about child if this was an insertion of a
408423 * downlink.
409424 */
410- if (childbuf != InvalidBuffer )
425+ if (BufferIsValid ( childbuf ) )
411426{
412427BlockIdSet (& childblknos [0 ],BufferGetBlockNumber (childbuf ));
413428BlockIdSet (& childblknos [1 ],GinPageGetOpaque (childpage )-> rightlink );
@@ -417,23 +432,29 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
417432
418433recptr = XLogInsert (RM_GIN_ID ,XLOG_GIN_INSERT );
419434PageSetLSN (page ,recptr );
420- if (childbuf != InvalidBuffer )
435+ if (BufferIsValid ( childbuf ) )
421436PageSetLSN (childpage ,recptr );
422437}
423438
424439END_CRIT_SECTION ();
425440
426- return true;
441+ /* Insertion is complete. */
442+ result = true;
427443}
428- else if (rc == SPLIT )
444+ else if (rc == GPTP_SPLIT )
429445{
430- /* Didn't fit, had to split */
446+ /*
447+ * Didn't fit, need to split. The split has been computed in newlpage
448+ * and newrpage, which are pointers to palloc'd pages, not associated
449+ * with buffers. stack->buffer is not touched yet.
450+ */
431451Buffer rbuffer ;
432452BlockNumber savedRightLink ;
433453ginxlogSplit data ;
434454Buffer lbuffer = InvalidBuffer ;
435455Page newrootpg = NULL ;
436456
457+ /* Get a new index page to become the right page */
437458rbuffer = GinNewBuffer (btree -> index );
438459
439460/* During index build, count the new page */
@@ -447,19 +468,11 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
447468
448469savedRightLink = GinPageGetOpaque (page )-> rightlink ;
449470
450- /*
451- * newlpage and newrpage are pointers to memory pages, not associated
452- * with buffers. stack->buffer is not touched yet.
453- */
454-
471+ /* Begin setting up WAL record */
455472data .node = btree -> index -> rd_node ;
456473data .flags = xlflags ;
457- if (childbuf != InvalidBuffer )
474+ if (BufferIsValid ( childbuf ) )
458475{
459- Page childpage = BufferGetPage (childbuf );
460-
461- GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
462-
463476data .leftChildBlkno = BufferGetBlockNumber (childbuf );
464477data .rightChildBlkno = GinPageGetOpaque (childpage )-> rightlink ;
465478}
@@ -469,12 +482,12 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
469482if (stack -> parent == NULL )
470483{
471484/*
472- * split root, so we need to allocate new left page and place
473- * pointer on root to left and right page
485+ * splitting the root, so we need to allocate new left page and
486+ * place pointers to left and right page on root page.
474487 */
475488lbuffer = GinNewBuffer (btree -> index );
476489
477- /* During index build, count the newly-added root page */
490+ /* During index build, count the new left page */
478491if (buildStats )
479492{
480493if (btree -> isData )
@@ -491,9 +504,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
491504
492505/*
493506 * Construct a new root page containing downlinks to the new left
494- * and right pages. (do this in a temporary copy first rather than
495- * overwriting the original page directly, so that we can still
496- * abort gracefully if this fails.)
507+ * and right pages. (Do this in a temporary copy rather than
508+ * overwriting the original page directly, since we're not in the
509+ * critical section yet.)
497510 */
498511newrootpg = PageGetTempPage (newrpage );
499512GinInitPage (newrootpg ,GinPageGetOpaque (newlpage )-> flags & ~(GIN_LEAF |GIN_COMPRESSED ),BLCKSZ );
@@ -504,7 +517,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
504517}
505518else
506519{
507- /* split non-root page */
520+ /* splitting a non-root page */
508521data .rrlink = savedRightLink ;
509522
510523GinPageGetOpaque (newrpage )-> rightlink = savedRightLink ;
@@ -513,41 +526,44 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
513526}
514527
515528/*
516- * Ok, we have the new contents of the left page in a temporary copy
517- * now (newlpage), and the newly-allocated right block has been filled
518- * in. The original page is still unchanged.
529+ * OK, we have the new contents of the left page in a temporary copy
530+ * now (newlpage), and likewise for the new contents of the
531+ * newly-allocated right block. The original page is still unchanged.
519532 *
520533 * If this is a root split, we also have a temporary page containing
521- * the new contents of the root. Copy the new left page to a
522- * newly-allocated block, and initialize the (original) root page the
523- * new copy. Otherwise, copy over the temporary copy of the new left
524- * page over the old left page.
534+ * the new contents of the root.
525535 */
526536
527537START_CRIT_SECTION ();
528538
529539MarkBufferDirty (rbuffer );
530540MarkBufferDirty (stack -> buffer );
531- if (BufferIsValid (childbuf ))
532- MarkBufferDirty (childbuf );
533541
534542/*
535- * Restore the temporary copies over the real buffers. But don't free
536- * the temporary copies yet, WAL record data points to them.
543+ * Restore the temporary copies over the real buffers.
537544 */
538545if (stack -> parent == NULL )
539546{
547+ /* Splitting the root, three pages to update */
540548MarkBufferDirty (lbuffer );
541- memcpy (BufferGetPage ( stack -> buffer ) ,newrootpg ,BLCKSZ );
549+ memcpy (page ,newrootpg ,BLCKSZ );
542550memcpy (BufferGetPage (lbuffer ),newlpage ,BLCKSZ );
543551memcpy (BufferGetPage (rbuffer ),newrpage ,BLCKSZ );
544552}
545553else
546554{
547- memcpy (BufferGetPage (stack -> buffer ),newlpage ,BLCKSZ );
555+ /* Normal split, only two pages to update */
556+ memcpy (page ,newlpage ,BLCKSZ );
548557memcpy (BufferGetPage (rbuffer ),newrpage ,BLCKSZ );
549558}
550559
560+ /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
561+ if (BufferIsValid (childbuf ))
562+ {
563+ GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
564+ MarkBufferDirty (childbuf );
565+ }
566+
551567/* write WAL record */
552568if (RelationNeedsWAL (btree -> index ))
553569{
@@ -572,12 +588,13 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
572588XLogRegisterBuffer (1 ,rbuffer ,REGBUF_FORCE_IMAGE |REGBUF_STANDARD );
573589}
574590if (BufferIsValid (childbuf ))
575- XLogRegisterBuffer (3 ,childbuf ,0 );
591+ XLogRegisterBuffer (3 ,childbuf ,REGBUF_STANDARD );
576592
577593XLogRegisterData ((char * )& data ,sizeof (ginxlogSplit ));
578594
579595recptr = XLogInsert (RM_GIN_ID ,XLOG_GIN_SPLIT );
580- PageSetLSN (BufferGetPage (stack -> buffer ),recptr );
596+
597+ PageSetLSN (page ,recptr );
581598PageSetLSN (BufferGetPage (rbuffer ),recptr );
582599if (stack -> parent == NULL )
583600PageSetLSN (BufferGetPage (lbuffer ),recptr );
@@ -587,33 +604,31 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
587604END_CRIT_SECTION ();
588605
589606/*
590- * We can release the lock on the right page now, but keep the
591- * original buffer locked.
607+ * We can release the locks/pins on the new pages now, but keep
608+ * stack->buffer locked. childbuf doesn't get unlocked either.
592609 */
593610UnlockReleaseBuffer (rbuffer );
594611if (stack -> parent == NULL )
595612UnlockReleaseBuffer (lbuffer );
596613
597- pfree (newlpage );
598- pfree (newrpage );
599- if (newrootpg )
600- pfree (newrootpg );
601-
602614/*
603615 * If we split the root, we're done. Otherwise the split is not
604616 * complete until the downlink for the new page has been inserted to
605617 * the parent.
606618 */
607- if (stack -> parent == NULL )
608- return true;
609- else
610- return false;
619+ result = (stack -> parent == NULL );
611620}
612621else
613622{
614- elog (ERROR ,"unknown return code from GIN placeToPage method: %d" ,rc );
615- return false;/* keep compiler quiet */
623+ elog (ERROR ,"invalid return code from GIN placeToPage method: %d" ,rc );
624+ result = false;/* keep compiler quiet */
616625}
626+
627+ /* Clean up temp context */
628+ MemoryContextSwitchTo (oldCxt );
629+ MemoryContextDelete (tmpCxt );
630+
631+ return result ;
617632}
618633
619634/*