88 * Portions Copyright (c) 1994, Regents of the University of California
99 *
1010 * IDENTIFICATION
11- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.31 2006/04/01 03:03:37 tgl Exp $
11+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.32 2006/04/13 03:53:05 tgl Exp $
1212 *
1313 *-------------------------------------------------------------------------
1414 */
@@ -51,32 +51,16 @@ log_incomplete_split(RelFileNode node, BlockNumber leftblk,
5151}
5252
5353static void
54- forget_matching_split (Relation reln ,RelFileNode node ,
55- BlockNumber insertblk ,OffsetNumber offnum ,
56- bool is_root )
54+ forget_matching_split (RelFileNode node ,BlockNumber downlink ,bool is_root )
5755{
58- Buffer buffer ;
59- Page page ;
60- IndexTuple itup ;
61- BlockNumber rightblk ;
6256ListCell * l ;
6357
64- /* Get downlink TID from page */
65- buffer = XLogReadBuffer (reln ,insertblk , false);
66- if (!BufferIsValid (buffer ))
67- return ;
68- page = (Page )BufferGetPage (buffer );
69- itup = (IndexTuple )PageGetItem (page ,PageGetItemId (page ,offnum ));
70- rightblk = ItemPointerGetBlockNumber (& (itup -> t_tid ));
71- Assert (ItemPointerGetOffsetNumber (& (itup -> t_tid ))== P_HIKEY );
72- UnlockReleaseBuffer (buffer );
73-
7458foreach (l ,incomplete_splits )
7559{
7660bt_incomplete_split * split = (bt_incomplete_split * )lfirst (l );
7761
7862if (RelFileNodeEquals (node ,split -> node )&&
79- rightblk == split -> rightblk )
63+ downlink == split -> rightblk )
8064{
8165if (is_root != split -> is_root )
8266elog (LOG ,"forget_matching_split: fishy is_root data (expected %d, got %d)" ,
@@ -87,6 +71,20 @@ forget_matching_split(Relation reln, RelFileNode node,
8771}
8872}
8973
74+ /*
75+ * _bt_restore_page -- re-enter all the index tuples on a page
76+ *
77+ * The page is freshly init'd, and *from (length len) is a copy of what
78+ * had been its upper part (pd_upper to pd_special). We assume that the
79+ * tuples had been added to the page in item-number order, and therefore
80+ * the one with highest item number appears first (lowest on the page).
81+ *
82+ * NOTE: the way this routine is coded, the rebuilt page will have the items
83+ * in correct itemno sequence, but physically the opposite order from the
84+ * original, because we insert them in the opposite of itemno order. This
85+ * does not matter in any current btree code, but it's something to keep an
86+ * eye on. Is it worth changing just on general principles?
87+ */
9088static void
9189_bt_restore_page (Page page ,char * from ,int len )
9290{
@@ -158,18 +156,24 @@ btree_xlog_insert(bool isleaf, bool ismeta,
158156char * datapos ;
159157int datalen ;
160158xl_btree_metadata md ;
159+ BlockNumber downlink = 0 ;
161160
162161datapos = (char * )xlrec + SizeOfBtreeInsert ;
163162datalen = record -> xl_len - SizeOfBtreeInsert ;
163+ if (!isleaf )
164+ {
165+ memcpy (& downlink ,datapos ,sizeof (BlockNumber ));
166+ datapos += sizeof (BlockNumber );
167+ datalen -= sizeof (BlockNumber );
168+ }
164169if (ismeta )
165170{
166171memcpy (& md ,datapos ,sizeof (xl_btree_metadata ));
167172datapos += sizeof (xl_btree_metadata );
168173datalen -= sizeof (xl_btree_metadata );
169174}
170175
171- if ((record -> xl_info & XLR_BKP_BLOCK_1 )&& !ismeta &&
172- incomplete_splits == NIL )
176+ if ((record -> xl_info & XLR_BKP_BLOCK_1 )&& !ismeta && isleaf )
173177return ;/* nothing to do */
174178
175179reln = XLogOpenRelation (xlrec -> target .node );
@@ -208,13 +212,8 @@ btree_xlog_insert(bool isleaf, bool ismeta,
208212md .fastroot ,md .fastlevel );
209213
210214/* Forget any split this insertion completes */
211- if (!isleaf && incomplete_splits != NIL )
212- {
213- forget_matching_split (reln ,xlrec -> target .node ,
214- ItemPointerGetBlockNumber (& (xlrec -> target .tid )),
215- ItemPointerGetOffsetNumber (& (xlrec -> target .tid )),
216- false);
217- }
215+ if (!isleaf )
216+ forget_matching_split (xlrec -> target .node ,downlink , false);
218217}
219218
220219static void
@@ -224,14 +223,17 @@ btree_xlog_split(bool onleft, bool isroot,
224223xl_btree_split * xlrec = (xl_btree_split * )XLogRecGetData (record );
225224Relation reln ;
226225BlockNumber targetblk ;
226+ OffsetNumber targetoff ;
227227BlockNumber leftsib ;
228228BlockNumber rightsib ;
229+ BlockNumber downlink = 0 ;
229230Buffer buffer ;
230231Page page ;
231232BTPageOpaque pageop ;
232233
233234reln = XLogOpenRelation (xlrec -> target .node );
234235targetblk = ItemPointerGetBlockNumber (& (xlrec -> target .tid ));
236+ targetoff = ItemPointerGetOffsetNumber (& (xlrec -> target .tid ));
235237leftsib = (onleft ) ?targetblk :xlrec -> otherblk ;
236238rightsib = (onleft ) ?xlrec -> otherblk :targetblk ;
237239
@@ -252,6 +254,16 @@ btree_xlog_split(bool onleft, bool isroot,
252254 (char * )xlrec + SizeOfBtreeSplit ,
253255xlrec -> leftlen );
254256
257+ if (onleft && xlrec -> level > 0 )
258+ {
259+ IndexTuple itup ;
260+
261+ /* extract downlink from the target tuple */
262+ itup = (IndexTuple )PageGetItem (page ,PageGetItemId (page ,targetoff ));
263+ downlink = ItemPointerGetBlockNumber (& (itup -> t_tid ));
264+ Assert (ItemPointerGetOffsetNumber (& (itup -> t_tid ))== P_HIKEY );
265+ }
266+
255267PageSetLSN (page ,lsn );
256268PageSetTLI (page ,ThisTimeLineID );
257269MarkBufferDirty (buffer );
@@ -274,6 +286,16 @@ btree_xlog_split(bool onleft, bool isroot,
274286 (char * )xlrec + SizeOfBtreeSplit + xlrec -> leftlen ,
275287record -> xl_len - SizeOfBtreeSplit - xlrec -> leftlen );
276288
289+ if (!onleft && xlrec -> level > 0 )
290+ {
291+ IndexTuple itup ;
292+
293+ /* extract downlink from the target tuple */
294+ itup = (IndexTuple )PageGetItem (page ,PageGetItemId (page ,targetoff ));
295+ downlink = ItemPointerGetBlockNumber (& (itup -> t_tid ));
296+ Assert (ItemPointerGetOffsetNumber (& (itup -> t_tid ))== P_HIKEY );
297+ }
298+
277299PageSetLSN (page ,lsn );
278300PageSetTLI (page ,ThisTimeLineID );
279301MarkBufferDirty (buffer );
@@ -308,13 +330,8 @@ btree_xlog_split(bool onleft, bool isroot,
308330}
309331
310332/* Forget any split this insertion completes */
311- if (xlrec -> level > 0 && incomplete_splits != NIL )
312- {
313- forget_matching_split (reln ,xlrec -> target .node ,
314- ItemPointerGetBlockNumber (& (xlrec -> target .tid )),
315- ItemPointerGetOffsetNumber (& (xlrec -> target .tid )),
316- false);
317- }
333+ if (xlrec -> level > 0 )
334+ forget_matching_split (xlrec -> target .node ,downlink , false);
318335
319336/* The job ain't done till the parent link is inserted... */
320337log_incomplete_split (xlrec -> target .node ,
@@ -516,6 +533,7 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
516533Buffer buffer ;
517534Page page ;
518535BTPageOpaque pageop ;
536+ BlockNumber downlink = 0 ;
519537
520538reln = XLogOpenRelation (xlrec -> node );
521539buffer = XLogReadBuffer (reln ,xlrec -> rootblk , true);
@@ -532,9 +550,17 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
532550pageop -> btpo_flags |=BTP_LEAF ;
533551
534552if (record -> xl_len > SizeOfBtreeNewroot )
553+ {
554+ IndexTuple itup ;
555+
535556_bt_restore_page (page ,
536557 (char * )xlrec + SizeOfBtreeNewroot ,
537558record -> xl_len - SizeOfBtreeNewroot );
559+ /* extract downlink to the right-hand split page */
560+ itup = (IndexTuple )PageGetItem (page ,PageGetItemId (page ,P_FIRSTKEY ));
561+ downlink = ItemPointerGetBlockNumber (& (itup -> t_tid ));
562+ Assert (ItemPointerGetOffsetNumber (& (itup -> t_tid ))== P_HIKEY );
563+ }
538564
539565PageSetLSN (page ,lsn );
540566PageSetTLI (page ,ThisTimeLineID );
@@ -546,14 +572,8 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
546572xlrec -> rootblk ,xlrec -> level );
547573
548574/* Check to see if this satisfies any incomplete insertions */
549- if (record -> xl_len > SizeOfBtreeNewroot &&
550- incomplete_splits != NIL )
551- {
552- forget_matching_split (reln ,xlrec -> node ,
553- xlrec -> rootblk ,
554- P_FIRSTKEY ,
555- true);
556- }
575+ if (record -> xl_len > SizeOfBtreeNewroot )
576+ forget_matching_split (xlrec -> node ,downlink , true);
557577}
558578
559579