Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit719cc49

Browse files
committed
Add WAVG aggregate
1 parent4af48e3 commit719cc49

File tree

4 files changed

+145
-3
lines changed

4 files changed

+145
-3
lines changed

‎expected/test.out‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ insert into s values(1.0),(2.0),(null),(3.0),(null),(4.0);
55
select populate(destination:='v'::regclass, source:='s'::regclass);
66
populate
77
----------
8-
8+
6
99
(1 row)
1010

1111
select unnest(v.*) from v where x > 1;

‎vops--1.0.sql‎

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ create function vops_var_pop_final(state internal) returns float8 as 'MODULE_PAT
256256
createfunctionvops_var_samp_final(state internal) returns float8as'MODULE_PATHNAME' language C parallel safe strict;
257257
createfunctionvops_stddev_pop_final(state internal) returns float8as'MODULE_PATHNAME' language C parallel safe strict;
258258
createfunctionvops_stddev_samp_final(state internal) returns float8as'MODULE_PATHNAME' language C parallel safe strict;
259+
createfunctionvops_wavg_final(state internal) returns float8as'MODULE_PATHNAME' language C parallel safe;
259260

260261
createfunctionvops_char_var_accumulate(state internal, val vops_char) returns internalas'MODULE_PATHNAME' language C parallel safe;
261262
CREATEAGGREGATEvar_pop(vops_char) (
@@ -324,6 +325,18 @@ CREATE AGGREGATE stddev(vops_char) (
324325
PARALLEL= SAFE
325326
);
326327

328+
createfunctionvops_char_wavg_accumulate(state internal, x vops_char, y vops_char) returns internalas'MODULE_PATHNAME' language C parallel safe;
329+
CREATEAGGREGATEwavg(vops_char, vops_char) (
330+
SFUNC= vops_char_wavg_accumulate,
331+
STYPE= internal,
332+
SSPACE=24,
333+
FINALFUNC= vops_wavg_final,
334+
COMBINEFUNC= vops_var_combine,
335+
SERIALFUNC= vops_var_serial,
336+
DESERIALFUNC= vops_var_deserial,
337+
PARALLEL= SAFE
338+
);
339+
327340
createfunctionvops_char_max_accumulate(statechar, val vops_char) returnscharas'MODULE_PATHNAME' language C parallel safe;
328341
CREATEAGGREGATEmax(vops_char) (
329342
SFUNC= vops_char_max_accumulate,
@@ -584,6 +597,17 @@ CREATE AGGREGATE stddev(vops_int2) (
584597
PARALLEL= SAFE
585598
);
586599

600+
createfunctionvops_int2_wavg_accumulate(state internal, x vops_int2, y vops_int2) returns internalas'MODULE_PATHNAME' language C parallel safe;
601+
CREATEAGGREGATEwavg(vops_int2, vops_int2) (
602+
SFUNC= vops_int2_wavg_accumulate,
603+
STYPE= internal,
604+
SSPACE=24,
605+
FINALFUNC= vops_wavg_final,
606+
COMBINEFUNC= vops_var_combine,
607+
SERIALFUNC= vops_var_serial,
608+
DESERIALFUNC= vops_var_deserial,
609+
PARALLEL= SAFE
610+
);
587611

588612
createfunctionvops_int2_avg_accumulate(state internal, val vops_int2) returns internalas'MODULE_PATHNAME' language C parallel safe;
589613
CREATEAGGREGATEavg(vops_int2) (
@@ -870,6 +894,18 @@ CREATE AGGREGATE stddev(vops_int4) (
870894
PARALLEL= SAFE
871895
);
872896

897+
createfunctionvops_int4_wavg_accumulate(state internal, x vops_int4, y vops_int4) returns internalas'MODULE_PATHNAME' language C parallel safe;
898+
CREATEAGGREGATEwavg(vops_int4, vops_int4) (
899+
SFUNC= vops_int4_wavg_accumulate,
900+
STYPE= internal,
901+
SSPACE=24,
902+
FINALFUNC= vops_wavg_final,
903+
COMBINEFUNC= vops_var_combine,
904+
SERIALFUNC= vops_var_serial,
905+
DESERIALFUNC= vops_var_deserial,
906+
PARALLEL= SAFE
907+
);
908+
873909
createfunctionvops_int4_avg_accumulate(state internal, val vops_int4) returns internalas'MODULE_PATHNAME' language C parallel safe;
874910
CREATEAGGREGATEavg(vops_int4) (
875911
SFUNC= vops_int4_avg_accumulate,
@@ -1152,6 +1188,18 @@ CREATE AGGREGATE stddev(vops_date) (
11521188
PARALLEL= SAFE
11531189
);
11541190

1191+
createfunctionvops_date_wavg_accumulate(state internal, x vops_date, y vops_date) returns internalas'MODULE_PATHNAME','vops_int4_wavg_accumulate' language C parallel safe;
1192+
CREATEAGGREGATEwavg(vops_date, vops_date) (
1193+
SFUNC= vops_date_wavg_accumulate,
1194+
STYPE= internal,
1195+
SSPACE=24,
1196+
FINALFUNC= vops_wavg_final,
1197+
COMBINEFUNC= vops_var_combine,
1198+
SERIALFUNC= vops_var_serial,
1199+
DESERIALFUNC= vops_var_deserial,
1200+
PARALLEL= SAFE
1201+
);
1202+
11551203
createfunctionvops_date_avg_accumulate(state internal, val vops_date) returns internalas'MODULE_PATHNAME','vops_int4_avg_accumulate' language C parallel safe;
11561204
CREATEAGGREGATEavg(vops_date) (
11571205
SFUNC= vops_date_avg_accumulate,
@@ -1436,6 +1484,18 @@ CREATE AGGREGATE stddev(vops_timestamp) (
14361484
PARALLEL= SAFE
14371485
);
14381486

1487+
createfunctionvops_timestamp_wavg_accumulate(state internal, x vops_timestamp, y vops_timestamp) returns internalas'MODULE_PATHNAME','vops_int8_wavg_accumulate' language C parallel safe;
1488+
CREATEAGGREGATEwavg(vops_timestamp, vops_timestamp) (
1489+
SFUNC= vops_timestamp_wavg_accumulate,
1490+
STYPE= internal,
1491+
SSPACE=24,
1492+
FINALFUNC= vops_wavg_final,
1493+
COMBINEFUNC= vops_var_combine,
1494+
SERIALFUNC= vops_var_serial,
1495+
DESERIALFUNC= vops_var_deserial,
1496+
PARALLEL= SAFE
1497+
);
1498+
14391499
createfunctionvops_timestamp_avg_accumulate(state internal, val vops_timestamp) returns internalas'MODULE_PATHNAME','vops_int8_avg_accumulate' language C parallel safe;
14401500
CREATEAGGREGATEavg(vops_timestamp) (
14411501
SFUNC= vops_timestamp_avg_accumulate,
@@ -1721,6 +1781,18 @@ CREATE AGGREGATE stddev(vops_int8) (
17211781
PARALLEL= SAFE
17221782
);
17231783

1784+
createfunctionvops_int8_wavg_accumulate(state internal, x vops_int8, y vops_int8) returns internalas'MODULE_PATHNAME' language C parallel safe;
1785+
CREATEAGGREGATEwavg(vops_int8, vops_int8) (
1786+
SFUNC= vops_int8_wavg_accumulate,
1787+
STYPE= internal,
1788+
SSPACE=24,
1789+
FINALFUNC= vops_wavg_final,
1790+
COMBINEFUNC= vops_var_combine,
1791+
SERIALFUNC= vops_var_serial,
1792+
DESERIALFUNC= vops_var_deserial,
1793+
PARALLEL= SAFE
1794+
);
1795+
17241796
createfunctionvops_int8_avg_accumulate(state internal, val vops_int8) returns internalas'MODULE_PATHNAME' language C parallel safe;
17251797
CREATEAGGREGATEavg(vops_int8) (
17261798
SFUNC= vops_int8_avg_accumulate,
@@ -1995,6 +2067,18 @@ CREATE AGGREGATE stddev(vops_float4) (
19952067
PARALLEL= SAFE
19962068
);
19972069

2070+
createfunctionvops_float4_wavg_accumulate(state internal, x vops_float4, y vops_float4) returns internalas'MODULE_PATHNAME' language C parallel safe;
2071+
CREATEAGGREGATEwavg(vops_float4,vops_float4) (
2072+
SFUNC= vops_float4_wavg_accumulate,
2073+
STYPE= internal,
2074+
SSPACE=24,
2075+
FINALFUNC= vops_wavg_final,
2076+
COMBINEFUNC= vops_var_combine,
2077+
SERIALFUNC= vops_var_serial,
2078+
DESERIALFUNC= vops_var_deserial,
2079+
PARALLEL= SAFE
2080+
);
2081+
19982082
createfunctionvops_float4_avg_accumulate(state internal, val vops_float4) returns internalas'MODULE_PATHNAME' language C parallel safe;
19992083
CREATEAGGREGATEavg(vops_float4) (
20002084
SFUNC= vops_float4_avg_accumulate,
@@ -2269,6 +2353,19 @@ CREATE AGGREGATE stddev(vops_float8) (
22692353
PARALLEL= SAFE
22702354
);
22712355

2356+
createfunctionvops_float8_wavg_accumulate(state internal, x vops_float8, y vops_float8) returns internalas'MODULE_PATHNAME' language C parallel safe;
2357+
CREATEAGGREGATEwavg(vops_float8, vops_float8) (
2358+
SFUNC= vops_float8_wavg_accumulate,
2359+
STYPE= internal,
2360+
SSPACE=24,
2361+
FINALFUNC= vops_wavg_final,
2362+
COMBINEFUNC= vops_var_combine,
2363+
SERIALFUNC= vops_var_serial,
2364+
DESERIALFUNC= vops_var_deserial,
2365+
PARALLEL= SAFE
2366+
);
2367+
2368+
22722369
createfunctionvops_float8_avg_accumulate(state internal, val vops_float8) returns internalas'MODULE_PATHNAME' language C parallel safe;
22732370
CREATEAGGREGATEavg(vops_float8) (
22742371
SFUNC= vops_float8_avg_accumulate,

‎vops.c‎

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,36 @@ static bool is_vops_type(Oid typeid)
710710
}\
711711
} \
712712

713+
#defineWAVG_AGG(TYPE)\
714+
PG_FUNCTION_INFO_V1(vops_##TYPE##_wavg_accumulate);\
715+
Datum vops_##TYPE##_wavg_accumulate(PG_FUNCTION_ARGS)\
716+
{\
717+
int i;\
718+
vops_var_state* state = PG_ARGISNULL(0) ? NULL : (vops_var_state*)PG_GETARG_POINTER(0); \
719+
vops_##TYPE* price = (vops_##TYPE*)PG_GETARG_POINTER(1);\
720+
vops_##TYPE* volume = (vops_##TYPE*)PG_GETARG_POINTER(2);\
721+
for (i = 0; i < TILE_SIZE; i++) {\
722+
if ((filter_mask & ~price->nullmask & ~volume->nullmask) & ((uint64)1 << i)) { \
723+
if (state == NULL) {\
724+
MemoryContext agg_context;\
725+
MemoryContext old_context;\
726+
if (!AggCheckCallContext(fcinfo, &agg_context))\
727+
elog(ERROR, "aggregate function called in non-aggregate context"); \
728+
old_context = MemoryContextSwitchTo(agg_context);\
729+
state = (vops_var_state*)palloc0(sizeof(vops_var_state)); \
730+
MemoryContextSwitchTo(old_context);\
731+
}\
732+
state->sum += (double)price->payload[i]*volume->payload[i]; \
733+
state->sum2 += volume->payload[i];\
734+
}\
735+
}\
736+
if (state == NULL) {\
737+
PG_RETURN_NULL();\
738+
} else {\
739+
PG_RETURN_POINTER(state);\
740+
}\
741+
} \
742+
713743
#defineVAR_AGG(TYPE)\
714744
PG_FUNCTION_INFO_V1(vops_##TYPE##_var_accumulate);\
715745
Datum vops_##TYPE##_var_accumulate(PG_FUNCTION_ARGS)\
@@ -1108,7 +1138,6 @@ static void end_batch_insert()
11081138
FreeExecutorState(estate);
11091139
}
11101140

1111-
11121141
PG_FUNCTION_INFO_V1(vops_avg_final);
11131142
Datumvops_avg_final(PG_FUNCTION_ARGS)
11141143
{
@@ -1173,6 +1202,13 @@ Datum vops_avg_deserial(PG_FUNCTION_ARGS)
11731202
PG_RETURN_POINTER(state);
11741203
}
11751204

1205+
PG_FUNCTION_INFO_V1(vops_wavg_final);
1206+
Datumvops_wavg_final(PG_FUNCTION_ARGS)
1207+
{
1208+
vops_var_state*state= (vops_var_state*)PG_GETARG_POINTER(0);
1209+
PG_RETURN_FLOAT8(state->sum /state->sum2);
1210+
}
1211+
11761212
PG_FUNCTION_INFO_V1(vops_var_samp_final);
11771213
Datumvops_var_samp_final(PG_FUNCTION_ARGS)
11781214
{
@@ -1385,6 +1421,7 @@ Datum vops_agg_deserial(PG_FUNCTION_ARGS)
13851421
LAG_WIN(TYPE,CTYPE)\
13861422
AVG_AGG(TYPE)\
13871423
VAR_AGG(TYPE)\
1424+
WAVG_AGG(TYPE)\
13881425
MINMAX_AGG(TYPE,CTYPE,GCTYPE,min,<)\
13891426
MINMAX_AGG(TYPE,CTYPE,GCTYPE,max,>)\
13901427
MINMAX_WIN(TYPE,CTYPE,min,<)\

‎vops.html‎

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ <h3><a name="aggregates">Vector aggregates</a></h3>
267267
<p>
268268
OLAP queries usually perform some kind of aggregation of large volumes of data. These includes<code>grand</code> aggregates which are calculated for the whole
269269
table or aggregates with<code>group by</code> which are calculated for each group.
270-
VOPS implements all standard SQL aggregates:<code>count, min, max, sum, avg</code>. Them can be used exactly in the same way as in normal SQL queries:
270+
VOPS implements all standard SQL aggregates:<code>count, min, max, sum, avg, var_pop, var_sampl, variance, stddev_pop, stddev_samp, stddev</code>. Them can be used exactly in the same way as in normal SQL queries:
271271
</p>
272272
<pre>
273273
select sum(l_extendedprice*l_discount) as revenue
@@ -277,6 +277,14 @@ <h3><a name="aggregates">Vector aggregates</a></h3>
277277
& (l_quantity &lt; 24));
278278
</pre>
279279

280+
<p>
281+
Also VOPS provides weighted average aggregate VWAP which can be used to calculate volume-weighted average price:
282+
</p>
283+
284+
<pre>
285+
select wavg(l_extendedprice,l_quantity) from vops_lineitem;
286+
</pre>
287+
280288
<p>
281289
Using aggregation with group by is more complex. VOPS provides two functions for it:<code>map</code> and<code>reduce</code>.
282290
The work is actually done by<b>map</b>(<i>group_by_expression</i>,<i>aggregate_list</i>,<i>expr</i> {,<i>expr</i> })

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp