Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita7de3dc

Browse files
committed
Support multi-stage aggregation.
Aggregate nodes now have two new modes: a "partial" mode where theyoutput the unfinalized transition state, and a "finalize" mode wherethey accept unfinalized transition states rather than individualvalues as input.These new modes are not used anywhere yet, but they will be necessaryfor parallel aggregation. The infrastructure also figures to beuseful for cases where we want to aggregate local data and remotedata via the FDW interface, and want to bring back partial aggregatesfrom the remote side that can then be combined with locally generatedpartial aggregates to produce the final value. It may also be usefuleven when neither FDWs nor parallelism are in play, as explained inthe comments in nodeAgg.c.David Rowley and Simon Riggs, reviewed by KaiGai Kohei, HeikkiLinnakangas, Haribabu Kommi, and me.
1 parentc8642d9 commita7de3dc

File tree

21 files changed

+652
-234
lines changed

21 files changed

+652
-234
lines changed

‎doc/src/sgml/ref/create_aggregate.sgml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ <replacea
2727
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
2828
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
2929
[ , FINALFUNC_EXTRA ]
30+
[ , COMBINEFUNC = <replaceable class="PARAMETER">combinefunc</replaceable> ]
3031
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
3132
[ , MSFUNC = <replaceable class="PARAMETER">msfunc</replaceable> ]
3233
[ , MINVFUNC = <replaceable class="PARAMETER">minvfunc</replaceable> ]
@@ -45,6 +46,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ [ <replac
4546
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
4647
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
4748
[ , FINALFUNC_EXTRA ]
49+
[ , COMBINEFUNC = <replaceable class="PARAMETER">combinefunc</replaceable> ]
4850
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
4951
[ , HYPOTHETICAL ]
5052
)
@@ -58,6 +60,7 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
5860
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
5961
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
6062
[ , FINALFUNC_EXTRA ]
63+
[ , COMBINEFUNC = <replaceable class="PARAMETER">combinefunc</replaceable> ]
6164
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
6265
[ , MSFUNC = <replaceable class="PARAMETER">msfunc</replaceable> ]
6366
[ , MINVFUNC = <replaceable class="PARAMETER">minvfunc</replaceable> ]
@@ -105,12 +108,15 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
105108
functions:
106109
a state transition function
107110
<replaceable class="PARAMETER">sfunc</replaceable>,
108-
and an optional final calculation function
109-
<replaceable class="PARAMETER">ffunc</replaceable>.
111+
an optional final calculation function
112+
<replaceable class="PARAMETER">ffunc</replaceable>,
113+
and an optional combine function
114+
<replaceable class="PARAMETER">combinefunc</replaceable>.
110115
These are used as follows:
111116
<programlisting>
112117
<replaceable class="PARAMETER">sfunc</replaceable>( internal-state, next-data-values ) ---> next-internal-state
113118
<replaceable class="PARAMETER">ffunc</replaceable>( internal-state ) ---> aggregate-value
119+
<replaceable class="PARAMETER">combinefunc</replaceable>( internal-state, internal-state ) ---> next-internal-state
114120
</programlisting>
115121
</para>
116122

@@ -127,6 +133,12 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
127133
is returned as-is.
128134
</para>
129135

136+
<para>
137+
An aggregate function may also supply a combining function, which allows
138+
the aggregation process to be broken down into multiple steps. This
139+
facilitates query optimization techniques such as parallel query.
140+
</para>
141+
130142
<para>
131143
An aggregate function can provide an initial condition,
132144
that is, an initial value for the internal state value.

‎src/backend/catalog/pg_aggregate.c

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ AggregateCreate(const char *aggName,
5757
OidvariadicArgType,
5858
List*aggtransfnName,
5959
List*aggfinalfnName,
60+
List*aggcombinefnName,
6061
List*aggmtransfnName,
6162
List*aggminvtransfnName,
6263
List*aggmfinalfnName,
@@ -77,6 +78,7 @@ AggregateCreate(const char *aggName,
7778
Form_pg_procproc;
7879
Oidtransfn;
7980
Oidfinalfn=InvalidOid;/* can be omitted */
81+
Oidcombinefn=InvalidOid;/* can be omitted */
8082
Oidmtransfn=InvalidOid;/* can be omitted */
8183
Oidminvtransfn=InvalidOid;/* can be omitted */
8284
Oidmfinalfn=InvalidOid;/* can be omitted */
@@ -396,6 +398,30 @@ AggregateCreate(const char *aggName,
396398
}
397399
Assert(OidIsValid(finaltype));
398400

401+
/* handle the combinefn, if supplied */
402+
if (aggcombinefnName)
403+
{
404+
OidcombineType;
405+
406+
/*
407+
* Combine function must have 2 argument, each of which is the
408+
* trans type
409+
*/
410+
fnArgs[0]=aggTransType;
411+
fnArgs[1]=aggTransType;
412+
413+
combinefn=lookup_agg_function(aggcombinefnName,2,fnArgs,
414+
variadicArgType,&combineType);
415+
416+
/* Ensure the return type matches the aggregates trans type */
417+
if (combineType!=aggTransType)
418+
ereport(ERROR,
419+
(errcode(ERRCODE_DATATYPE_MISMATCH),
420+
errmsg("return type of combine function %s is not %s",
421+
NameListToString(aggcombinefnName),
422+
format_type_be(aggTransType))));
423+
}
424+
399425
/*
400426
* If finaltype (i.e. aggregate return type) is polymorphic, inputs must
401427
* be polymorphic also, else parser will fail to deduce result type.
@@ -567,6 +593,7 @@ AggregateCreate(const char *aggName,
567593
values[Anum_pg_aggregate_aggnumdirectargs-1]=Int16GetDatum(numDirectArgs);
568594
values[Anum_pg_aggregate_aggtransfn-1]=ObjectIdGetDatum(transfn);
569595
values[Anum_pg_aggregate_aggfinalfn-1]=ObjectIdGetDatum(finalfn);
596+
values[Anum_pg_aggregate_aggcombinefn-1]=ObjectIdGetDatum(combinefn);
570597
values[Anum_pg_aggregate_aggmtransfn-1]=ObjectIdGetDatum(mtransfn);
571598
values[Anum_pg_aggregate_aggminvtransfn-1]=ObjectIdGetDatum(minvtransfn);
572599
values[Anum_pg_aggregate_aggmfinalfn-1]=ObjectIdGetDatum(mfinalfn);
@@ -618,6 +645,15 @@ AggregateCreate(const char *aggName,
618645
recordDependencyOn(&myself,&referenced,DEPENDENCY_NORMAL);
619646
}
620647

648+
/* Depends on combine function, if any */
649+
if (OidIsValid(combinefn))
650+
{
651+
referenced.classId=ProcedureRelationId;
652+
referenced.objectId=combinefn;
653+
referenced.objectSubId=0;
654+
recordDependencyOn(&myself,&referenced,DEPENDENCY_NORMAL);
655+
}
656+
621657
/* Depends on forward transition function, if any */
622658
if (OidIsValid(mtransfn))
623659
{
@@ -659,7 +695,7 @@ AggregateCreate(const char *aggName,
659695

660696
/*
661697
* lookup_agg_function
662-
* common code for finding transfn, invtransfnandfinalfn
698+
* common code for finding transfn, invtransfn, finalfn,andcombinefn
663699
*
664700
* Returns OID of function, and stores its return type into *rettype
665701
*

‎src/backend/commands/aggregatecmds.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
6161
charaggKind=AGGKIND_NORMAL;
6262
List*transfuncName=NIL;
6363
List*finalfuncName=NIL;
64+
List*combinefuncName=NIL;
6465
List*mtransfuncName=NIL;
6566
List*minvtransfuncName=NIL;
6667
List*mfinalfuncName=NIL;
@@ -124,6 +125,8 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
124125
transfuncName=defGetQualifiedName(defel);
125126
elseif (pg_strcasecmp(defel->defname,"finalfunc")==0)
126127
finalfuncName=defGetQualifiedName(defel);
128+
elseif (pg_strcasecmp(defel->defname,"combinefunc")==0)
129+
combinefuncName=defGetQualifiedName(defel);
127130
elseif (pg_strcasecmp(defel->defname,"msfunc")==0)
128131
mtransfuncName=defGetQualifiedName(defel);
129132
elseif (pg_strcasecmp(defel->defname,"minvfunc")==0)
@@ -383,6 +386,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
383386
variadicArgType,
384387
transfuncName,/* step function name */
385388
finalfuncName,/* final function name */
389+
combinefuncName,/* combine function name */
386390
mtransfuncName,/* fwd trans function name */
387391
minvtransfuncName,/* inv trans function name */
388392
mfinalfuncName,/* final function name */

‎src/backend/commands/explain.c

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -909,24 +909,36 @@ ExplainNode(PlanState *planstate, List *ancestors,
909909
break;
910910
caseT_Agg:
911911
sname="Aggregate";
912-
switch (((Agg*)plan)->aggstrategy)
913912
{
914-
caseAGG_PLAIN:
915-
pname="Aggregate";
916-
strategy="Plain";
917-
break;
918-
caseAGG_SORTED:
919-
pname="GroupAggregate";
920-
strategy="Sorted";
921-
break;
922-
caseAGG_HASHED:
923-
pname="HashAggregate";
924-
strategy="Hashed";
925-
break;
926-
default:
927-
pname="Aggregate ???";
928-
strategy="???";
929-
break;
913+
Agg*agg= (Agg*)plan;
914+
915+
if (agg->finalizeAggs== false)
916+
operation="Partial";
917+
elseif (agg->combineStates== true)
918+
operation="Finalize";
919+
920+
switch (agg->aggstrategy)
921+
{
922+
caseAGG_PLAIN:
923+
pname="Aggregate";
924+
strategy="Plain";
925+
break;
926+
caseAGG_SORTED:
927+
pname="GroupAggregate";
928+
strategy="Sorted";
929+
break;
930+
caseAGG_HASHED:
931+
pname="HashAggregate";
932+
strategy="Hashed";
933+
break;
934+
default:
935+
pname="Aggregate ???";
936+
strategy="???";
937+
break;
938+
}
939+
940+
if (operation!=NULL)
941+
pname=psprintf("%s %s",operation,pname);
930942
}
931943
break;
932944
caseT_WindowAgg:

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp