@@ -749,7 +749,7 @@ RumPageDeletePostingItem(Page page, OffsetNumber offset)
749749}
750750
751751/*
752- * checks space to install new value,
752+ * checks space to installat least one new value,
753753 * item pointer never deletes!
754754 */
755755static bool
@@ -762,8 +762,6 @@ dataIsEnoughSpace(RumBtree btree, Buffer buf, OffsetNumber off)
762762
763763if (RumPageIsLeaf (page ))
764764{
765- int n ,
766- j ;
767765ItemPointerData iptr = {{0 ,0 },0 };
768766Size size = 0 ;
769767
@@ -772,24 +770,8 @@ dataIsEnoughSpace(RumBtree btree, Buffer buf, OffsetNumber off)
772770 * encoding from zero item pointer. Also use worst case assumption
773771 * about alignment.
774772 */
775- n = RumPageGetOpaque (page )-> maxoff ;
776-
777- if (RumPageRightMost (page )&& off > n )
778- {
779- for (j = btree -> curitem ;j < btree -> nitem ;j ++ )
780- {
781- size = rumCheckPlaceToDataPageLeaf (btree -> entryAttnum ,
782- & btree -> items [j ],
783- (j == btree -> curitem ) ? (& iptr ) :& btree -> items [j - 1 ].iptr ,
784- btree -> rumstate ,size );
785- }
786- }
787- else
788- {
789- j = btree -> curitem ;
790- size = rumCheckPlaceToDataPageLeaf (btree -> entryAttnum ,
791- & btree -> items [j ],& iptr ,btree -> rumstate ,size );
792- }
773+ size = rumCheckPlaceToDataPageLeaf (btree -> entryAttnum ,
774+ btree -> items + btree -> curitem ,& iptr ,btree -> rumstate ,0 );
793775size += MAXIMUM_ALIGNOF ;
794776
795777if (RumPageGetOpaque (page )-> freespace >=size )
@@ -840,86 +822,127 @@ dataPlaceToPage(RumBtree btree, Page page, OffsetNumber off)
840822
841823if (RumPageIsLeaf (page ))
842824{
843- int i = 0 ,
844- j ,
845- max_j ;
846825Pointer ptr = RumDataPageGetData (page ),
847- copy_ptr = NULL ;
826+ copyPtr = NULL ;
848827ItemPointerData iptr = {{0 ,0 },0 };
849- RumKey copy_item ;
828+ RumKey copyItem ;
829+ bool copyItemEmpty = true;
850830char pageCopy [BLCKSZ ];
851831int maxoff = RumPageGetOpaque (page )-> maxoff ;
852- int freespace ;
832+ int freespace ,
833+ insertCount = 0 ;
834+ bool stopAppend = false;
853835
854836/*
855837 * We're going to prevent var-byte re-encoding of whole page. Find
856838 * position in page using page indexes.
857839 */
858840findInLeafPage (btree ,page ,& off ,& iptr ,& ptr );
859841
860- freespace = RumDataPageFreeSpacePre (page ,ptr );
861- Assert (freespace >=0 );
862-
863842if (off <=maxoff )
864843{
865844/*
866845 * Read next item-pointer with additional information: we'll have
867846 * to re-encode it. Copy previous part of page.
868847 */
869- memcpy (pageCopy , page , BLCKSZ );
870- copy_ptr = pageCopy + (ptr - page );
871- copy_item .iptr = iptr ;
848+ memcpy (pageCopy + ( ptr - page ), ptr , BLCKSZ - ( ptr - page ) );
849+ copyPtr = pageCopy + (ptr - page );
850+ copyItem .iptr = iptr ;
872851}
873852
874- /* Check how many items we're going to add */
875- if (RumPageRightMost (page )&& off > maxoff )
876- max_j = btree -> nitem ;
877- else
878- max_j = btree -> curitem + 1 ;
853+ freespace = RumPageGetOpaque (page )-> freespace ;
879854
880- /* Place items to the page while we have enough of space */
881- i = 0 ;
882- for (j = btree -> curitem ;j < max_j ;j ++ )
855+ /*
856+ * We are execute a merge join over old page and new items, but we
857+ * should stop inserting of new items if free space of page is ended
858+ * to put all old items back
859+ */
860+ while (42 )
883861{
884- Pointer ptr2 ;
862+ int cmp ;
885863
886- ptr2 = page + rumCheckPlaceToDataPageLeaf (btree -> entryAttnum ,
887- & btree -> items [j ],& iptr ,btree -> rumstate ,ptr - page );
864+ /* get next item to copy if we pushed it on previous loop */
865+ if (copyItemEmpty == true&& off <=maxoff )
866+ {
867+ copyPtr = rumDataPageLeafRead (copyPtr ,btree -> entryAttnum ,
868+ & copyItem ,btree -> rumstate );
869+ copyItemEmpty = false;
870+ }
888871
889- freespace = RumDataPageFreeSpacePre (page ,ptr2 );
890- if (freespace < 0 )
872+ if (off <=maxoff && btree -> curitem < btree -> nitem )
873+ {
874+ if (stopAppend )
875+ cmp = -1 ;/* force copy */
876+ else
877+ cmp = compareRumKey (btree -> rumstate ,btree -> entryAttnum ,
878+ & copyItem ,
879+ btree -> items + btree -> curitem );
880+ }
881+ else if (btree -> curitem < btree -> nitem )
882+ {
883+ /* we copied all old items but we have to add more new items */
884+ if (stopAppend )
885+ /* there is no free space on page */
886+ break ;
887+ else
888+ /* force insertion of new item */
889+ cmp = 1 ;
890+ }
891+ else if (off <=maxoff )
892+ {
893+ /* force copy, we believe that all old items could be placed */
894+ cmp = -1 ;
895+ }
896+ else
897+ {
891898break ;
899+ }
892900
893- ptr = rumPlaceToDataPageLeaf (ptr ,btree -> entryAttnum ,
894- & btree -> items [j ],& iptr ,btree -> rumstate );
895- Assert (RumDataPageFreeSpacePre (page ,ptr ) >=0 );
896-
897- iptr = btree -> items [j ].iptr ;
898- btree -> curitem ++ ;
899- i ++ ;
900- }
901-
902- /* Place rest of the page back */
903- if (off <=maxoff )
904- {
905- for (j = off ;j <=maxoff ;j ++ )
901+ if (cmp <=0 )
906902{
907- copy_ptr = rumDataPageLeafRead (copy_ptr ,btree -> entryAttnum ,
908- & copy_item ,btree -> rumstate );
909- ptr = rumPlaceToDataPageLeaf (ptr ,btree -> entryAttnum ,& copy_item ,
903+ ptr = rumPlaceToDataPageLeaf (ptr ,btree -> entryAttnum ,
904+ & copyItem ,
910905& iptr ,btree -> rumstate );
911906
912- Assert (RumDataPageFreeSpacePre (page ,ptr ) >=0 );
907+ iptr = copyItem .iptr ;
908+ off ++ ;
909+ copyItemEmpty = true;
913910
914- iptr = copy_item .iptr ;
911+ if (cmp == 0 )
912+ {
913+ btree -> curitem ++ ;
914+ insertCount ++ ;
915+ }
916+ }
917+ else /* if (cmp > 0) */
918+ {
919+ int newItemSize ;
920+
921+ newItemSize = rumCheckPlaceToDataPageLeaf (btree -> entryAttnum ,
922+ btree -> items + btree -> curitem ,& iptr ,
923+ btree -> rumstate ,0 );
924+
925+ if (newItemSize <=freespace )
926+ {
927+ ptr = rumPlaceToDataPageLeaf (ptr ,btree -> entryAttnum ,
928+ btree -> items + btree -> curitem ,
929+ & iptr ,btree -> rumstate );
930+ iptr = btree -> items [btree -> curitem ].iptr ;
931+ btree -> curitem ++ ;
932+ freespace -= newItemSize ;
933+ insertCount ++ ;
934+ }
935+ else
936+ {
937+ stopAppend = true;
938+ }
915939}
916- }
917940
918- RumPageGetOpaque (page )-> maxoff += i ;
941+ Assert (RumDataPageFreeSpacePre (page ,ptr ) >=0 );
942+ }
919943
920- freespace = RumDataPageFreeSpacePre (page ,ptr );
921- if (freespace < 0 )
922- elog (ERROR ,"Not enough of space in leaf page!" );
944+ Assert (insertCount > 0 );
945+ RumPageGetOpaque (page )-> maxoff += insertCount ;
923946
924947/* Update indexes in the end of page */
925948updateItemIndexes (page ,btree -> entryAttnum ,btree -> rumstate );