8
8
*
9
9
*
10
10
* IDENTIFICATION
11
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.76 2001/01/24 19:42:48 momjian Exp $
11
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.77 2001/01/26 01:24:31 vadim Exp $
12
12
*
13
13
*-------------------------------------------------------------------------
14
14
*/
@@ -34,7 +34,9 @@ typedef struct
34
34
int best_delta ;/* best size delta so far */
35
35
}FindSplitData ;
36
36
37
- void _bt_newroot (Relation rel ,Buffer lbuf ,Buffer rbuf );
37
+ Buffer _bt_fixroot (Relation rel ,Buffer oldrootbuf ,bool release );
38
+
39
+ static Buffer _bt_newroot (Relation rel ,Buffer lbuf ,Buffer rbuf );
38
40
39
41
static TransactionId _bt_check_unique (Relation rel ,BTItem btitem ,
40
42
Relation heapRel ,Buffer buf ,
@@ -44,6 +46,8 @@ static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
44
46
int keysz ,ScanKey scankey ,
45
47
BTItem btitem ,
46
48
OffsetNumber afteritem );
49
+ static void _bt_insertuple (Relation rel ,Buffer buf ,
50
+ Size itemsz ,BTItem btitem ,OffsetNumber newitemoff );
47
51
static Buffer _bt_split (Relation rel ,Buffer buf ,OffsetNumber firstright ,
48
52
OffsetNumber newitemoff ,Size newitemsz ,
49
53
BTItem newitem ,bool newitemonleft ,
@@ -456,9 +460,14 @@ _bt_insertonpg(Relation rel,
456
460
457
461
if (is_root )
458
462
{
463
+ Buffer rootbuf ;
464
+
459
465
Assert (stack == (BTStack )NULL );
460
466
/* create a new root node and release the split buffers */
461
- _bt_newroot (rel ,buf ,rbuf );
467
+ rootbuf = _bt_newroot (rel ,buf ,rbuf );
468
+ _bt_wrtbuf (rel ,rootbuf );
469
+ _bt_wrtbuf (rel ,rbuf );
470
+ _bt_wrtbuf (rel ,buf );
462
471
}
463
472
else
464
473
{
@@ -519,52 +528,11 @@ _bt_insertonpg(Relation rel,
519
528
}
520
529
else
521
530
{
522
- START_CRIT_SECTION ();
523
- _bt_pgaddtup (rel ,page ,itemsz ,btitem ,newitemoff ,"page" );
524
531
itup_off = newitemoff ;
525
532
itup_blkno = BufferGetBlockNumber (buf );
526
- /* XLOG stuff */
527
- {
528
- xl_btree_insert xlrec ;
529
- uint8 flag = XLOG_BTREE_INSERT ;
530
- XLogRecPtr recptr ;
531
- XLogRecData rdata [2 ];
532
- BTItemData truncitem ;
533
533
534
- xlrec .target .node = rel -> rd_node ;
535
- ItemPointerSet (& (xlrec .target .tid ),BufferGetBlockNumber (buf ),newitemoff );
536
- rdata [0 ].buffer = InvalidBuffer ;
537
- rdata [0 ].data = (char * )& xlrec ;
538
- rdata [0 ].len = SizeOfBtreeInsert ;
539
- rdata [0 ].next = & (rdata [1 ]);
540
-
541
- /* Read comments in _bt_pgaddtup */
542
- if (!(P_ISLEAF (lpageop ))& & newitemoff == P_FIRSTDATAKEY (lpageop ))
543
- {
544
- truncitem = * btitem ;
545
- truncitem .bti_itup .t_info = sizeof (BTItemData );
546
- rdata [1 ].data = (char * )& truncitem ;
547
- rdata [1 ].len = sizeof (BTItemData );
548
- }
549
- else
550
- {
551
- rdata [1 ].data = (char * )btitem ;
552
- rdata [1 ].len = IndexTupleDSize (btitem -> bti_itup )+
553
- (sizeof (BTItemData )- sizeof (IndexTupleData ));
554
- }
555
- rdata [1 ].buffer = buf ;
556
- rdata [1 ].next = NULL ;
534
+ _bt_insertuple (rel ,buf ,itemsz ,btitem ,newitemoff );
557
535
558
- if (P_ISLEAF (lpageop ))
559
- flag |=XLOG_BTREE_LEAF ;
560
-
561
- recptr = XLogInsert (RM_BTREE_ID ,flag ,rdata );
562
-
563
- PageSetLSN (page ,recptr );
564
- PageSetSUI (page ,ThisStartUpID );
565
- }
566
-
567
- END_CRIT_SECTION ();
568
536
/* Write out the updated page and release pin/lock */
569
537
_bt_wrtbuf (rel ,buf );
570
538
}
@@ -576,6 +544,57 @@ _bt_insertonpg(Relation rel,
576
544
return res ;
577
545
}
578
546
547
+ static void
548
+ _bt_insertuple (Relation rel ,Buffer buf ,
549
+ Size itemsz ,BTItem btitem ,OffsetNumber newitemoff )
550
+ {
551
+ Page page = BufferGetPage (buf );
552
+ BTPageOpaque pageop = (BTPageOpaque )PageGetSpecialPointer (page );
553
+
554
+ START_CRIT_SECTION ();
555
+ _bt_pgaddtup (rel ,page ,itemsz ,btitem ,newitemoff ,"page" );
556
+ /* XLOG stuff */
557
+ {
558
+ xl_btree_insert xlrec ;
559
+ uint8 flag = XLOG_BTREE_INSERT ;
560
+ XLogRecPtr recptr ;
561
+ XLogRecData rdata [2 ];
562
+ BTItemData truncitem ;
563
+ xlrec .target .node = rel -> rd_node ;
564
+ ItemPointerSet (& (xlrec .target .tid ),BufferGetBlockNumber (buf ),newitemoff );
565
+ rdata [0 ].buffer = InvalidBuffer ;
566
+ rdata [0 ].data = (char * )& xlrec ;
567
+ rdata [0 ].len = SizeOfBtreeInsert ;
568
+ rdata [0 ].next = & (rdata [1 ]);
569
+
570
+ /* Read comments in _bt_pgaddtup */
571
+ if (!(P_ISLEAF (pageop ))& & newitemoff == P_FIRSTDATAKEY (pageop ))
572
+ {
573
+ truncitem = * btitem ;
574
+ truncitem .bti_itup .t_info = sizeof (BTItemData );
575
+ rdata [1 ].data = (char * )& truncitem ;
576
+ rdata [1 ].len = sizeof (BTItemData );
577
+ }
578
+ else
579
+ {
580
+ rdata [1 ].data = (char * )btitem ;
581
+ rdata [1 ].len = IndexTupleDSize (btitem -> bti_itup )+
582
+ (sizeof (BTItemData )- sizeof (IndexTupleData ));
583
+ }
584
+ rdata [1 ].buffer = buf ;
585
+ rdata [1 ].next = NULL ;
586
+ if (P_ISLEAF (pageop ))
587
+ flag |=XLOG_BTREE_LEAF ;
588
+
589
+ recptr = XLogInsert (RM_BTREE_ID ,flag ,rdata );
590
+
591
+ PageSetLSN (page ,recptr );
592
+ PageSetSUI (page ,ThisStartUpID );
593
+ }
594
+
595
+ END_CRIT_SECTION ();
596
+ }
597
+
579
598
/*
580
599
*_bt_split() -- split a page in the btree.
581
600
*
@@ -1130,11 +1149,12 @@ _bt_getstackbuf(Relation rel, BTStack stack)
1130
1149
*graph.
1131
1150
*
1132
1151
*On entry, lbuf (the old root) and rbuf (its new peer) are write-
1133
- *locked. On exit, a new root page exists with entries for the
1134
- *two new children. The new root page is neither pinned nor locked, and
1135
- *we have also written out lbuf and rbuf and dropped their pins/locks.
1152
+ *locked. On exit, a new root page exists with entries for the
1153
+ *two new children, metapage is updated and unlocked/unpinned.
1154
+ * The new root buffer is returned to caller which has to unlock/unpin
1155
+ * lbuf, rbuf & rootbuf.
1136
1156
*/
1137
- void
1157
+ static Buffer
1138
1158
_bt_newroot (Relation rel ,Buffer lbuf ,Buffer rbuf )
1139
1159
{
1140
1160
Buffer rootbuf ;
@@ -1257,13 +1277,156 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1257
1277
}
1258
1278
END_CRIT_SECTION ();
1259
1279
1260
- /* write and let go of the new root buffer */
1261
- _bt_wrtbuf (rel ,rootbuf );
1280
+ /* write and let go of metapage buffer */
1262
1281
_bt_wrtbuf (rel ,metabuf );
1263
1282
1264
- /* update and release new sibling, and finally the old root */
1265
- _bt_wrtbuf (rel ,rbuf );
1266
- _bt_wrtbuf (rel ,lbuf );
1283
+ return (rootbuf );
1284
+ }
1285
+
1286
+ /*
1287
+ * In the event old root page was splitted but no new one was created we
1288
+ * build required parent levels keeping write lock on old root page.
1289
+ * Note: it's assumed that old root page' btpo_parent points to meta page,
1290
+ * ie not to parent page. On exit, new root page buffer is write locked.
1291
+ * If "release" is TRUE then oldrootbuf will be released immediately
1292
+ * after upper level is builded.
1293
+ */
1294
+ Buffer
1295
+ _bt_fixroot (Relation rel ,Buffer oldrootbuf ,bool release )
1296
+ {
1297
+ Buffer rootbuf ;
1298
+ BlockNumber rootblk ;
1299
+ Page rootpage ;
1300
+ XLogRecPtr rootLSN ;
1301
+ Page oldrootpage = BufferGetPage (oldrootbuf );
1302
+ BTPageOpaque oldrootopaque = (BTPageOpaque )
1303
+ PageGetSpecialPointer (oldrootpage );
1304
+ Buffer buf ,leftbuf ,rightbuf ;
1305
+ Page page ,leftpage ,rightpage ;
1306
+ BTPageOpaque opaque ,leftopaque ,rightopaque ;
1307
+ OffsetNumber newitemoff ;
1308
+ BTItem btitem ,ritem ;
1309
+ Size itemsz ;
1310
+
1311
+ if (!P_LEFTMOST (oldrootopaque )|| P_RIGHTMOST (oldrootopaque ))
1312
+ elog (ERROR ,"bt_fixroot: not valid old root page" );
1313
+
1314
+ /* Read right neighbor and create new root page*/
1315
+ leftbuf = _bt_getbuf (rel ,oldrootopaque -> btpo_next ,BT_WRITE );
1316
+ leftpage = BufferGetPage (leftbuf );
1317
+ leftopaque = (BTPageOpaque )PageGetSpecialPointer (leftpage );
1318
+ rootbuf = _bt_newroot (rel ,oldrootbuf ,leftbuf );
1319
+ rootpage = BufferGetPage (rootbuf );
1320
+ rootLSN = PageGetLSN (rootpage );
1321
+ rootblk = BufferGetBlockNumber (rootbuf );
1322
+
1323
+ /*
1324
+ * Update LSN & StartUpID of old root buffer and its neighbor to
1325
+ * ensure that they will be written on disk after logging new
1326
+ * root creation. Unfortunately, for the moment (?) we do not
1327
+ * log this operation and so possibly break our rule to log entire
1328
+ * page content of first after checkpoint modification.
1329
+ */
1330
+ HOLD_INTERRUPTS ();
1331
+ oldrootopaque -> btpo_parent = rootblk ;
1332
+ leftopaque -> btpo_parent = rootblk ;
1333
+ PageSetLSN (oldrootpage ,rootLSN );
1334
+ PageSetSUI (oldrootpage ,ThisStartUpID );
1335
+ PageSetLSN (leftpage ,rootLSN );
1336
+ PageSetSUI (leftpage ,ThisStartUpID );
1337
+ RESUME_INTERRUPTS ();
1338
+
1339
+ /* parent page where to insert pointers */
1340
+ buf = rootbuf ;
1341
+ page = BufferGetPage (buf );
1342
+ opaque = (BTPageOpaque )PageGetSpecialPointer (page );
1343
+
1344
+ /*
1345
+ * Now read other pages (if any) on level and add them to new root.
1346
+ * If concurrent process will split one of pages on this level then it
1347
+ * will notice either btpo_parent == metablock or btpo_parent == rootblk.
1348
+ * In first case it will give up its locks and try to lock leftmost page
1349
+ * buffer (oldrootbuf) to fix root - ie it will wait for us and let us
1350
+ * continue. In second case it will try to lock rootbuf keeping its locks
1351
+ * on buffers we already passed, also waiting for us. If we'll have to
1352
+ * unlock rootbuf (split it) and that process will have to split page
1353
+ * of new level we created (level of rootbuf) then it will wait while
1354
+ * we create upper level. Etc.
1355
+ */
1356
+ while (!P_RIGHTMOST (leftopaque ))
1357
+ {
1358
+ rightbuf = _bt_getbuf (rel ,leftopaque -> btpo_next ,BT_WRITE );
1359
+ rightpage = BufferGetPage (rightbuf );
1360
+ rightopaque = (BTPageOpaque )PageGetSpecialPointer (rightpage );
1361
+
1362
+ /* Update LSN & StartUpID (see comments above) */
1363
+ HOLD_INTERRUPTS ();
1364
+ rightopaque -> btpo_parent = rootblk ;
1365
+ if (XLByteLT (PageGetLSN (rightpage ),rootLSN ))
1366
+ PageSetLSN (rightpage ,rootLSN );
1367
+ PageSetSUI (rightpage ,ThisStartUpID );
1368
+ RESUME_INTERRUPTS ();
1369
+
1370
+ ritem = (BTItem )PageGetItem (leftpage ,PageGetItemId (leftpage ,P_HIKEY ));
1371
+ btitem = _bt_formitem (& (ritem -> bti_itup ));
1372
+ ItemPointerSet (& (btitem -> bti_itup .t_tid ),leftopaque -> btpo_next ,P_HIKEY );
1373
+ itemsz = IndexTupleDSize (btitem -> bti_itup )
1374
+ + (sizeof (BTItemData )- sizeof (IndexTupleData ));
1375
+ itemsz = MAXALIGN (itemsz );
1376
+
1377
+ newitemoff = OffsetNumberNext (PageGetMaxOffsetNumber (page ));
1378
+
1379
+ if (PageGetFreeSpace (page )< itemsz )
1380
+ {
1381
+ Buffer newbuf ;
1382
+ OffsetNumber firstright ;
1383
+ OffsetNumber itup_off ;
1384
+ BlockNumber itup_blkno ;
1385
+ bool newitemonleft ;
1386
+
1387
+ firstright = _bt_findsplitloc (rel ,page ,
1388
+ newitemoff ,itemsz ,& newitemonleft );
1389
+ newbuf = _bt_split (rel ,buf ,firstright ,
1390
+ newitemoff ,itemsz ,btitem ,newitemonleft ,
1391
+ & itup_off ,& itup_blkno );
1392
+ /* Keep lock on new "root" buffer ! */
1393
+ if (buf != rootbuf )
1394
+ _bt_relbuf (rel ,buf ,BT_WRITE );
1395
+ buf = newbuf ;
1396
+ page = BufferGetPage (buf );
1397
+ opaque = (BTPageOpaque )PageGetSpecialPointer (page );
1398
+ }
1399
+ else
1400
+ _bt_insertuple (rel ,buf ,itemsz ,btitem ,newitemoff );
1401
+
1402
+ /* give up left buffer */
1403
+ _bt_relbuf (rel ,leftbuf ,BT_WRITE );
1404
+ leftbuf = rightbuf ;
1405
+ leftpage = rightpage ;
1406
+ leftopaque = rightopaque ;
1407
+ }
1408
+
1409
+ /* give up rightmost page buffer */
1410
+ _bt_relbuf (rel ,leftbuf ,BT_WRITE );
1411
+
1412
+ /*
1413
+ * Here we hold locks on old root buffer, new root buffer we've
1414
+ * created with _bt_newroot() - rootbuf, - and buf we've used
1415
+ * for last insert ops - buf. If rootbuf != buf then we have to
1416
+ * create at least one more level. And if "release" is TRUE
1417
+ * (ie we've already created some levels) then we give up
1418
+ * oldrootbuf.
1419
+ */
1420
+ if (release )
1421
+ _bt_relbuf (rel ,oldrootbuf ,BT_WRITE );
1422
+
1423
+ if (rootbuf != buf )
1424
+ {
1425
+ _bt_relbuf (rel ,buf ,BT_WRITE );
1426
+ return (_bt_fixroot (rel ,rootbuf , true));
1427
+ }
1428
+
1429
+ return (rootbuf );
1267
1430
}
1268
1431
1269
1432
/*