1717#include "access/gin_private.h"
1818#include "access/relscan.h"
1919#include "miscadmin.h"
20+ #include "storage/predicate.h"
2021#include "utils/datum.h"
2122#include "utils/memutils.h"
23+ #include "utils/rel.h"
2224
2325/* GUC parameter */
2426int GinFuzzySearchLimit = 0 ;
@@ -33,11 +35,25 @@ typedef struct pendingPosition
3335}pendingPosition ;
3436
3537
38+ /*
39+ * Take a predicate lock on GIN index page 'blkno', unless fast update is on.
40+ */
41+ static void
42+ GinPredicateLockPage (Relation index ,BlockNumber blkno ,Snapshot snapshot )
43+ {
44+ /*
45+ * With fast update enabled there is no need to lock individual pages,
46+ * because the whole index relation gets predicate-locked anyway.
47+ */
48+ if (!GinGetUseFastUpdate (index ))
49+ PredicateLockPage (index ,blkno ,snapshot );
50+ }
51+
3652/*
3753 * Goes to the next page if current offset is outside of bounds
3854 */
3955static bool
40- moveRightIfItNeeded (GinBtreeData * btree ,GinBtreeStack * stack )
56+ moveRightIfItNeeded (GinBtreeData * btree ,GinBtreeStack * stack , Snapshot snapshot )
4157{
4258Page page = BufferGetPage (stack -> buffer );
4359
@@ -52,6 +68,7 @@ moveRightIfItNeeded(GinBtreeData *btree, GinBtreeStack *stack)
5268stack -> buffer = ginStepRight (stack -> buffer ,btree -> index ,GIN_SHARE );
5369stack -> blkno = BufferGetBlockNumber (stack -> buffer );
5470stack -> off = FirstOffsetNumber ;
71+ GinPredicateLockPage (btree -> index ,stack -> blkno ,snapshot );
5572}
5673
5774return true;
@@ -73,6 +90,7 @@ scanPostingTree(Relation index, GinScanEntry scanEntry,
7390/* Descend to the leftmost leaf page */
7491stack = ginScanBeginPostingTree (& btree ,index ,rootPostingTree ,snapshot );
7592buffer = stack -> buffer ;
93+
7694IncrBufferRefCount (buffer );/* prevent unpin in freeGinBtreeStack */
7795
7896freeGinBtreeStack (stack );
@@ -82,6 +100,11 @@ scanPostingTree(Relation index, GinScanEntry scanEntry,
82100 */
83101for (;;)
84102{
103+ /*
104+ * Predicate lock each leaf page in posting tree
105+ */
106+ GinPredicateLockPage (index ,BufferGetBlockNumber (buffer ),snapshot );
107+
85108page = BufferGetPage (buffer );
86109if ((GinPageGetOpaque (page )-> flags & GIN_DELETED )== 0 )
87110{
@@ -131,6 +154,12 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
131154attnum = scanEntry -> attnum ;
132155attr = TupleDescAttr (btree -> ginstate -> origTupdesc ,attnum - 1 );
133156
157+ /*
158+ * Predicate lock the entry leaf page we start on; following pages will be
159+ * locked by moveRightIfItNeeded().  Pass the block number, not the Buffer.
160+ */
161+ GinPredicateLockPage (btree -> index ,BufferGetBlockNumber (stack -> buffer ),snapshot );
162+
134163for (;;)
135164{
136165Page page ;
@@ -141,7 +170,7 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
141170/*
142171 * stack->off points to the interested entry, buffer is already locked
143172 */
144- if (moveRightIfItNeeded (btree ,stack )== false)
173+ if (moveRightIfItNeeded (btree ,stack , snapshot )== false)
145174return true;
146175
147176page = BufferGetPage (stack -> buffer );
@@ -250,7 +279,7 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
250279Datum newDatum ;
251280GinNullCategory newCategory ;
252281
253- if (moveRightIfItNeeded (btree ,stack )== false)
282+ if (moveRightIfItNeeded (btree ,stack , snapshot )== false)
254283elog (ERROR ,"lost saved point in index" );/* must not happen !!! */
255284
256285page = BufferGetPage (stack -> buffer );
@@ -323,6 +352,7 @@ startScanEntry(GinState *ginstate, GinScanEntry entry, Snapshot snapshot)
323352ginstate );
324353stackEntry = ginFindLeafPage (& btreeEntry , true,snapshot );
325354page = BufferGetPage (stackEntry -> buffer );
355+
326356/* ginFindLeafPage() will have already checked snapshot age. */
327357needUnlock = true;
328358
@@ -370,6 +400,10 @@ startScanEntry(GinState *ginstate, GinScanEntry entry, Snapshot snapshot)
370400{
371401IndexTuple itup = (IndexTuple )PageGetItem (page ,PageGetItemId (page ,stackEntry -> off ));
372402
403+ /* Predicate lock visited entry leaf page */
404+ GinPredicateLockPage (ginstate -> index ,
405+ BufferGetBlockNumber (stackEntry -> buffer ),snapshot );
406+
373407if (GinIsPostingTree (itup ))
374408{
375409BlockNumber rootPostingTree = GinGetPostingTree (itup );
@@ -391,6 +425,12 @@ startScanEntry(GinState *ginstate, GinScanEntry entry, Snapshot snapshot)
391425rootPostingTree ,snapshot );
392426entry -> buffer = stack -> buffer ;
393427
428+ /*
429+ * Predicate lock visited posting tree page, following pages
430+ * will be locked by moveRightIfItNeeded or entryLoadMoreItems
431+ */
432+ GinPredicateLockPage (ginstate -> index ,BufferGetBlockNumber (entry -> buffer ),snapshot );
433+
394434/*
395435 * We keep buffer pinned because we need to prevent deletion of
396436 * page during scan. See GIN's vacuum implementation. RefCount is
@@ -493,7 +533,7 @@ startScanKey(GinState *ginstate, GinScanOpaque so, GinScanKey key)
493533
494534for (i = 0 ;i < key -> nentries - 1 ;i ++ )
495535{
496- /* Pass all entries <= i asFALSE , and the rest as MAYBE */
536+ /* Pass all entries <= i as false, and the rest as MAYBE */
497537for (j = 0 ;j <=i ;j ++ )
498538key -> entryRes [entryIndexes [j ]]= GIN_FALSE ;
499539for (j = i + 1 ;j < key -> nentries ;j ++ )
@@ -633,6 +673,8 @@ entryLoadMoreItems(GinState *ginstate, GinScanEntry entry,
633673entry -> btree .fullScan = false;
634674stack = ginFindLeafPage (& entry -> btree , true,snapshot );
635675
676+ GinPredicateLockPage (ginstate -> index ,BufferGetBlockNumber (stack -> buffer ),snapshot );
677+
636678/* we don't need the stack, just the buffer. */
637679entry -> buffer = stack -> buffer ;
638680IncrBufferRefCount (entry -> buffer );
@@ -677,6 +719,10 @@ entryLoadMoreItems(GinState *ginstate, GinScanEntry entry,
677719entry -> buffer = ginStepRight (entry -> buffer ,
678720ginstate -> index ,
679721GIN_SHARE );
722+
723+ GinPredicateLockPage (ginstate -> index ,BufferGetBlockNumber (entry -> buffer ),snapshot );
724+
725+
680726page = BufferGetPage (entry -> buffer );
681727}
682728stepright = true;
@@ -1038,8 +1084,8 @@ keyGetItem(GinState *ginstate, MemoryContext tempCtx, GinScanKey key,
10381084 * lossy page even when none of the other entries match.
10391085 *
10401086 * Our strategy is to call the tri-state consistent function, with the
1041- * lossy-page entries set to MAYBE, and all the other entriesFALSE . If it
1042- * returnsFALSE , none of the lossy items alone are enough for a match, so
1087+ * lossy-page entries set to MAYBE, and all the other entries false. If it
1088+ * returns false, none of the lossy items alone are enough for a match, so
10431089 * we don't need to return a lossy-page pointer. Otherwise, return a
10441090 * lossy-page pointer to indicate that the whole heap page must be
10451091 * checked. (On subsequent calls, we'll do nothing until minItem is past
@@ -1700,7 +1746,8 @@ collectMatchesForHeapRow(IndexScanDesc scan, pendingPosition *pos)
17001746}
17011747
17021748/*
1703- * Collect all matched rows from pending list into bitmap
1749+ * Collect all matched rows from pending list into bitmap. This function
1750+ * also takes a predicate lock on the whole index relation when needed.
17041751 */
17051752static void
17061753scanPendingInsert (IndexScanDesc scan ,TIDBitmap * tbm ,int64 * ntids )
@@ -1730,9 +1777,24 @@ scanPendingInsert(IndexScanDesc scan, TIDBitmap *tbm, int64 *ntids)
17301777{
17311778/* No pending list, so proceed with normal scan */
17321779UnlockReleaseBuffer (metabuffer );
1780+
1781+ /*
1782+ * If fast update is enabled, we acquire a predicate lock on the entire
1783+ * relation as fast update postpones the insertion of tuples into index
1784+ * structure due to which we can't detect rw conflicts.
1785+ */
1786+ if (GinGetUseFastUpdate (scan -> indexRelation ))
1787+ PredicateLockRelation (scan -> indexRelation ,scan -> xs_snapshot );
1788+
17331789return ;
17341790}
17351791
1792+ /*
1793+ * The pending list is not empty, so we need to lock the whole index
1794+ * regardless of the fastupdate setting.
1795+ */
1796+ PredicateLockRelation (scan -> indexRelation ,scan -> xs_snapshot );
1797+
17361798pos .pendingBuffer = ReadBuffer (scan -> indexRelation ,blkno );
17371799LockBuffer (pos .pendingBuffer ,GIN_SHARE );
17381800pos .firstOffset = FirstOffsetNumber ;