3535#include "storage/freespace.h"
3636#include "utils/acl.h"
3737#include "utils/builtins.h"
38+ #include "utils/datum.h"
3839#include "utils/index_selfuncs.h"
3940#include "utils/memutils.h"
4041#include "utils/rel.h"
@@ -77,7 +78,9 @@ static void form_and_insert_tuple(BrinBuildState *state);
7778static void union_tuples (BrinDesc * bdesc ,BrinMemTuple * a ,
7879BrinTuple * b );
7980static void brin_vacuum_scan (Relation idxrel ,BufferAccessStrategy strategy );
80-
81+ static bool add_values_to_range (Relation idxRel ,BrinDesc * bdesc ,
82+ BrinMemTuple * dtup ,Datum * values ,bool * nulls );
83+ static bool check_null_keys (BrinValues * bval ,ScanKey * nullkeys ,int nnullkeys );
8184
8285/*
8386 * BRIN handler function: return IndexAmRoutine with access method parameters
@@ -179,7 +182,6 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
179182OffsetNumber off ;
180183BrinTuple * brtup ;
181184BrinMemTuple * dtup ;
182- int keyno ;
183185
184186CHECK_FOR_INTERRUPTS ();
185187
@@ -243,31 +245,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
243245
244246dtup = brin_deform_tuple (bdesc ,brtup ,NULL );
245247
246- /*
247- * Compare the key values of the new tuple to the stored index values;
248- * our deformed tuple will get updated if the new tuple doesn't fit
249- * the original range (note this means we can't break out of the loop
250- * early). Make a note of whether this happens, so that we know to
251- * insert the modified tuple later.
252- */
253- for (keyno = 0 ;keyno < bdesc -> bd_tupdesc -> natts ;keyno ++ )
254- {
255- Datum result ;
256- BrinValues * bval ;
257- FmgrInfo * addValue ;
258-
259- bval = & dtup -> bt_columns [keyno ];
260- addValue = index_getprocinfo (idxRel ,keyno + 1 ,
261- BRIN_PROCNUM_ADDVALUE );
262- result = FunctionCall4Coll (addValue ,
263- idxRel -> rd_indcollation [keyno ],
264- PointerGetDatum (bdesc ),
265- PointerGetDatum (bval ),
266- values [keyno ],
267- nulls [keyno ]);
268- /* if that returned true, we need to insert the updated tuple */
269- need_insert |=DatumGetBool (result );
270- }
248+ need_insert = add_values_to_range (idxRel ,bdesc ,dtup ,values ,nulls );
271249
272250if (!need_insert )
273251{
@@ -390,8 +368,10 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
390368BrinMemTuple * dtup ;
391369BrinTuple * btup = NULL ;
392370Size btupsz = 0 ;
393- ScanKey * * keys ;
394- int * nkeys ;
371+ ScanKey * * keys ,
372+ * * nullkeys ;
373+ int * nkeys ,
374+ * nnullkeys ;
395375int keyno ;
396376
397377opaque = (BrinOpaque * )scan -> opaque ;
@@ -420,13 +400,18 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
420400 * keys, so we allocate space for all attributes. That may use more memory
421401 * but it's probably cheaper than determining which attributes are used.
422402 *
403+ * We keep null and regular keys separate, so that we can pass just the
404+ * regular keys to the consistent function easily.
405+ *
423406 * XXX The widest index can have 32 attributes, so the amount of wasted
424407 * memory is negligible. We could invent a more compact approach (with
425408 * just space for used attributes) but that would make the matching more
426409 * complex so it's not a good trade-off.
427410 */
428411keys = palloc0 (sizeof (ScanKey * )* bdesc -> bd_tupdesc -> natts );
412+ nullkeys = palloc0 (sizeof (ScanKey * )* bdesc -> bd_tupdesc -> natts );
429413nkeys = palloc0 (sizeof (int )* bdesc -> bd_tupdesc -> natts );
414+ nnullkeys = palloc0 (sizeof (int )* bdesc -> bd_tupdesc -> natts );
430415
431416/* Preprocess the scan keys - split them into per-attribute arrays. */
432417for (keyno = 0 ;keyno < scan -> numberOfKeys ;keyno ++ )
@@ -444,33 +429,47 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
444429TupleDescAttr (bdesc -> bd_tupdesc ,
445430keyattno - 1 )-> attcollation ));
446431
447- /* First time we see this attribute, so init the array of keys. */
448- if (!keys [keyattno - 1 ])
432+ /*
433+ * First time we see this index attribute, so init as needed.
434+ *
435+ * This is a bit of an overkill - we don't know how many scan keys are
436+ * there for this attribute, so we simply allocate the largest number
437+ * possible (as if all keys were for this attribute). This may waste a
438+ * bit of memory, but we only expect small number of scan keys in
439+ * general, so this should be negligible, and repeated repalloc calls
440+ * are not free either.
441+ */
442+ if (consistentFn [keyattno - 1 ].fn_oid == InvalidOid )
449443{
450444FmgrInfo * tmp ;
451445
452- /*
453- * This is a bit of an overkill - we don't know how many scan keys
454- * are there for this attribute, so we simply allocate the largest
455- * number possible (as if all keys were for this attribute). This
456- * may waste a bit of memory, but we only expect small number of
457- * scan keys in general, so this should be negligible, and
458- * repeated repalloc calls are not free either.
459- */
460- keys [keyattno - 1 ]= palloc0 (sizeof (ScanKey )* scan -> numberOfKeys );
461-
462- /* First time this column, so look up consistent function */
463- Assert (consistentFn [keyattno - 1 ].fn_oid == InvalidOid );
446+ /* No key/null arrays for this attribute. */
447+ Assert ((keys [keyattno - 1 ]== NULL )&& (nkeys [keyattno - 1 ]== 0 ));
448+ Assert ((nullkeys [keyattno - 1 ]== NULL )&& (nnullkeys [keyattno - 1 ]== 0 ));
464449
465450tmp = index_getprocinfo (idxRel ,keyattno ,
466451BRIN_PROCNUM_CONSISTENT );
467452fmgr_info_copy (& consistentFn [keyattno - 1 ],tmp ,
468453CurrentMemoryContext );
469454}
470455
471- /* Add key to the per-attribute array. */
472- keys [keyattno - 1 ][nkeys [keyattno - 1 ]]= key ;
473- nkeys [keyattno - 1 ]++ ;
456+ /* Add key to the proper per-attribute array. */
457+ if (key -> sk_flags & SK_ISNULL )
458+ {
459+ if (!nullkeys [keyattno - 1 ])
460+ nullkeys [keyattno - 1 ]= palloc0 (sizeof (ScanKey )* scan -> numberOfKeys );
461+
462+ nullkeys [keyattno - 1 ][nnullkeys [keyattno - 1 ]]= key ;
463+ nnullkeys [keyattno - 1 ]++ ;
464+ }
465+ else
466+ {
467+ if (!keys [keyattno - 1 ])
468+ keys [keyattno - 1 ]= palloc0 (sizeof (ScanKey )* scan -> numberOfKeys );
469+
470+ keys [keyattno - 1 ][nkeys [keyattno - 1 ]]= key ;
471+ nkeys [keyattno - 1 ]++ ;
472+ }
474473}
475474
476475/* allocate an initial in-memory tuple, out of the per-range memcxt */
@@ -549,15 +548,58 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
549548Datum add ;
550549Oid collation ;
551550
552- /* skip attributes without any scan keys */
553- if (nkeys [attno - 1 ]== 0 )
551+ /*
552+ * skip attributes without any scan keys (both regular and
553+ * IS [NOT] NULL)
554+ */
555+ if (nkeys [attno - 1 ]== 0 && nnullkeys [attno - 1 ]== 0 )
554556continue ;
555557
556558bval = & dtup -> bt_columns [attno - 1 ];
557559
560+ /*
561+ * First check if there are any IS [NOT] NULL scan keys,
562+ * and if we're violating them. In that case we can
563+ * terminate early, without invoking the support function.
564+ *
565+ * As there may be more keys, we can only detemine
566+ * mismatch within this loop.
567+ */
568+ if (bdesc -> bd_info [attno - 1 ]-> oi_regular_nulls &&
569+ !check_null_keys (bval ,nullkeys [attno - 1 ],
570+ nnullkeys [attno - 1 ]))
571+ {
572+ /*
573+ * If any of the IS [NOT] NULL keys failed, the page
574+ * range as a whole can't pass. So terminate the loop.
575+ */
576+ addrange = false;
577+ break ;
578+ }
579+
580+ /*
581+ * So either there are no IS [NOT] NULL keys, or all
582+ * passed. If there are no regular scan keys, we're done -
583+ * the page range matches. If there are regular keys, but
584+ * the page range is marked as 'all nulls' it can't
585+ * possibly pass (we're assuming the operators are
586+ * strict).
587+ */
588+
589+ /* No regular scan keys - page range as a whole passes. */
590+ if (!nkeys [attno - 1 ])
591+ continue ;
592+
558593Assert ((nkeys [attno - 1 ]> 0 )&&
559594 (nkeys [attno - 1 ] <=scan -> numberOfKeys ));
560595
596+ /* If it is all nulls, it cannot possibly be consistent. */
597+ if (bval -> bv_allnulls )
598+ {
599+ addrange = false;
600+ break ;
601+ }
602+
561603/*
562604 * Check whether the scan key is consistent with the page
563605 * range values; if so, have the pages in the range added
@@ -663,7 +705,6 @@ brinbuildCallback(Relation index,
663705{
664706BrinBuildState * state = (BrinBuildState * )brstate ;
665707BlockNumber thisblock ;
666- int i ;
667708
668709thisblock = ItemPointerGetBlockNumber (tid );
669710
@@ -692,25 +733,8 @@ brinbuildCallback(Relation index,
692733}
693734
694735/* Accumulate the current tuple into the running state */
695- for (i = 0 ;i < state -> bs_bdesc -> bd_tupdesc -> natts ;i ++ )
696- {
697- FmgrInfo * addValue ;
698- BrinValues * col ;
699- Form_pg_attribute attr = TupleDescAttr (state -> bs_bdesc -> bd_tupdesc ,i );
700-
701- col = & state -> bs_dtuple -> bt_columns [i ];
702- addValue = index_getprocinfo (index ,i + 1 ,
703- BRIN_PROCNUM_ADDVALUE );
704-
705- /*
706- * Update dtuple state, if and as necessary.
707- */
708- FunctionCall4Coll (addValue ,
709- attr -> attcollation ,
710- PointerGetDatum (state -> bs_bdesc ),
711- PointerGetDatum (col ),
712- values [i ],isnull [i ]);
713- }
736+ (void )add_values_to_range (index ,state -> bs_bdesc ,state -> bs_dtuple ,
737+ values ,isnull );
714738}
715739
716740/*
@@ -1489,6 +1513,39 @@ union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
14891513FmgrInfo * unionFn ;
14901514BrinValues * col_a = & a -> bt_columns [keyno ];
14911515BrinValues * col_b = & db -> bt_columns [keyno ];
1516+ BrinOpcInfo * opcinfo = bdesc -> bd_info [keyno ];
1517+
1518+ if (opcinfo -> oi_regular_nulls )
1519+ {
1520+ /* Adjust "hasnulls". */
1521+ if (!col_a -> bv_hasnulls && col_b -> bv_hasnulls )
1522+ col_a -> bv_hasnulls = true;
1523+
1524+ /* If there are no values in B, there's nothing left to do. */
1525+ if (col_b -> bv_allnulls )
1526+ continue ;
1527+
1528+ /*
1529+ * Adjust "allnulls". If A doesn't have values, just copy the
1530+ * values from B into A, and we're done. We cannot run the
1531+ * operators in this case, because values in A might contain
1532+ * garbage. Note we already established that B contains values.
1533+ */
1534+ if (col_a -> bv_allnulls )
1535+ {
1536+ int i ;
1537+
1538+ col_a -> bv_allnulls = false;
1539+
1540+ for (i = 0 ;i < opcinfo -> oi_nstored ;i ++ )
1541+ col_a -> bv_values [i ]=
1542+ datumCopy (col_b -> bv_values [i ],
1543+ opcinfo -> oi_typcache [i ]-> typbyval ,
1544+ opcinfo -> oi_typcache [i ]-> typlen );
1545+
1546+ continue ;
1547+ }
1548+ }
14921549
14931550unionFn = index_getprocinfo (bdesc -> bd_index ,keyno + 1 ,
14941551BRIN_PROCNUM_UNION );
@@ -1542,3 +1599,103 @@ brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
15421599 */
15431600FreeSpaceMapVacuum (idxrel );
15441601}
1602+
1603+ static bool
1604+ add_values_to_range (Relation idxRel ,BrinDesc * bdesc ,BrinMemTuple * dtup ,
1605+ Datum * values ,bool * nulls )
1606+ {
1607+ int keyno ;
1608+ bool modified = false;
1609+
1610+ /*
1611+ * Compare the key values of the new tuple to the stored index values; our
1612+ * deformed tuple will get updated if the new tuple doesn't fit the
1613+ * original range (note this means we can't break out of the loop early).
1614+ * Make a note of whether this happens, so that we know to insert the
1615+ * modified tuple later.
1616+ */
1617+ for (keyno = 0 ;keyno < bdesc -> bd_tupdesc -> natts ;keyno ++ )
1618+ {
1619+ Datum result ;
1620+ BrinValues * bval ;
1621+ FmgrInfo * addValue ;
1622+
1623+ bval = & dtup -> bt_columns [keyno ];
1624+
1625+ if (bdesc -> bd_info [keyno ]-> oi_regular_nulls && nulls [keyno ])
1626+ {
1627+ /*
1628+ * If the new value is null, we record that we saw it if it's the
1629+ * first one; otherwise, there's nothing to do.
1630+ */
1631+ if (!bval -> bv_hasnulls )
1632+ {
1633+ bval -> bv_hasnulls = true;
1634+ modified = true;
1635+ }
1636+
1637+ continue ;
1638+ }
1639+
1640+ addValue = index_getprocinfo (idxRel ,keyno + 1 ,
1641+ BRIN_PROCNUM_ADDVALUE );
1642+ result = FunctionCall4Coll (addValue ,
1643+ idxRel -> rd_indcollation [keyno ],
1644+ PointerGetDatum (bdesc ),
1645+ PointerGetDatum (bval ),
1646+ values [keyno ],
1647+ nulls [keyno ]);
1648+ /* if that returned true, we need to insert the updated tuple */
1649+ modified |=DatumGetBool (result );
1650+ }
1651+
1652+ return modified ;
1653+ }
1654+
1655+ static bool
1656+ check_null_keys (BrinValues * bval ,ScanKey * nullkeys ,int nnullkeys )
1657+ {
1658+ int keyno ;
1659+
1660+ /*
1661+ * First check if there are any IS [NOT] NULL scan keys, and if we're
1662+ * violating them.
1663+ */
1664+ for (keyno = 0 ;keyno < nnullkeys ;keyno ++ )
1665+ {
1666+ ScanKey key = nullkeys [keyno ];
1667+
1668+ Assert (key -> sk_attno == bval -> bv_attno );
1669+
1670+ /* Handle only IS NULL/IS NOT NULL tests */
1671+ if (!(key -> sk_flags & SK_ISNULL ))
1672+ continue ;
1673+
1674+ if (key -> sk_flags & SK_SEARCHNULL )
1675+ {
1676+ /* IS NULL scan key, but range has no NULLs */
1677+ if (!bval -> bv_allnulls && !bval -> bv_hasnulls )
1678+ return false;
1679+ }
1680+ else if (key -> sk_flags & SK_SEARCHNOTNULL )
1681+ {
1682+ /*
1683+ * For IS NOT NULL, we can only skip ranges that are known to have
1684+ * only nulls.
1685+ */
1686+ if (bval -> bv_allnulls )
1687+ return false;
1688+ }
1689+ else
1690+ {
1691+ /*
1692+ * Neither IS NULL nor IS NOT NULL was used; assume all indexable
1693+ * operators are strict and thus return false with NULL value in
1694+ * the scan key.
1695+ */
1696+ return false;
1697+ }
1698+ }
1699+
1700+ return true;
1701+ }