Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit c4649cc

Browse files
committed
Refactor LogicalTapeSet/LogicalTape interface.
All the tape functions, like LogicalTapeRead and LogicalTapeWrite, now take a LogicalTape as argument, instead of LogicalTapeSet+tape number. You can create any number of LogicalTapes in a single LogicalTapeSet, and you don't need to decide the number upfront, when you create the tape set.

This makes the tape management in hash agg spilling in nodeAgg.c simpler.

Discussion: https://www.postgresql.org/message-id/420a0ec7-602c-d406-1e75-1ef7ddc58d83%40iki.fi
Reviewed-by: Peter Geoghegan, Zhihong Yu, John Naylor
1 parent 409f9ca, commit c4649cc

File tree

5 files changed

+359
-553
lines changed

5 files changed

+359
-553
lines changed

‎src/backend/executor/nodeAgg.c

Lines changed: 49 additions & 138 deletions
Original file line number | Diff line number | Diff line change
@@ -208,7 +208,16 @@
208208
*
209209
* Spilled data is written to logical tapes. These provide better control
210210
* over memory usage, disk space, and the number of files than if we were
211-
* to use a BufFile for each spill.
211+
* to use a BufFile for each spill. We don't know the number of tapes needed
212+
* at the start of the algorithm (because it can recurse), so a tape set is
213+
* allocated at the beginning, and individual tapes are created as needed.
214+
* As a particular tape is read, logtape.c recycles its disk space. When a
215+
* tape is read to completion, it is destroyed entirely.
216+
*
217+
* Tapes' buffers can take up substantial memory when many tapes are open at
218+
* once. We only need one tape open at a time in read mode (using a buffer
219+
* that's a multiple of BLCKSZ); but we need one tape open in write mode (each
220+
* requiring a buffer of size BLCKSZ) for each partition.
212221
*
213222
* Note that it's possible for transition states to start small but then
214223
* grow very large; for instance in the case of ARRAY_AGG. In such cases,
@@ -311,27 +320,6 @@
311320
*/
312321
#defineCHUNKHDRSZ 16
313322

314-
/*
315-
* Track all tapes needed for a HashAgg that spills. We don't know the maximum
316-
* number of tapes needed at the start of the algorithm (because it can
317-
* recurse), so one tape set is allocated and extended as needed for new
318-
* tapes. When a particular tape is already read, rewind it for write mode and
319-
* put it in the free list.
320-
*
321-
* Tapes' buffers can take up substantial memory when many tapes are open at
322-
* once. We only need one tape open at a time in read mode (using a buffer
323-
* that's a multiple of BLCKSZ); but we need one tape open in write mode (each
324-
* requiring a buffer of size BLCKSZ) for each partition.
325-
*/
326-
typedefstructHashTapeInfo
327-
{
328-
LogicalTapeSet*tapeset;
329-
intntapes;
330-
int*freetapes;
331-
intnfreetapes;
332-
intfreetapes_alloc;
333-
}HashTapeInfo;
334-
335323
/*
336324
* Represents partitioned spill data for a single hashtable. Contains the
337325
* necessary information to route tuples to the correct partition, and to
@@ -343,9 +331,8 @@ typedef struct HashTapeInfo
343331
*/
344332
typedefstructHashAggSpill
345333
{
346-
LogicalTapeSet*tapeset;/* borrowed reference to tape set */
347334
intnpartitions;/* number of partitions */
348-
int*partitions;/* spill partitiontape numbers */
335+
LogicalTape**partitions;/* spill partitiontapes */
349336
int64*ntuples;/* number of tuples in each partition */
350337
uint32mask;/* mask to find partition from hash value */
351338
intshift;/* after masking, shift by this amount */
@@ -365,8 +352,7 @@ typedef struct HashAggBatch
365352
{
366353
intsetno;/* grouping set */
367354
intused_bits;/* number of bits of hash already used */
368-
LogicalTapeSet*tapeset;/* borrowed reference to tape set */
369-
intinput_tapenum;/* input partition tape */
355+
LogicalTape*input_tape;/* input partition tape */
370356
int64input_tuples;/* number of tuples in this batch */
371357
doubleinput_card;/* estimated group cardinality */
372358
}HashAggBatch;
@@ -442,22 +428,17 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
442428
intnpartitions);
443429
staticvoidhashagg_finish_initial_spills(AggState*aggstate);
444430
staticvoidhashagg_reset_spill_state(AggState*aggstate);
445-
staticHashAggBatch*hashagg_batch_new(LogicalTapeSet*tapeset,
446-
intinput_tapenum,intsetno,
431+
staticHashAggBatch*hashagg_batch_new(LogicalTape*input_tape,intsetno,
447432
int64input_tuples,doubleinput_card,
448433
intused_bits);
449434
staticMinimalTuplehashagg_batch_read(HashAggBatch*batch,uint32*hashp);
450-
staticvoidhashagg_spill_init(HashAggSpill*spill,HashTapeInfo*tapeinfo,
435+
staticvoidhashagg_spill_init(HashAggSpill*spill,LogicalTapeSet*lts,
451436
intused_bits,doubleinput_groups,
452437
doublehashentrysize);
453438
staticSizehashagg_spill_tuple(AggState*aggstate,HashAggSpill*spill,
454439
TupleTableSlot*slot,uint32hash);
455440
staticvoidhashagg_spill_finish(AggState*aggstate,HashAggSpill*spill,
456441
intsetno);
457-
staticvoidhashagg_tapeinfo_init(AggState*aggstate);
458-
staticvoidhashagg_tapeinfo_assign(HashTapeInfo*tapeinfo,int*dest,
459-
intndest);
460-
staticvoidhashagg_tapeinfo_release(HashTapeInfo*tapeinfo,inttapenum);
461442
staticDatumGetAggInitVal(DatumtextInitVal,Oidtranstype);
462443
staticvoidbuild_pertrans_for_aggref(AggStatePerTranspertrans,
463444
AggState*aggstate,EState*estate,
@@ -1887,12 +1868,12 @@ hash_agg_enter_spill_mode(AggState *aggstate)
18871868

18881869
if (!aggstate->hash_ever_spilled)
18891870
{
1890-
Assert(aggstate->hash_tapeinfo==NULL);
1871+
Assert(aggstate->hash_tapeset==NULL);
18911872
Assert(aggstate->hash_spills==NULL);
18921873

18931874
aggstate->hash_ever_spilled= true;
18941875

1895-
hashagg_tapeinfo_init(aggstate);
1876+
aggstate->hash_tapeset=LogicalTapeSetCreate(true,NULL,-1);
18961877

18971878
aggstate->hash_spills=palloc(sizeof(HashAggSpill)*aggstate->num_hashes);
18981879

@@ -1901,7 +1882,7 @@ hash_agg_enter_spill_mode(AggState *aggstate)
19011882
AggStatePerHashperhash=&aggstate->perhash[setno];
19021883
HashAggSpill*spill=&aggstate->hash_spills[setno];
19031884

1904-
hashagg_spill_init(spill,aggstate->hash_tapeinfo,0,
1885+
hashagg_spill_init(spill,aggstate->hash_tapeset,0,
19051886
perhash->aggnode->numGroups,
19061887
aggstate->hashentrysize);
19071888
}
@@ -1943,9 +1924,9 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
19431924
aggstate->hash_mem_peak=total_mem;
19441925

19451926
/* update disk usage */
1946-
if (aggstate->hash_tapeinfo!=NULL)
1927+
if (aggstate->hash_tapeset!=NULL)
19471928
{
1948-
uint64disk_used=LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset)* (BLCKSZ /1024);
1929+
uint64disk_used=LogicalTapeSetBlocks(aggstate->hash_tapeset)* (BLCKSZ /1024);
19491930

19501931
if (aggstate->hash_disk_used<disk_used)
19511932
aggstate->hash_disk_used=disk_used;
@@ -2132,7 +2113,7 @@ lookup_hash_entries(AggState *aggstate)
21322113
TupleTableSlot*slot=aggstate->tmpcontext->ecxt_outertuple;
21332114

21342115
if (spill->partitions==NULL)
2135-
hashagg_spill_init(spill,aggstate->hash_tapeinfo,0,
2116+
hashagg_spill_init(spill,aggstate->hash_tapeset,0,
21362117
perhash->aggnode->numGroups,
21372118
aggstate->hashentrysize);
21382119

@@ -2597,7 +2578,7 @@ agg_refill_hash_table(AggState *aggstate)
25972578
HashAggBatch*batch;
25982579
AggStatePerHashperhash;
25992580
HashAggSpillspill;
2600-
HashTapeInfo*tapeinfo=aggstate->hash_tapeinfo;
2581+
LogicalTapeSet*tapeset=aggstate->hash_tapeset;
26012582
boolspill_initialized= false;
26022583

26032584
if (aggstate->hash_batches==NIL)
@@ -2693,7 +2674,7 @@ agg_refill_hash_table(AggState *aggstate)
26932674
* that we don't assign tapes that will never be used.
26942675
*/
26952676
spill_initialized= true;
2696-
hashagg_spill_init(&spill,tapeinfo,batch->used_bits,
2677+
hashagg_spill_init(&spill,tapeset,batch->used_bits,
26972678
batch->input_card,aggstate->hashentrysize);
26982679
}
26992680
/* no memory for a new group, spill */
@@ -2709,7 +2690,7 @@ agg_refill_hash_table(AggState *aggstate)
27092690
ResetExprContext(aggstate->tmpcontext);
27102691
}
27112692

2712-
hashagg_tapeinfo_release(tapeinfo,batch->input_tapenum);
2693+
LogicalTapeClose(batch->input_tape);
27132694

27142695
/* change back to phase 0 */
27152696
aggstate->current_phase=0;
@@ -2884,75 +2865,14 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate)
28842865
returnNULL;
28852866
}
28862867

2887-
/*
2888-
* Initialize HashTapeInfo
2889-
*/
2890-
staticvoid
2891-
hashagg_tapeinfo_init(AggState*aggstate)
2892-
{
2893-
HashTapeInfo*tapeinfo=palloc(sizeof(HashTapeInfo));
2894-
intinit_tapes=16;/* expanded dynamically */
2895-
2896-
tapeinfo->tapeset=LogicalTapeSetCreate(init_tapes, true,NULL,NULL,-1);
2897-
tapeinfo->ntapes=init_tapes;
2898-
tapeinfo->nfreetapes=init_tapes;
2899-
tapeinfo->freetapes_alloc=init_tapes;
2900-
tapeinfo->freetapes=palloc(init_tapes*sizeof(int));
2901-
for (inti=0;i<init_tapes;i++)
2902-
tapeinfo->freetapes[i]=i;
2903-
2904-
aggstate->hash_tapeinfo=tapeinfo;
2905-
}
2906-
2907-
/*
2908-
* Assign unused tapes to spill partitions, extending the tape set if
2909-
* necessary.
2910-
*/
2911-
staticvoid
2912-
hashagg_tapeinfo_assign(HashTapeInfo*tapeinfo,int*partitions,
2913-
intnpartitions)
2914-
{
2915-
intpartidx=0;
2916-
2917-
/* use free tapes if available */
2918-
while (partidx<npartitions&&tapeinfo->nfreetapes>0)
2919-
partitions[partidx++]=tapeinfo->freetapes[--tapeinfo->nfreetapes];
2920-
2921-
if (partidx<npartitions)
2922-
{
2923-
LogicalTapeSetExtend(tapeinfo->tapeset,npartitions-partidx);
2924-
2925-
while (partidx<npartitions)
2926-
partitions[partidx++]=tapeinfo->ntapes++;
2927-
}
2928-
}
2929-
2930-
/*
2931-
* After a tape has already been written to and then read, this function
2932-
* rewinds it for writing and adds it to the free list.
2933-
*/
2934-
staticvoid
2935-
hashagg_tapeinfo_release(HashTapeInfo*tapeinfo,inttapenum)
2936-
{
2937-
/* rewinding frees the buffer while not in use */
2938-
LogicalTapeRewindForWrite(tapeinfo->tapeset,tapenum);
2939-
if (tapeinfo->freetapes_alloc==tapeinfo->nfreetapes)
2940-
{
2941-
tapeinfo->freetapes_alloc <<=1;
2942-
tapeinfo->freetapes=repalloc(tapeinfo->freetapes,
2943-
tapeinfo->freetapes_alloc*sizeof(int));
2944-
}
2945-
tapeinfo->freetapes[tapeinfo->nfreetapes++]=tapenum;
2946-
}
2947-
29482868
/*
29492869
* hashagg_spill_init
29502870
*
29512871
* Called after we determined that spilling is necessary. Chooses the number
29522872
* of partitions to create, and initializes them.
29532873
*/
29542874
staticvoid
2955-
hashagg_spill_init(HashAggSpill*spill,HashTapeInfo*tapeinfo,intused_bits,
2875+
hashagg_spill_init(HashAggSpill*spill,LogicalTapeSet*tapeset,intused_bits,
29562876
doubleinput_groups,doublehashentrysize)
29572877
{
29582878
intnpartitions;
@@ -2961,13 +2881,13 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
29612881
npartitions=hash_choose_num_partitions(input_groups,hashentrysize,
29622882
used_bits,&partition_bits);
29632883

2964-
spill->partitions=palloc0(sizeof(int)*npartitions);
2884+
spill->partitions=palloc0(sizeof(LogicalTape*)*npartitions);
29652885
spill->ntuples=palloc0(sizeof(int64)*npartitions);
29662886
spill->hll_card=palloc0(sizeof(hyperLogLogState)*npartitions);
29672887

2968-
hashagg_tapeinfo_assign(tapeinfo,spill->partitions,npartitions);
2888+
for (inti=0;i<npartitions;i++)
2889+
spill->partitions[i]=LogicalTapeCreate(tapeset);
29692890

2970-
spill->tapeset=tapeinfo->tapeset;
29712891
spill->shift=32-used_bits-partition_bits;
29722892
spill->mask= (npartitions-1) <<spill->shift;
29732893
spill->npartitions=npartitions;
@@ -2986,11 +2906,10 @@ static Size
29862906
hashagg_spill_tuple(AggState*aggstate,HashAggSpill*spill,
29872907
TupleTableSlot*inputslot,uint32hash)
29882908
{
2989-
LogicalTapeSet*tapeset=spill->tapeset;
29902909
TupleTableSlot*spillslot;
29912910
intpartition;
29922911
MinimalTupletuple;
2993-
inttapenum;
2912+
LogicalTape*tape;
29942913
inttotal_written=0;
29952914
boolshouldFree;
29962915

@@ -3029,12 +2948,12 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
30292948
*/
30302949
addHyperLogLog(&spill->hll_card[partition],hash_bytes_uint32(hash));
30312950

3032-
tapenum=spill->partitions[partition];
2951+
tape=spill->partitions[partition];
30332952

3034-
LogicalTapeWrite(tapeset,tapenum, (void*)&hash,sizeof(uint32));
2953+
LogicalTapeWrite(tape, (void*)&hash,sizeof(uint32));
30352954
total_written+=sizeof(uint32);
30362955

3037-
LogicalTapeWrite(tapeset,tapenum, (void*)tuple,tuple->t_len);
2956+
LogicalTapeWrite(tape, (void*)tuple,tuple->t_len);
30382957
total_written+=tuple->t_len;
30392958

30402959
if (shouldFree)
@@ -3050,15 +2969,14 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
30502969
* be done.
30512970
*/
30522971
staticHashAggBatch*
3053-
hashagg_batch_new(LogicalTapeSet*tapeset,inttapenum,intsetno,
2972+
hashagg_batch_new(LogicalTape*input_tape,intsetno,
30542973
int64input_tuples,doubleinput_card,intused_bits)
30552974
{
30562975
HashAggBatch*batch=palloc0(sizeof(HashAggBatch));
30572976

30582977
batch->setno=setno;
30592978
batch->used_bits=used_bits;
3060-
batch->tapeset=tapeset;
3061-
batch->input_tapenum=tapenum;
2979+
batch->input_tape=input_tape;
30622980
batch->input_tuples=input_tuples;
30632981
batch->input_card=input_card;
30642982

@@ -3072,42 +2990,41 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
30722990
staticMinimalTuple
30732991
hashagg_batch_read(HashAggBatch*batch,uint32*hashp)
30742992
{
3075-
LogicalTapeSet*tapeset=batch->tapeset;
3076-
inttapenum=batch->input_tapenum;
2993+
LogicalTape*tape=batch->input_tape;
30772994
MinimalTupletuple;
30782995
uint32t_len;
30792996
size_tnread;
30802997
uint32hash;
30812998

3082-
nread=LogicalTapeRead(tapeset,tapenum,&hash,sizeof(uint32));
2999+
nread=LogicalTapeRead(tape,&hash,sizeof(uint32));
30833000
if (nread==0)
30843001
returnNULL;
30853002
if (nread!=sizeof(uint32))
30863003
ereport(ERROR,
30873004
(errcode_for_file_access(),
3088-
errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3089-
tapenum,sizeof(uint32),nread)));
3005+
errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
3006+
tape,sizeof(uint32),nread)));
30903007
if (hashp!=NULL)
30913008
*hashp=hash;
30923009

3093-
nread=LogicalTapeRead(tapeset,tapenum,&t_len,sizeof(t_len));
3010+
nread=LogicalTapeRead(tape,&t_len,sizeof(t_len));
30943011
if (nread!=sizeof(uint32))
30953012
ereport(ERROR,
30963013
(errcode_for_file_access(),
3097-
errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3098-
tapenum,sizeof(uint32),nread)));
3014+
errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
3015+
tape,sizeof(uint32),nread)));
30993016

31003017
tuple= (MinimalTuple)palloc(t_len);
31013018
tuple->t_len=t_len;
31023019

3103-
nread=LogicalTapeRead(tapeset,tapenum,
3020+
nread=LogicalTapeRead(tape,
31043021
(void*) ((char*)tuple+sizeof(uint32)),
31053022
t_len-sizeof(uint32));
31063023
if (nread!=t_len-sizeof(uint32))
31073024
ereport(ERROR,
31083025
(errcode_for_file_access(),
3109-
errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3110-
tapenum,t_len-sizeof(uint32),nread)));
3026+
errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
3027+
tape,t_len-sizeof(uint32),nread)));
31113028

31123029
returntuple;
31133030
}
@@ -3164,8 +3081,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
31643081

31653082
for (i=0;i<spill->npartitions;i++)
31663083
{
3167-
LogicalTapeSet*tapeset=aggstate->hash_tapeinfo->tapeset;
3168-
inttapenum=spill->partitions[i];
3084+
LogicalTape*tape=spill->partitions[i];
31693085
HashAggBatch*new_batch;
31703086
doublecardinality;
31713087

@@ -3177,10 +3093,9 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
31773093
freeHyperLogLog(&spill->hll_card[i]);
31783094

31793095
/* rewinding frees the buffer while not in use */
3180-
LogicalTapeRewindForRead(tapeset,tapenum,
3181-
HASHAGG_READ_BUFFER_SIZE);
3096+
LogicalTapeRewindForRead(tape,HASHAGG_READ_BUFFER_SIZE);
31823097

3183-
new_batch=hashagg_batch_new(tapeset,tapenum,setno,
3098+
new_batch=hashagg_batch_new(tape,setno,
31843099
spill->ntuples[i],cardinality,
31853100
used_bits);
31863101
aggstate->hash_batches=lcons(new_batch,aggstate->hash_batches);
@@ -3227,14 +3142,10 @@ hashagg_reset_spill_state(AggState *aggstate)
32273142
aggstate->hash_batches=NIL;
32283143

32293144
/* close tape set */
3230-
if (aggstate->hash_tapeinfo!=NULL)
3145+
if (aggstate->hash_tapeset!=NULL)
32313146
{
3232-
HashTapeInfo*tapeinfo=aggstate->hash_tapeinfo;
3233-
3234-
LogicalTapeSetClose(tapeinfo->tapeset);
3235-
pfree(tapeinfo->freetapes);
3236-
pfree(tapeinfo);
3237-
aggstate->hash_tapeinfo=NULL;
3147+
LogicalTapeSetClose(aggstate->hash_tapeset);
3148+
aggstate->hash_tapeset=NULL;
32383149
}
32393150
}
32403151

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp