245245#include "catalog/pg_aggregate.h"
246246#include "catalog/pg_proc.h"
247247#include "catalog/pg_type.h"
248+ #include "common/hashfn.h"
248249#include "executor/execExpr.h"
249250#include "executor/executor.h"
250251#include "executor/nodeAgg.h"
252+ #include "lib/hyperloglog.h"
251253#include "miscadmin.h"
252254#include "nodes/makefuncs.h"
253255#include "nodes/nodeFuncs.h"
295297#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
296298#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
297299
300+ /*
301+ * HyperLogLog is used for estimating the cardinality of the spilled tuples in
302+ * a given partition. 5 bits corresponds to a size of about 32 bytes and a
303+ * worst-case error of around 18%. That's effective enough to choose a
304+ * reasonable number of partitions when recursing.
305+ */
306+ #define HASHAGG_HLL_BIT_WIDTH 5
307+
298308/*
299309 * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
300310 * improved?
@@ -339,6 +349,7 @@ typedef struct HashAggSpill
339349int64 * ntuples ;/* number of tuples in each partition */
340350uint32 mask ;/* mask to find partition from hash value */
341351int shift ;/* after masking, shift by this amount */
352+ hyperLogLogState * hll_card ;/* cardinality estimate for contents */
342353}HashAggSpill ;
343354
344355/*
@@ -357,6 +368,7 @@ typedef struct HashAggBatch
357368LogicalTapeSet * tapeset ;/* borrowed reference to tape set */
358369int input_tapenum ;/* input partition tape */
359370int64 input_tuples ;/* number of tuples in this batch */
371+ double input_card ;/* estimated group cardinality */
360372}HashAggBatch ;
361373
362374/* used to find referenced colnos */
@@ -411,7 +423,7 @@ static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
411423static long hash_choose_num_buckets (double hashentrysize ,
412424long estimated_nbuckets ,
413425Size memory );
414- static int hash_choose_num_partitions (uint64 input_groups ,
426+ static int hash_choose_num_partitions (double input_groups ,
415427double hashentrysize ,
416428int used_bits ,
417429int * log2_npartitions );
@@ -432,10 +444,11 @@ static void hashagg_finish_initial_spills(AggState *aggstate);
432444static void hashagg_reset_spill_state (AggState * aggstate );
433445static HashAggBatch * hashagg_batch_new (LogicalTapeSet * tapeset ,
434446int input_tapenum ,int setno ,
435- int64 input_tuples ,int used_bits );
447+ int64 input_tuples ,double input_card ,
448+ int used_bits );
436449static MinimalTuple hashagg_batch_read (HashAggBatch * batch ,uint32 * hashp );
437450static void hashagg_spill_init (HashAggSpill * spill ,HashTapeInfo * tapeinfo ,
438- int used_bits ,uint64 input_tuples ,
451+ int used_bits ,double input_groups ,
439452double hashentrysize );
440453static Size hashagg_spill_tuple (AggState * aggstate ,HashAggSpill * spill ,
441454TupleTableSlot * slot ,uint32 hash );
@@ -1777,7 +1790,7 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
17771790 * substantially larger than the initial value.
17781791 */
17791792void
1780- hash_agg_set_limits (double hashentrysize ,uint64 input_groups ,int used_bits ,
1793+ hash_agg_set_limits (double hashentrysize ,double input_groups ,int used_bits ,
17811794Size * mem_limit ,uint64 * ngroups_limit ,
17821795int * num_partitions )
17831796{
@@ -1969,7 +1982,7 @@ hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
19691982 * *log2_npartitions to the log2() of the number of partitions.
19701983 */
19711984static int
1972- hash_choose_num_partitions (uint64 input_groups ,double hashentrysize ,
1985+ hash_choose_num_partitions (double input_groups ,double hashentrysize ,
19731986int used_bits ,int * log2_npartitions )
19741987{
19751988Size mem_wanted ;
@@ -2574,7 +2587,6 @@ agg_refill_hash_table(AggState *aggstate)
25742587AggStatePerHash perhash ;
25752588HashAggSpill spill ;
25762589HashTapeInfo * tapeinfo = aggstate -> hash_tapeinfo ;
2577- uint64 ngroups_estimate ;
25782590bool spill_initialized = false;
25792591
25802592if (aggstate -> hash_batches == NIL )
@@ -2583,16 +2595,7 @@ agg_refill_hash_table(AggState *aggstate)
25832595batch = linitial (aggstate -> hash_batches );
25842596aggstate -> hash_batches = list_delete_first (aggstate -> hash_batches );
25852597
2586- /*
2587- * Estimate the number of groups for this batch as the total number of
2588- * tuples in its input file. Although that's a worst case, it's not bad
2589- * here for two reasons: (1) overestimating is better than
2590- * underestimating; and (2) we've already scanned the relation once, so
2591- * it's likely that we've already finalized many of the common values.
2592- */
2593- ngroups_estimate = batch -> input_tuples ;
2594-
2595- hash_agg_set_limits (aggstate -> hashentrysize ,ngroups_estimate ,
2598+ hash_agg_set_limits (aggstate -> hashentrysize ,batch -> input_card ,
25962599batch -> used_bits ,& aggstate -> hash_mem_limit ,
25972600& aggstate -> hash_ngroups_limit ,NULL );
25982601
@@ -2678,7 +2681,7 @@ agg_refill_hash_table(AggState *aggstate)
26782681 */
26792682spill_initialized = true;
26802683hashagg_spill_init (& spill ,tapeinfo ,batch -> used_bits ,
2681- ngroups_estimate ,aggstate -> hashentrysize );
2684+ batch -> input_card ,aggstate -> hashentrysize );
26822685}
26832686/* no memory for a new group, spill */
26842687hashagg_spill_tuple (aggstate ,& spill ,spillslot ,hash );
@@ -2936,7 +2939,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
29362939 */
29372940static void
29382941hashagg_spill_init (HashAggSpill * spill ,HashTapeInfo * tapeinfo ,int used_bits ,
2939- uint64 input_groups ,double hashentrysize )
2942+ double input_groups ,double hashentrysize )
29402943{
29412944int npartitions ;
29422945int partition_bits ;
@@ -2946,13 +2949,17 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
29462949
29472950spill -> partitions = palloc0 (sizeof (int )* npartitions );
29482951spill -> ntuples = palloc0 (sizeof (int64 )* npartitions );
2952+ spill -> hll_card = palloc0 (sizeof (hyperLogLogState )* npartitions );
29492953
29502954hashagg_tapeinfo_assign (tapeinfo ,spill -> partitions ,npartitions );
29512955
29522956spill -> tapeset = tapeinfo -> tapeset ;
29532957spill -> shift = 32 - used_bits - partition_bits ;
29542958spill -> mask = (npartitions - 1 ) <<spill -> shift ;
29552959spill -> npartitions = npartitions ;
2960+
2961+ for (int i = 0 ;i < npartitions ;i ++ )
2962+ initHyperLogLog (& spill -> hll_card [i ],HASHAGG_HLL_BIT_WIDTH );
29562963}
29572964
29582965/*
@@ -3001,6 +3008,13 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
30013008partition = (hash & spill -> mask ) >>spill -> shift ;
30023009spill -> ntuples [partition ]++ ;
30033010
3011+ /*
3012+ * All hash values destined for a given partition have some bits in
3013+ * common, which causes bad HLL cardinality estimates. Hash the hash to
3014+ * get a more uniform distribution.
3015+ */
3016+ addHyperLogLog (& spill -> hll_card [partition ],hash_bytes_uint32 (hash ));
3017+
30043018tapenum = spill -> partitions [partition ];
30053019
30063020LogicalTapeWrite (tapeset ,tapenum , (void * )& hash ,sizeof (uint32 ));
@@ -3023,7 +3037,7 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
30233037 */
30243038static HashAggBatch *
30253039hashagg_batch_new (LogicalTapeSet * tapeset ,int tapenum ,int setno ,
3026- int64 input_tuples ,int used_bits )
3040+ int64 input_tuples ,double input_card , int used_bits )
30273041{
30283042HashAggBatch * batch = palloc0 (sizeof (HashAggBatch ));
30293043
@@ -3032,6 +3046,7 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
30323046batch -> tapeset = tapeset ;
30333047batch -> input_tapenum = tapenum ;
30343048batch -> input_tuples = input_tuples ;
3049+ batch -> input_card = input_card ;
30353050
30363051return batch ;
30373052}
@@ -3135,21 +3150,26 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
31353150
31363151for (i = 0 ;i < spill -> npartitions ;i ++ )
31373152{
3138- int tapenum = spill -> partitions [i ];
3139- HashAggBatch * new_batch ;
3153+ int tapenum = spill -> partitions [i ];
3154+ HashAggBatch * new_batch ;
3155+ double cardinality ;
31403156
31413157/* if the partition is empty, don't create a new batch of work */
31423158if (spill -> ntuples [i ]== 0 )
31433159continue ;
31443160
3161+ cardinality = estimateHyperLogLog (& spill -> hll_card [i ]);
3162+ freeHyperLogLog (& spill -> hll_card [i ]);
3163+
31453164new_batch = hashagg_batch_new (aggstate -> hash_tapeinfo -> tapeset ,
31463165tapenum ,setno ,spill -> ntuples [i ],
3147- used_bits );
3166+ cardinality , used_bits );
31483167aggstate -> hash_batches = lcons (new_batch ,aggstate -> hash_batches );
31493168aggstate -> hash_batches_used ++ ;
31503169}
31513170
31523171pfree (spill -> ntuples );
3172+ pfree (spill -> hll_card );
31533173pfree (spill -> partitions );
31543174}
31553175