8
8
*
9
9
*
10
10
* IDENTIFICATION
11
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.62 2000/08/25 23:13:33 tgl Exp $
11
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.63 2000/10/04 00:04:42 vadim Exp $
12
12
*
13
13
*-------------------------------------------------------------------------
14
14
*/
@@ -33,6 +33,7 @@ typedef struct
33
33
int best_delta ;/* best size delta so far */
34
34
}FindSplitData ;
35
35
36
+ void _bt_newroot (Relation rel ,Buffer lbuf ,Buffer rbuf );
36
37
37
38
static TransactionId _bt_check_unique (Relation rel ,BTItem btitem ,
38
39
Relation heapRel ,Buffer buf ,
@@ -54,7 +55,6 @@ static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
54
55
int leftfree ,int rightfree ,
55
56
bool newitemonleft ,Size firstrightitemsz );
56
57
static Buffer _bt_getstackbuf (Relation rel ,BTStack stack );
57
- static void _bt_newroot (Relation rel ,Buffer lbuf ,Buffer rbuf );
58
58
static void _bt_pgaddtup (Relation rel ,Page page ,
59
59
Size itemsize ,BTItem btitem ,
60
60
OffsetNumber itup_off ,const char * where );
@@ -514,6 +514,29 @@ _bt_insertonpg(Relation rel,
514
514
}
515
515
else
516
516
{
517
+ #ifdef XLOG
518
+ /* XLOG stuff */
519
+ {
520
+ char xlbuf [sizeof (xl_btree_insert )+ 2 * sizeof (CommandId )];
521
+ xl_btree_insert * xlrec = xlbuf ;
522
+ int hsize = SizeOfBtreeInsert ;
523
+
524
+ xlrec -> target .node = rel -> rd_node ;
525
+ ItemPointerSet (& (xlrec -> target .tid ),BufferGetBlockNumber (buf ),newitemoff );
526
+ if (P_ISLEAF (lpageop ))
527
+ {
528
+ CommandId cid = GetCurrentCommandId ();
529
+ memcpy (xlbuf + SizeOfBtreeInsert ,& (char * )cid ,sizeof (CommandId ));
530
+ hsize += sizeof (CommandId );
531
+ }
532
+
533
+ XLogRecPtr recptr = XLogInsert (RM_BTREE_ID ,XLOG_BTREE_INSERT ,
534
+ xlbuf ,hsize , (char * )btitem ,itemsz );
535
+
536
+ PageSetLSN (page ,recptr );
537
+ PageSetSUI (page ,ThisStartUpID );
538
+ }
539
+ #endif
517
540
_bt_pgaddtup (rel ,page ,itemsz ,btitem ,newitemoff ,"page" );
518
541
itup_off = newitemoff ;
519
542
itup_blkno = BufferGetBlockNumber (buf );
@@ -578,8 +601,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
578
601
ropaque = (BTPageOpaque )PageGetSpecialPointer (rightpage );
579
602
580
603
/* if we're splitting this page, it won't be the root when we're done */
581
- oopaque -> btpo_flags &= ~BTP_ROOT ;
582
- lopaque -> btpo_flags = ropaque -> btpo_flags = oopaque -> btpo_flags ;
604
+ lopaque -> btpo_flags = oopaque -> btpo_flags ;
605
+ lopaque -> btpo_flags &= ~BTP_ROOT ;
606
+ ropaque -> btpo_flags = lopaque -> btpo_flags ;
583
607
lopaque -> btpo_prev = oopaque -> btpo_prev ;
584
608
lopaque -> btpo_next = BufferGetBlockNumber (rbuf );
585
609
ropaque -> btpo_prev = BufferGetBlockNumber (buf );
@@ -608,7 +632,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
608
632
item = (BTItem )PageGetItem (origpage ,itemid );
609
633
if (PageAddItem (rightpage , (Item )item ,itemsz ,rightoff ,
610
634
LP_USED )== InvalidOffsetNumber )
611
- elog (FATAL ,"btree: failed to add hikey to the right sibling" );
635
+ elog (STOP ,"btree: failed to add hikey to the right sibling" );
612
636
rightoff = OffsetNumberNext (rightoff );
613
637
}
614
638
@@ -633,7 +657,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
633
657
}
634
658
if (PageAddItem (leftpage , (Item )item ,itemsz ,leftoff ,
635
659
LP_USED )== InvalidOffsetNumber )
636
- elog (FATAL ,"btree: failed to add hikey to the left sibling" );
660
+ elog (STOP ,"btree: failed to add hikey to the left sibling" );
637
661
leftoff = OffsetNumberNext (leftoff );
638
662
639
663
/*
@@ -704,6 +728,75 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
704
728
}
705
729
}
706
730
731
+ /*
732
+ * We have to grab the right sibling (if any) and fix the prev
733
+ * pointer there. We are guaranteed that this is deadlock-free
734
+ * since no other writer will be holding a lock on that page
735
+ * and trying to move left, and all readers release locks on a page
736
+ * before trying to fetch its neighbors.
737
+ */
738
+
739
+ if (!P_RIGHTMOST (ropaque ))
740
+ {
741
+ sbuf = _bt_getbuf (rel ,ropaque -> btpo_next ,BT_WRITE );
742
+ spage = BufferGetPage (sbuf );
743
+ }
744
+
745
+ #ifdef XLOG
746
+ /*
747
+ * Right sibling is locked, new siblings are prepared, but original
748
+ * page is not updated yet. Log changes before continuing.
749
+ *
750
+ * NO ELOG(ERROR) till right sibling is updated.
751
+ *
752
+ */
753
+ {
754
+ char xlbuf [sizeof (xl_btree_split )+
755
+ 2 * sizeof (CommandId )+ BLCKSZ ];
756
+ xl_btree_split * xlrec = xlbuf ;
757
+ int hsize = SizeOfBtreeSplit ;
758
+ int flag = (newitemonleft ) ?
759
+ XLOG_BTREE_SPLEFT :XLOG_BTREE_SPLIT ;
760
+
761
+ xlrec -> target .node = rel -> rd_node ;
762
+ ItemPointerSet (& (xlrec -> target .tid ),itup_blkno ,itup_off );
763
+ if (P_ISLEAF (lopaque ))
764
+ {
765
+ CommandId cid = GetCurrentCommandId ();
766
+ memcpy (xlbuf + hsize ,& (char * )cid ,sizeof (CommandId ));
767
+ hsize += sizeof (CommandId );
768
+ }
769
+ if (newitemonleft )
770
+ {
771
+ memcpy (xlbuf + hsize , (char * )newitem ,newitemsz );
772
+ hsize += newitemsz ;
773
+ xlrec -> otherblk = BufferGetBlockNumber (rbuf );
774
+ }
775
+ else
776
+ xlrec -> otherblk = BufferGetBlockNumber (buf );
777
+
778
+ xlrec -> rightblk = ropaque -> btpo_next ;
779
+
780
+ /*
781
+ * Direct access to page is not good but faster - we should
782
+ * implement some new func in page API.
783
+ */
784
+ XLogRecPtr recptr = XLogInsert (RM_BTREE_ID ,flag ,xlbuf ,
785
+ hsize , (char * )rightpage + (PageHeader )rightpage )-> pd_upper ,
786
+ ((PageHeader )rightpage )-> pd_special - ((PageHeader )rightpage )-> upper );
787
+
788
+ PageSetLSN (leftpage ,recptr );
789
+ PageSetSUI (leftpage ,ThisStartUpID );
790
+ PageSetLSN (rightpage ,recptr );
791
+ PageSetSUI (rightpage ,ThisStartUpID );
792
+ if (!P_RIGHTMOST (ropaque ))
793
+ {
794
+ PageSetLSN (spage ,recptr );
795
+ PageSetSUI (spage ,ThisStartUpID );
796
+ }
797
+ }
798
+ #endif
799
+
707
800
/*
708
801
* By here, the original data page has been split into two new halves,
709
802
* and these are correct. The algorithm requires that the left page
@@ -716,18 +809,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
716
809
717
810
PageRestoreTempPage (leftpage ,origpage );
718
811
719
- /*
720
- * Finally, we need to grab the right sibling (if any) and fix the
721
- * prev pointer there.We are guaranteed that this is deadlock-free
722
- * since no other writer will be holding a lock on that page
723
- * and trying to move left, and all readers release locks on a page
724
- * before trying to fetch its neighbors.
725
- */
726
-
727
812
if (!P_RIGHTMOST (ropaque ))
728
813
{
729
- sbuf = _bt_getbuf (rel ,ropaque -> btpo_next ,BT_WRITE );
730
- spage = BufferGetPage (sbuf );
731
814
sopaque = (BTPageOpaque )PageGetSpecialPointer (spage );
732
815
sopaque -> btpo_prev = BufferGetBlockNumber (rbuf );
733
816
@@ -1002,7 +1085,7 @@ _bt_getstackbuf(Relation rel, BTStack stack)
1002
1085
*two new children. The new root page is neither pinned nor locked, and
1003
1086
*we have also written out lbuf and rbuf and dropped their pins/locks.
1004
1087
*/
1005
- static void
1088
+ void
1006
1089
_bt_newroot (Relation rel ,Buffer lbuf ,Buffer rbuf )
1007
1090
{
1008
1091
Buffer rootbuf ;
@@ -1011,7 +1094,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1011
1094
rootpage ;
1012
1095
BlockNumber lbkno ,
1013
1096
rbkno ;
1014
- BlockNumber rootbknum ;
1097
+ BlockNumber rootblknum ;
1015
1098
BTPageOpaque rootopaque ;
1016
1099
ItemId itemid ;
1017
1100
BTItem item ;
@@ -1021,12 +1104,16 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1021
1104
/* get a new root page */
1022
1105
rootbuf = _bt_getbuf (rel ,P_NEW ,BT_WRITE );
1023
1106
rootpage = BufferGetPage (rootbuf );
1024
- rootbknum = BufferGetBlockNumber (rootbuf );
1107
+ rootblknum = BufferGetBlockNumber (rootbuf );
1108
+
1109
+
1110
+ /* NO ELOG(ERROR) from here till newroot op is logged */
1025
1111
1026
1112
/* set btree special data */
1027
1113
rootopaque = (BTPageOpaque )PageGetSpecialPointer (rootpage );
1028
1114
rootopaque -> btpo_prev = rootopaque -> btpo_next = P_NONE ;
1029
1115
rootopaque -> btpo_flags |=BTP_ROOT ;
1116
+ rootopaque -> btpo_parent = BTREE_METAPAGE ;
1030
1117
1031
1118
lbkno = BufferGetBlockNumber (lbuf );
1032
1119
rbkno = BufferGetBlockNumber (rbuf );
@@ -1040,7 +1127,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1040
1127
*/
1041
1128
((BTPageOpaque )PageGetSpecialPointer (lpage ))-> btpo_parent =
1042
1129
((BTPageOpaque )PageGetSpecialPointer (rpage ))-> btpo_parent =
1043
- rootbknum ;
1130
+ rootblknum ;
1044
1131
1045
1132
/*
1046
1133
* Create downlink item for left page (old root). Since this will be
@@ -1058,7 +1145,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1058
1145
* the two items will go into positions P_HIKEY and P_FIRSTKEY.
1059
1146
*/
1060
1147
if (PageAddItem (rootpage , (Item )new_item ,itemsz ,P_HIKEY ,LP_USED )== InvalidOffsetNumber )
1061
- elog (FATAL ,"btree: failed to add leftkey to new root page" );
1148
+ elog (STOP ,"btree: failed to add leftkey to new root page" );
1062
1149
pfree (new_item );
1063
1150
1064
1151
/*
@@ -1075,14 +1162,35 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1075
1162
* insert the right page pointer into the new root page.
1076
1163
*/
1077
1164
if (PageAddItem (rootpage , (Item )new_item ,itemsz ,P_FIRSTKEY ,LP_USED )== InvalidOffsetNumber )
1078
- elog (FATAL ,"btree: failed to add rightkey to new root page" );
1165
+ elog (STOP ,"btree: failed to add rightkey to new root page" );
1079
1166
pfree (new_item );
1080
1167
1168
+ #ifdef XLOG
1169
+ /* XLOG stuff */
1170
+ {
1171
+ xl_btree_newroot xlrec ;
1172
+ xlrec .node = rel -> rd_node ;
1173
+ xlrec .rootblk = rootblknum ;
1174
+
1175
+ /*
1176
+ * Direct access to page is not good but faster - we should
1177
+ * implement some new func in page API.
1178
+ */
1179
+ XLogRecPtr recptr = XLogInsert (RM_BTREE_ID ,XLOG_BTREE_NEWROOT ,
1180
+ & xlrec ,SizeOfBtreeNewroot ,
1181
+ (char * )rootpage + (PageHeader )rootpage )-> pd_upper ,
1182
+ ((PageHeader )rootpage )-> pd_special - ((PageHeader )rootpage )-> upper );
1183
+
1184
+ PageSetLSN (rootpage ,recptr );
1185
+ PageSetSUI (rootpage ,ThisStartUpID );
1186
+ }
1187
+ #endif
1188
+
1081
1189
/* write and let go of the new root buffer */
1082
1190
_bt_wrtbuf (rel ,rootbuf );
1083
1191
1084
1192
/* update metadata page with new root block number */
1085
- _bt_metaproot (rel ,rootbknum ,0 );
1193
+ _bt_metaproot (rel ,rootblknum ,0 );
1086
1194
1087
1195
/* update and release new sibling, and finally the old root */
1088
1196
_bt_wrtbuf (rel ,rbuf );
@@ -1125,7 +1233,7 @@ _bt_pgaddtup(Relation rel,
1125
1233
1126
1234
if (PageAddItem (page , (Item )btitem ,itemsize ,itup_off ,
1127
1235
LP_USED )== InvalidOffsetNumber )
1128
- elog (FATAL ,"btree: failed to add item to the %s for %s" ,
1236
+ elog (STOP ,"btree: failed to add item to the %s for %s" ,
1129
1237
where ,RelationGetRelationName (rel ));
1130
1238
}
1131
1239