88 *
99 *
1010 * IDENTIFICATION
11- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.76 2001/01/24 19:42:48 momjian Exp $
11+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.77 2001/01/26 01:24:31 vadim Exp $
1212 *
1313 *-------------------------------------------------------------------------
1414 */
@@ -34,7 +34,9 @@ typedef struct
3434int best_delta ;/* best size delta so far */
3535}FindSplitData ;
3636
37- void _bt_newroot (Relation rel ,Buffer lbuf ,Buffer rbuf );
37+ Buffer _bt_fixroot (Relation rel ,Buffer oldrootbuf ,bool release );
38+
39+ static Buffer _bt_newroot (Relation rel ,Buffer lbuf ,Buffer rbuf );
3840
3941static TransactionId _bt_check_unique (Relation rel ,BTItem btitem ,
4042Relation heapRel ,Buffer buf ,
@@ -44,6 +46,8 @@ static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
4446int keysz ,ScanKey scankey ,
4547BTItem btitem ,
4648OffsetNumber afteritem );
49+ static void _bt_insertuple (Relation rel ,Buffer buf ,
50+ Size itemsz ,BTItem btitem ,OffsetNumber newitemoff );
4751static Buffer _bt_split (Relation rel ,Buffer buf ,OffsetNumber firstright ,
4852OffsetNumber newitemoff ,Size newitemsz ,
4953BTItem newitem ,bool newitemonleft ,
@@ -456,9 +460,14 @@ _bt_insertonpg(Relation rel,
456460
457461if (is_root )
458462{
463+ Buffer rootbuf ;
464+
459465Assert (stack == (BTStack )NULL );
460466/* create a new root node and release the split buffers */
461- _bt_newroot (rel ,buf ,rbuf );
467+ rootbuf = _bt_newroot (rel ,buf ,rbuf );
468+ _bt_wrtbuf (rel ,rootbuf );
469+ _bt_wrtbuf (rel ,rbuf );
470+ _bt_wrtbuf (rel ,buf );
462471}
463472else
464473{
@@ -519,52 +528,11 @@ _bt_insertonpg(Relation rel,
519528}
520529else
521530{
522- START_CRIT_SECTION ();
523- _bt_pgaddtup (rel ,page ,itemsz ,btitem ,newitemoff ,"page" );
524531itup_off = newitemoff ;
525532itup_blkno = BufferGetBlockNumber (buf );
526- /* XLOG stuff */
527- {
528- xl_btree_insert xlrec ;
529- uint8 flag = XLOG_BTREE_INSERT ;
530- XLogRecPtr recptr ;
531- XLogRecData rdata [2 ];
532- BTItemData truncitem ;
533533
534- xlrec .target .node = rel -> rd_node ;
535- ItemPointerSet (& (xlrec .target .tid ),BufferGetBlockNumber (buf ),newitemoff );
536- rdata [0 ].buffer = InvalidBuffer ;
537- rdata [0 ].data = (char * )& xlrec ;
538- rdata [0 ].len = SizeOfBtreeInsert ;
539- rdata [0 ].next = & (rdata [1 ]);
540-
541- /* Read comments in _bt_pgaddtup */
542- if (!(P_ISLEAF (lpageop ))& & newitemoff == P_FIRSTDATAKEY (lpageop ))
543- {
544- truncitem = * btitem ;
545- truncitem .bti_itup .t_info = sizeof (BTItemData );
546- rdata [1 ].data = (char * )& truncitem ;
547- rdata [1 ].len = sizeof (BTItemData );
548- }
549- else
550- {
551- rdata [1 ].data = (char * )btitem ;
552- rdata [1 ].len = IndexTupleDSize (btitem -> bti_itup )+
553- (sizeof (BTItemData )- sizeof (IndexTupleData ));
554- }
555- rdata [1 ].buffer = buf ;
556- rdata [1 ].next = NULL ;
534+ _bt_insertuple (rel ,buf ,itemsz ,btitem ,newitemoff );
557535
558- if (P_ISLEAF (lpageop ))
559- flag |=XLOG_BTREE_LEAF ;
560-
561- recptr = XLogInsert (RM_BTREE_ID ,flag ,rdata );
562-
563- PageSetLSN (page ,recptr );
564- PageSetSUI (page ,ThisStartUpID );
565- }
566-
567- END_CRIT_SECTION ();
568536/* Write out the updated page and release pin/lock */
569537_bt_wrtbuf (rel ,buf );
570538}
@@ -576,6 +544,57 @@ _bt_insertonpg(Relation rel,
576544return res ;
577545}
578546
547+ static void
548+ _bt_insertuple (Relation rel ,Buffer buf ,
549+ Size itemsz ,BTItem btitem ,OffsetNumber newitemoff )
550+ {
551+ Page page = BufferGetPage (buf );
552+ BTPageOpaque pageop = (BTPageOpaque )PageGetSpecialPointer (page );
553+
554+ START_CRIT_SECTION ();
555+ _bt_pgaddtup (rel ,page ,itemsz ,btitem ,newitemoff ,"page" );
556+ /* XLOG stuff */
557+ {
558+ xl_btree_insert xlrec ;
559+ uint8 flag = XLOG_BTREE_INSERT ;
560+ XLogRecPtr recptr ;
561+ XLogRecData rdata [2 ];
562+ BTItemData truncitem ;
563+ xlrec .target .node = rel -> rd_node ;
564+ ItemPointerSet (& (xlrec .target .tid ),BufferGetBlockNumber (buf ),newitemoff );
565+ rdata [0 ].buffer = InvalidBuffer ;
566+ rdata [0 ].data = (char * )& xlrec ;
567+ rdata [0 ].len = SizeOfBtreeInsert ;
568+ rdata [0 ].next = & (rdata [1 ]);
569+
570+ /* Read comments in _bt_pgaddtup */
571+ if (!(P_ISLEAF (pageop ))& & newitemoff == P_FIRSTDATAKEY (pageop ))
572+ {
573+ truncitem = * btitem ;
574+ truncitem .bti_itup .t_info = sizeof (BTItemData );
575+ rdata [1 ].data = (char * )& truncitem ;
576+ rdata [1 ].len = sizeof (BTItemData );
577+ }
578+ else
579+ {
580+ rdata [1 ].data = (char * )btitem ;
581+ rdata [1 ].len = IndexTupleDSize (btitem -> bti_itup )+
582+ (sizeof (BTItemData )- sizeof (IndexTupleData ));
583+ }
584+ rdata [1 ].buffer = buf ;
585+ rdata [1 ].next = NULL ;
586+ if (P_ISLEAF (pageop ))
587+ flag |=XLOG_BTREE_LEAF ;
588+
589+ recptr = XLogInsert (RM_BTREE_ID ,flag ,rdata );
590+
591+ PageSetLSN (page ,recptr );
592+ PageSetSUI (page ,ThisStartUpID );
593+ }
594+
595+ END_CRIT_SECTION ();
596+ }
597+
579598/*
580599 *_bt_split() -- split a page in the btree.
581600 *
@@ -1130,11 +1149,12 @@ _bt_getstackbuf(Relation rel, BTStack stack)
11301149 *graph.
11311150 *
11321151 *On entry, lbuf (the old root) and rbuf (its new peer) are write-
1133- *locked. On exit, a new root page exists with entries for the
1134- *two new children. The new root page is neither pinned nor locked, and
1135- *we have also written out lbuf and rbuf and dropped their pins/locks.
1152+ *locked. On exit, a new root page exists with entries for the
1153+ *two new children, metapage is updated and unlocked/unpinned.
1154+ * The new root buffer is returned to caller which has to unlock/unpin
1155+ * lbuf, rbuf & rootbuf.
11361156 */
1137- void
1157+ static Buffer
11381158_bt_newroot (Relation rel ,Buffer lbuf ,Buffer rbuf )
11391159{
11401160Buffer rootbuf ;
@@ -1257,13 +1277,156 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
12571277}
12581278END_CRIT_SECTION ();
12591279
1260- /* write and let go of the new root buffer */
1261- _bt_wrtbuf (rel ,rootbuf );
1280+ /* write and let go of metapage buffer */
12621281_bt_wrtbuf (rel ,metabuf );
12631282
1264- /* update and release new sibling, and finally the old root */
1265- _bt_wrtbuf (rel ,rbuf );
1266- _bt_wrtbuf (rel ,lbuf );
1283+ return (rootbuf );
1284+ }
1285+
1286+ /*
1287+ * In the event old root page was splitted but no new one was created we
1288+ * build required parent levels keeping write lock on old root page.
1289+ * Note: it's assumed that old root page' btpo_parent points to meta page,
1290+ * ie not to parent page. On exit, new root page buffer is write locked.
1291+ * If "release" is TRUE then oldrootbuf will be released immediately
1292+ * after upper level is builded.
1293+ */
1294+ Buffer
1295+ _bt_fixroot (Relation rel ,Buffer oldrootbuf ,bool release )
1296+ {
1297+ Buffer rootbuf ;
1298+ BlockNumber rootblk ;
1299+ Page rootpage ;
1300+ XLogRecPtr rootLSN ;
1301+ Page oldrootpage = BufferGetPage (oldrootbuf );
1302+ BTPageOpaque oldrootopaque = (BTPageOpaque )
1303+ PageGetSpecialPointer (oldrootpage );
1304+ Buffer buf ,leftbuf ,rightbuf ;
1305+ Page page ,leftpage ,rightpage ;
1306+ BTPageOpaque opaque ,leftopaque ,rightopaque ;
1307+ OffsetNumber newitemoff ;
1308+ BTItem btitem ,ritem ;
1309+ Size itemsz ;
1310+
1311+ if (!P_LEFTMOST (oldrootopaque )|| P_RIGHTMOST (oldrootopaque ))
1312+ elog (ERROR ,"bt_fixroot: not valid old root page" );
1313+
1314+ /* Read right neighbor and create new root page*/
1315+ leftbuf = _bt_getbuf (rel ,oldrootopaque -> btpo_next ,BT_WRITE );
1316+ leftpage = BufferGetPage (leftbuf );
1317+ leftopaque = (BTPageOpaque )PageGetSpecialPointer (leftpage );
1318+ rootbuf = _bt_newroot (rel ,oldrootbuf ,leftbuf );
1319+ rootpage = BufferGetPage (rootbuf );
1320+ rootLSN = PageGetLSN (rootpage );
1321+ rootblk = BufferGetBlockNumber (rootbuf );
1322+
1323+ /*
1324+ * Update LSN & StartUpID of old root buffer and its neighbor to
1325+ * ensure that they will be written on disk after logging new
1326+ * root creation. Unfortunately, for the moment (?) we do not
1327+ * log this operation and so possibly break our rule to log entire
1328+ * page content of first after checkpoint modification.
1329+ */
1330+ HOLD_INTERRUPTS ();
1331+ oldrootopaque -> btpo_parent = rootblk ;
1332+ leftopaque -> btpo_parent = rootblk ;
1333+ PageSetLSN (oldrootpage ,rootLSN );
1334+ PageSetSUI (oldrootpage ,ThisStartUpID );
1335+ PageSetLSN (leftpage ,rootLSN );
1336+ PageSetSUI (leftpage ,ThisStartUpID );
1337+ RESUME_INTERRUPTS ();
1338+
1339+ /* parent page where to insert pointers */
1340+ buf = rootbuf ;
1341+ page = BufferGetPage (buf );
1342+ opaque = (BTPageOpaque )PageGetSpecialPointer (page );
1343+
1344+ /*
1345+ * Now read other pages (if any) on level and add them to new root.
1346+ * If concurrent process will split one of pages on this level then it
1347+ * will notice either btpo_parent == metablock or btpo_parent == rootblk.
1348+ * In first case it will give up its locks and try to lock leftmost page
1349+ * buffer (oldrootbuf) to fix root - ie it will wait for us and let us
1350+ * continue. In second case it will try to lock rootbuf keeping its locks
1351+ * on buffers we already passed, also waiting for us. If we'll have to
1352+ * unlock rootbuf (split it) and that process will have to split page
1353+ * of new level we created (level of rootbuf) then it will wait while
1354+ * we create upper level. Etc.
1355+ */
1356+ while (!P_RIGHTMOST (leftopaque ))
1357+ {
1358+ rightbuf = _bt_getbuf (rel ,leftopaque -> btpo_next ,BT_WRITE );
1359+ rightpage = BufferGetPage (rightbuf );
1360+ rightopaque = (BTPageOpaque )PageGetSpecialPointer (rightpage );
1361+
1362+ /* Update LSN & StartUpID (see comments above) */
1363+ HOLD_INTERRUPTS ();
1364+ rightopaque -> btpo_parent = rootblk ;
1365+ if (XLByteLT (PageGetLSN (rightpage ),rootLSN ))
1366+ PageSetLSN (rightpage ,rootLSN );
1367+ PageSetSUI (rightpage ,ThisStartUpID );
1368+ RESUME_INTERRUPTS ();
1369+
1370+ ritem = (BTItem )PageGetItem (leftpage ,PageGetItemId (leftpage ,P_HIKEY ));
1371+ btitem = _bt_formitem (& (ritem -> bti_itup ));
1372+ ItemPointerSet (& (btitem -> bti_itup .t_tid ),leftopaque -> btpo_next ,P_HIKEY );
1373+ itemsz = IndexTupleDSize (btitem -> bti_itup )
1374+ + (sizeof (BTItemData )- sizeof (IndexTupleData ));
1375+ itemsz = MAXALIGN (itemsz );
1376+
1377+ newitemoff = OffsetNumberNext (PageGetMaxOffsetNumber (page ));
1378+
1379+ if (PageGetFreeSpace (page )< itemsz )
1380+ {
1381+ Buffer newbuf ;
1382+ OffsetNumber firstright ;
1383+ OffsetNumber itup_off ;
1384+ BlockNumber itup_blkno ;
1385+ bool newitemonleft ;
1386+
1387+ firstright = _bt_findsplitloc (rel ,page ,
1388+ newitemoff ,itemsz ,& newitemonleft );
1389+ newbuf = _bt_split (rel ,buf ,firstright ,
1390+ newitemoff ,itemsz ,btitem ,newitemonleft ,
1391+ & itup_off ,& itup_blkno );
1392+ /* Keep lock on new "root" buffer ! */
1393+ if (buf != rootbuf )
1394+ _bt_relbuf (rel ,buf ,BT_WRITE );
1395+ buf = newbuf ;
1396+ page = BufferGetPage (buf );
1397+ opaque = (BTPageOpaque )PageGetSpecialPointer (page );
1398+ }
1399+ else
1400+ _bt_insertuple (rel ,buf ,itemsz ,btitem ,newitemoff );
1401+
1402+ /* give up left buffer */
1403+ _bt_relbuf (rel ,leftbuf ,BT_WRITE );
1404+ leftbuf = rightbuf ;
1405+ leftpage = rightpage ;
1406+ leftopaque = rightopaque ;
1407+ }
1408+
1409+ /* give up rightmost page buffer */
1410+ _bt_relbuf (rel ,leftbuf ,BT_WRITE );
1411+
1412+ /*
1413+ * Here we hold locks on old root buffer, new root buffer we've
1414+ * created with _bt_newroot() - rootbuf, - and buf we've used
1415+ * for last insert ops - buf. If rootbuf != buf then we have to
1416+ * create at least one more level. And if "release" is TRUE
1417+ * (ie we've already created some levels) then we give up
1418+ * oldrootbuf.
1419+ */
1420+ if (release )
1421+ _bt_relbuf (rel ,oldrootbuf ,BT_WRITE );
1422+
1423+ if (rootbuf != buf )
1424+ {
1425+ _bt_relbuf (rel ,buf ,BT_WRITE );
1426+ return (_bt_fixroot (rel ,rootbuf , true));
1427+ }
1428+
1429+ return (rootbuf );
12671430}
12681431
12691432/*