88 *
99 *
1010 * IDENTIFICATION
11- * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.270 2008/11/19 10:34:50 heikki Exp $
11+ * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.271 2008/12/03 13:05:22 heikki Exp $
1212 *
1313 *
1414 * INTERFACE ROUTINES
4747#include "access/transam.h"
4848#include "access/tuptoaster.h"
4949#include "access/valid.h"
50+ #include "access/visibilitymap.h"
5051#include "access/xact.h"
5152#include "access/xlogutils.h"
5253#include "catalog/catalog.h"
@@ -195,6 +196,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
195196int ntup ;
196197OffsetNumber lineoff ;
197198ItemId lpp ;
199+ bool all_visible ;
198200
199201Assert (page < scan -> rs_nblocks );
200202
@@ -233,20 +235,32 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
233235lines = PageGetMaxOffsetNumber (dp );
234236ntup = 0 ;
235237
238+ /*
239+ * If the all-visible flag indicates that all tuples on the page are
240+ * visible to everyone, we can skip the per-tuple visibility tests.
241+ */
242+ all_visible = PageIsAllVisible (dp );
243+
236244for (lineoff = FirstOffsetNumber ,lpp = PageGetItemId (dp ,lineoff );
237245lineoff <=lines ;
238246lineoff ++ ,lpp ++ )
239247{
240248if (ItemIdIsNormal (lpp ))
241249{
242- HeapTupleData loctup ;
243250bool valid ;
244251
245- loctup .t_data = (HeapTupleHeader )PageGetItem ((Page )dp ,lpp );
246- loctup .t_len = ItemIdGetLength (lpp );
247- ItemPointerSet (& (loctup .t_self ),page ,lineoff );
252+ if (all_visible )
253+ valid = true;
254+ else
255+ {
256+ HeapTupleData loctup ;
257+
258+ loctup .t_data = (HeapTupleHeader )PageGetItem ((Page )dp ,lpp );
259+ loctup .t_len = ItemIdGetLength (lpp );
260+ ItemPointerSet (& (loctup .t_self ),page ,lineoff );
248261
249- valid = HeapTupleSatisfiesVisibility (& loctup ,snapshot ,buffer );
262+ valid = HeapTupleSatisfiesVisibility (& loctup ,snapshot ,buffer );
263+ }
250264if (valid )
251265scan -> rs_vistuples [ntup ++ ]= lineoff ;
252266}
@@ -1860,6 +1874,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
18601874TransactionId xid = GetCurrentTransactionId ();
18611875HeapTuple heaptup ;
18621876Buffer buffer ;
1877+ bool all_visible_cleared = false;
18631878
18641879if (relation -> rd_rel -> relhasoids )
18651880{
@@ -1920,6 +1935,12 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
19201935
19211936RelationPutHeapTuple (relation ,buffer ,heaptup );
19221937
1938+ if (PageIsAllVisible (BufferGetPage (buffer )))
1939+ {
1940+ all_visible_cleared = true;
1941+ PageClearAllVisible (BufferGetPage (buffer ));
1942+ }
1943+
19231944/*
19241945 * XXX Should we set PageSetPrunable on this page ?
19251946 *
@@ -1943,6 +1964,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
19431964Page page = BufferGetPage (buffer );
19441965uint8 info = XLOG_HEAP_INSERT ;
19451966
1967+ xlrec .all_visible_cleared = all_visible_cleared ;
19461968xlrec .target .node = relation -> rd_node ;
19471969xlrec .target .tid = heaptup -> t_self ;
19481970rdata [0 ].data = (char * )& xlrec ;
@@ -1994,6 +2016,11 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
19942016
19952017UnlockReleaseBuffer (buffer );
19962018
2019+ /* Clear the bit in the visibility map if necessary */
2020+ if (all_visible_cleared )
2021+ visibilitymap_clear (relation ,
2022+ ItemPointerGetBlockNumber (& (heaptup -> t_self )));
2023+
19972024/*
19982025 * If tuple is cachable, mark it for invalidation from the caches in case
19992026 * we abort. Note it is OK to do this after releasing the buffer, because
@@ -2070,6 +2097,7 @@ heap_delete(Relation relation, ItemPointer tid,
20702097Buffer buffer ;
20712098bool have_tuple_lock = false;
20722099bool iscombo ;
2100+ bool all_visible_cleared = false;
20732101
20742102Assert (ItemPointerIsValid (tid ));
20752103
@@ -2216,6 +2244,12 @@ heap_delete(Relation relation, ItemPointer tid,
22162244 */
22172245PageSetPrunable (page ,xid );
22182246
2247+ if (PageIsAllVisible (page ))
2248+ {
2249+ all_visible_cleared = true;
2250+ PageClearAllVisible (page );
2251+ }
2252+
22192253/* store transaction information of xact deleting the tuple */
22202254tp .t_data -> t_infomask &= ~(HEAP_XMAX_COMMITTED |
22212255HEAP_XMAX_INVALID |
@@ -2237,6 +2271,7 @@ heap_delete(Relation relation, ItemPointer tid,
22372271XLogRecPtr recptr ;
22382272XLogRecData rdata [2 ];
22392273
2274+ xlrec .all_visible_cleared = all_visible_cleared ;
22402275xlrec .target .node = relation -> rd_node ;
22412276xlrec .target .tid = tp .t_self ;
22422277rdata [0 ].data = (char * )& xlrec ;
@@ -2281,6 +2316,10 @@ heap_delete(Relation relation, ItemPointer tid,
22812316 */
22822317CacheInvalidateHeapTuple (relation ,& tp );
22832318
2319+ /* Clear the bit in the visibility map if necessary */
2320+ if (all_visible_cleared )
2321+ visibilitymap_clear (relation ,BufferGetBlockNumber (buffer ));
2322+
22842323/* Now we can release the buffer */
22852324ReleaseBuffer (buffer );
22862325
@@ -2388,6 +2427,8 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
23882427bool have_tuple_lock = false;
23892428bool iscombo ;
23902429bool use_hot_update = false;
2430+ bool all_visible_cleared = false;
2431+ bool all_visible_cleared_new = false;
23912432
23922433Assert (ItemPointerIsValid (otid ));
23932434
@@ -2763,6 +2804,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
27632804MarkBufferDirty (newbuf );
27642805MarkBufferDirty (buffer );
27652806
2807+ /*
2808+ * Note: we mustn't clear PD_ALL_VISIBLE flags before writing the WAL
2809+ * record, because log_heap_update looks at those flags to set the
2810+ * corresponding flags in the WAL record.
2811+ */
2812+
27662813/* XLOG stuff */
27672814if (!relation -> rd_istemp )
27682815{
@@ -2778,6 +2825,18 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
27782825PageSetTLI (BufferGetPage (buffer ),ThisTimeLineID );
27792826}
27802827
2828+ /* Clear PD_ALL_VISIBLE flags */
2829+ if (PageIsAllVisible (BufferGetPage (buffer )))
2830+ {
2831+ all_visible_cleared = true;
2832+ PageClearAllVisible (BufferGetPage (buffer ));
2833+ }
2834+ if (newbuf != buffer && PageIsAllVisible (BufferGetPage (newbuf )))
2835+ {
2836+ all_visible_cleared_new = true;
2837+ PageClearAllVisible (BufferGetPage (newbuf ));
2838+ }
2839+
27812840END_CRIT_SECTION ();
27822841
27832842if (newbuf != buffer )
@@ -2791,6 +2850,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
27912850 */
27922851CacheInvalidateHeapTuple (relation ,& oldtup );
27932852
2853+ /* Clear bits in visibility map */
2854+ if (all_visible_cleared )
2855+ visibilitymap_clear (relation ,BufferGetBlockNumber (buffer ));
2856+ if (all_visible_cleared_new )
2857+ visibilitymap_clear (relation ,BufferGetBlockNumber (newbuf ));
2858+
27942859/* Now we can release the buffer(s) */
27952860if (newbuf != buffer )
27962861ReleaseBuffer (newbuf );
@@ -3411,6 +3476,11 @@ heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
34113476
34123477LockBuffer (* buffer ,BUFFER_LOCK_UNLOCK );
34133478
3479+ /*
3480+ * Don't update the visibility map here. Locking a tuple doesn't
3481+ * change visibility info.
3482+ */
3483+
34143484/*
34153485 * Now that we have successfully marked the tuple as locked, we can
34163486 * release the lmgr tuple lock, if we had it.
@@ -3916,7 +3986,9 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
39163986
39173987xlrec .target .node = reln -> rd_node ;
39183988xlrec .target .tid = from ;
3989+ xlrec .all_visible_cleared = PageIsAllVisible (BufferGetPage (oldbuf ));
39193990xlrec .newtid = newtup -> t_self ;
3991+ xlrec .new_all_visible_cleared = PageIsAllVisible (BufferGetPage (newbuf ));
39203992
39213993rdata [0 ].data = (char * )& xlrec ;
39223994rdata [0 ].len = SizeOfHeapUpdate ;
@@ -4185,13 +4257,25 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
41854257OffsetNumber offnum ;
41864258ItemId lp = NULL ;
41874259HeapTupleHeader htup ;
4260+ BlockNumber blkno ;
4261+
4262+ blkno = ItemPointerGetBlockNumber (& (xlrec -> target .tid ));
4263+
4264+ /*
4265+ * The visibility map always needs to be updated, even if the heap page
4266+ * is already up-to-date.
4267+ */
4268+ if (xlrec -> all_visible_cleared )
4269+ {
4270+ Relation reln = CreateFakeRelcacheEntry (xlrec -> target .node );
4271+ visibilitymap_clear (reln ,blkno );
4272+ FreeFakeRelcacheEntry (reln );
4273+ }
41884274
41894275if (record -> xl_info & XLR_BKP_BLOCK_1 )
41904276return ;
41914277
4192- buffer = XLogReadBuffer (xlrec -> target .node ,
4193- ItemPointerGetBlockNumber (& (xlrec -> target .tid )),
4194- false);
4278+ buffer = XLogReadBuffer (xlrec -> target .node ,blkno , false);
41954279if (!BufferIsValid (buffer ))
41964280return ;
41974281page = (Page )BufferGetPage (buffer );
@@ -4223,6 +4307,9 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
42234307/* Mark the page as a candidate for pruning */
42244308PageSetPrunable (page ,record -> xl_xid );
42254309
4310+ if (xlrec -> all_visible_cleared )
4311+ PageClearAllVisible (page );
4312+
42264313/* Make sure there is no forward chain link in t_ctid */
42274314htup -> t_ctid = xlrec -> target .tid ;
42284315PageSetLSN (page ,lsn );
@@ -4249,11 +4336,22 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
42494336Size freespace ;
42504337BlockNumber blkno ;
42514338
4339+ blkno = ItemPointerGetBlockNumber (& (xlrec -> target .tid ));
4340+
4341+ /*
4342+ * The visibility map always needs to be updated, even if the heap page
4343+ * is already up-to-date.
4344+ */
4345+ if (xlrec -> all_visible_cleared )
4346+ {
4347+ Relation reln = CreateFakeRelcacheEntry (xlrec -> target .node );
4348+ visibilitymap_clear (reln ,blkno );
4349+ FreeFakeRelcacheEntry (reln );
4350+ }
4351+
42524352if (record -> xl_info & XLR_BKP_BLOCK_1 )
42534353return ;
42544354
4255- blkno = ItemPointerGetBlockNumber (& (xlrec -> target .tid ));
4256-
42574355if (record -> xl_info & XLOG_HEAP_INIT_PAGE )
42584356{
42594357buffer = XLogReadBuffer (xlrec -> target .node ,blkno , true);
@@ -4307,6 +4405,10 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
43074405
43084406PageSetLSN (page ,lsn );
43094407PageSetTLI (page ,ThisTimeLineID );
4408+
4409+ if (xlrec -> all_visible_cleared )
4410+ PageClearAllVisible (page );
4411+
43104412MarkBufferDirty (buffer );
43114413UnlockReleaseBuffer (buffer );
43124414
@@ -4347,6 +4449,18 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
43474449uint32 newlen ;
43484450Size freespace ;
43494451
4452+ /*
4453+ * The visibility map always needs to be updated, even if the heap page
4454+ * is already up-to-date.
4455+ */
4456+ if (xlrec -> all_visible_cleared )
4457+ {
4458+ Relation reln = CreateFakeRelcacheEntry (xlrec -> target .node );
4459+ visibilitymap_clear (reln ,
4460+ ItemPointerGetBlockNumber (& xlrec -> target .tid ));
4461+ FreeFakeRelcacheEntry (reln );
4462+ }
4463+
43504464if (record -> xl_info & XLR_BKP_BLOCK_1 )
43514465{
43524466if (samepage )
@@ -4411,6 +4525,9 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
44114525/* Mark the page as a candidate for pruning */
44124526PageSetPrunable (page ,record -> xl_xid );
44134527
4528+ if (xlrec -> all_visible_cleared )
4529+ PageClearAllVisible (page );
4530+
44144531/*
44154532 * this test is ugly, but necessary to avoid thinking that insert change
44164533 * is already applied
@@ -4426,6 +4543,17 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
44264543
44274544newt :;
44284545
4546+ /*
4547+ * The visibility map always needs to be updated, even if the heap page
4548+ * is already up-to-date.
4549+ */
4550+ if (xlrec -> new_all_visible_cleared )
4551+ {
4552+ Relation reln = CreateFakeRelcacheEntry (xlrec -> target .node );
4553+ visibilitymap_clear (reln ,ItemPointerGetBlockNumber (& xlrec -> newtid ));
4554+ FreeFakeRelcacheEntry (reln );
4555+ }
4556+
44294557if (record -> xl_info & XLR_BKP_BLOCK_2 )
44304558return ;
44314559
@@ -4504,6 +4632,9 @@ newsame:;
45044632if (offnum == InvalidOffsetNumber )
45054633elog (PANIC ,"heap_update_redo: failed to add tuple" );
45064634
4635+ if (xlrec -> new_all_visible_cleared )
4636+ PageClearAllVisible (page );
4637+
45074638freespace = PageGetHeapFreeSpace (page );/* needed to update FSM below */
45084639
45094640PageSetLSN (page ,lsn );