88 * Portions Copyright (c) 1994, Regents of the University of California
99 *
1010 * IDENTIFICATION
11- * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.5 2005/06/28 15:51:00 teodor Exp $
11+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.6 2005/06/30 17:52:14 teodor Exp $
1212 *-------------------------------------------------------------------------
1313 */
1414#include "postgres.h"
@@ -44,6 +44,7 @@ typedef struct {
4444
4545typedef struct gistIncompleteInsert {
4646RelFileNode node ;
47+ BlockNumber origblkno ;/* for splits */
4748ItemPointerData key ;
4849int lenblk ;
4950BlockNumber * blkno ;
@@ -79,6 +80,7 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
7980ninsert -> lenblk = lenblk ;
8081ninsert -> blkno = (BlockNumber * )palloc (sizeof (BlockNumber )* ninsert -> lenblk );
8182memcpy (ninsert -> blkno ,blkno ,sizeof (BlockNumber )* ninsert -> lenblk );
83+ ninsert -> origblkno = * blkno ;
8284}else {
8385int i ;
8486
@@ -87,6 +89,7 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
8789ninsert -> blkno = (BlockNumber * )palloc (sizeof (BlockNumber )* ninsert -> lenblk );
8890for (i = 0 ;i < ninsert -> lenblk ;i ++ )
8991ninsert -> blkno [i ]= xlinfo -> page [i ].header -> blkno ;
92+ ninsert -> origblkno = xlinfo -> data -> origblkno ;
9093}
9194Assert (ninsert -> lenblk > 0 );
9295
@@ -209,6 +212,7 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
209212
210213PageSetLSN (page ,lsn );
211214PageSetTLI (page ,ThisTimeLineID );
215+ GistPageGetOpaque (page )-> rightlink = InvalidBlockNumber ;
212216LockBuffer (buffer ,BUFFER_LOCK_UNLOCK );
213217WriteBuffer (buffer );
214218
@@ -466,81 +470,98 @@ gist_form_invalid_tuple(BlockNumber blkno) {
466470return tuple ;
467471}
468472
473+ static Buffer
474+ gistXLogReadAndLockBuffer (Relation r ,BlockNumber blkno ) {
475+ Buffer buffer = XLogReadBuffer ( false,r ,blkno );
476+ if (!BufferIsValid (buffer ))
477+ elog (PANIC ,"gistXLogReadAndLockBuffer: block %u unfound" ,blkno );
478+ if (PageIsNew ( (PageHeader )(BufferGetPage (buffer )) ) )
479+ elog (PANIC ,"gistXLogReadAndLockBuffer: uninitialized page %u" ,blkno );
480+
481+ return buffer ;
482+ }
483+
484+
469485static void
470486gixtxlogFindPath (Relation index ,gistIncompleteInsert * insert ) {
471- int i ;
472487GISTInsertStack * top ;
473488
474489insert -> pathlen = 0 ;
475490insert -> path = NULL ;
476491
477- for ( i = 0 ; insert -> lenblk ; i ++ ) {
478- if ( ( top = gistFindPath ( index , insert -> blkno [ i ], XLogReadBuffer )) != NULL ) {
479- GISTInsertStack * ptr = top ;
480- while (ptr ) {
481- insert -> pathlen ++ ;
482- ptr = ptr -> parent ;
483- }
492+ if ( ( top = gistFindPath ( index , insert -> origblkno , gistXLogReadAndLockBuffer )) != NULL ) {
493+ int i ;
494+ GISTInsertStack * ptr = top ;
495+ while (ptr ) {
496+ insert -> pathlen ++ ;
497+ ptr = ptr -> parent ;
498+ }
484499
485- insert -> path = (BlockNumber * )palloc (sizeof (BlockNumber )* insert -> pathlen );
500+ insert -> path = (BlockNumber * )palloc (sizeof (BlockNumber )* insert -> pathlen );
486501
487- i = 0 ;
488- ptr = top ;
489- while (ptr ) {
490- insert -> path [i ]= ptr -> blkno ;
491- i ++ ;
492- ptr = ptr -> parent ;
493- }
494- break ;
502+ i = 0 ;
503+ ptr = top ;
504+ while (ptr ) {
505+ insert -> path [i ]= ptr -> blkno ;
506+ i ++ ;
507+ ptr = ptr -> parent ;
495508}
496- }
509+ }else
510+ elog (LOG ,"gixtxlogFindPath: lost parent for block %u" ,insert -> origblkno );
497511}
498512
499513static void
500514gistContinueInsert (gistIncompleteInsert * insert ) {
501515IndexTuple * itup ;
502516int i ,lenitup ;
503- MemoryContext oldCxt ;
504517Relation index ;
505518
506- oldCxt = MemoryContextSwitchTo (opCtx );
507-
508519index = XLogOpenRelation (insert -> node );
509- if (!RelationIsValid (index ))
520+ if (!RelationIsValid (index ))
510521return ;
511522
512- elog (LOG ,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index" ,
513- insert -> node .spcNode ,insert -> node .dbNode ,insert -> node .relNode );
514-
515523/* needed vector itup never will be more than initial lenblkno+2,
516524 because during this processing Indextuple can be only smaller */
517525lenitup = insert -> lenblk ;
518526itup = (IndexTuple * )palloc (sizeof (IndexTuple )* (lenitup + 2 /*guarantee root split*/ ));
519527
520- for (i = 0 ;i < insert -> lenblk ;i ++ )
528+ for (i = 0 ;i < insert -> lenblk ;i ++ )
521529itup [i ]= gist_form_invalid_tuple (insert -> blkno [i ] );
522530
523- /* construct path */
524- gixtxlogFindPath (index ,insert );
525-
526- if (insert -> pathlen == 0 ) {
527- /*it was split root, so we should only make new root*/
531+ if (insert -> origblkno == GIST_ROOT_BLKNO ) {
532+ /*it was split root, so we should only make new root.
533+ it can't be simple insert into root, look at call
534+ pushIncompleteInsert in gistRedoPageSplitRecord */
528535Buffer buffer = XLogReadBuffer (true,index ,GIST_ROOT_BLKNO );
529536Page page ;
530537
531538if (!BufferIsValid (buffer ))
532539elog (PANIC ,"gistContinueInsert: root block unfound" );
533540
541+ page = BufferGetPage (buffer );
542+ if (XLByteLE (insert -> lsn ,PageGetLSN (page ))) {
543+ LockBuffer (buffer ,BUFFER_LOCK_UNLOCK );
544+ ReleaseBuffer (buffer );
545+ return ;
546+ }
547+
534548GISTInitBuffer (buffer ,0 );
535549page = BufferGetPage (buffer );
536550gistfillbuffer (index ,page ,itup ,lenitup ,FirstOffsetNumber );
551+ PageSetLSN (page ,insert -> lsn );
552+ PageSetTLI (page ,ThisTimeLineID );
537553LockBuffer (buffer ,BUFFER_LOCK_UNLOCK );
538554WriteBuffer (buffer );
539555}else {
540556Buffer * buffers ;
541557Page * pages ;
542558int numbuffer ;
543-
559+
560+ /* construct path */
561+ gixtxlogFindPath (index ,insert );
562+
563+ Assert (insert -> pathlen > 0 );
564+
544565buffers = (Buffer * )palloc (sizeof (Buffer )* (insert -> lenblk + 2 /*guarantee root split*/ ) );
545566pages = (Page * )palloc (sizeof (Page )* (insert -> lenblk + 2 /*guarantee root split*/ ) );
546567
@@ -555,6 +576,12 @@ gistContinueInsert(gistIncompleteInsert *insert) {
555576if (PageIsNew ((PageHeader )(pages [numbuffer - 1 ])) )
556577elog (PANIC ,"gistContinueInsert: uninitialized page" );
557578
579+ if (XLByteLE (insert -> lsn ,PageGetLSN (pages [numbuffer - 1 ]))) {
580+ LockBuffer (buffers [numbuffer - 1 ],BUFFER_LOCK_UNLOCK );
581+ ReleaseBuffer (buffers [numbuffer - 1 ]);
582+ return ;
583+ }
584+
558585pituplen = PageGetMaxOffsetNumber (pages [numbuffer - 1 ]);
559586
560587/* remove old IndexTuples */
@@ -587,9 +614,10 @@ gistContinueInsert(gistIncompleteInsert *insert) {
587614if (BufferGetBlockNumber (buffers [0 ] )== GIST_ROOT_BLKNO ) {
588615IndexTuple * parentitup ;
589616
617+ /* we split root, just copy tuples from old root to new page */
590618parentitup = gistextractbuffer (buffers [numbuffer - 1 ],& pituplen );
591619
592- /*we split root, just copy tuples from old root to new page */
620+ /*sanity check */
593621if (i + 1 != insert -> pathlen )
594622elog (PANIC ,"gistContinueInsert: can't restore index '%s'" ,
595623RelationGetRelationName (index ));
@@ -624,14 +652,15 @@ gistContinueInsert(gistIncompleteInsert *insert) {
624652itup [j ]= gist_form_invalid_tuple (BufferGetBlockNumber (buffers [j ] ) );
625653PageSetLSN (pages [j ],insert -> lsn );
626654PageSetTLI (pages [j ],ThisTimeLineID );
655+ GistPageGetOpaque (pages [j ])-> rightlink = InvalidBlockNumber ;
627656LockBuffer (buffers [j ],BUFFER_LOCK_UNLOCK );
628657WriteBuffer (buffers [j ] );
629658}
630659}
631660}
632661
633- MemoryContextSwitchTo ( oldCxt );
634- MemoryContextReset ( opCtx );
662+ elog ( LOG , "Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index" ,
663+ insert -> node . spcNode , insert -> node . dbNode , insert -> node . relNode );
635664}
636665
637666void
@@ -648,11 +677,22 @@ gist_xlog_startup(void) {
648677void
649678gist_xlog_cleanup (void ) {
650679ListCell * l ;
680+ List * reverse = NIL ;
681+ MemoryContext oldCxt = MemoryContextSwitchTo (insertCtx );
651682
652- foreach (l ,incomplete_inserts ) {
683+ /* we should call gistContinueInsert in reverse order */
684+
685+ foreach (l ,incomplete_inserts )
686+ reverse = lappend (reverse ,lfirst (l ));
687+
688+ MemoryContextSwitchTo (opCtx );
689+ foreach (l ,reverse ) {
653690gistIncompleteInsert * insert = (gistIncompleteInsert * )lfirst (l );
654691gistContinueInsert (insert );
692+ MemoryContextReset (opCtx );
655693}
694+ MemoryContextSwitchTo (oldCxt );
695+
656696MemoryContextDelete (opCtx );
657697MemoryContextDelete (insertCtx );
658698}