Compute Functions#

Arrow supports logical compute operations over inputs of possibly varying types.

The standard compute operations are provided by the pyarrow.compute module and can be used directly:

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> pc.sum(a)
<pyarrow.Int64Scalar: 7>

The grouped aggregation functions raise an exception when called directly and need to be used through the pyarrow.Table.group_by() capabilities. See Grouped Aggregations for more details.

Standard Compute Functions#

Many compute functions support both array (chunked or not) and scalar inputs, but some will mandate one or the other. For example, sort_indices requires its first and only input to be an array.

Below are a few simple examples:

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> b = pa.array([4, 1, 2, 8])
>>> pc.equal(a, b)
<pyarrow.lib.BooleanArray object at 0x7f686e4eef30>
[
  false,
  true,
  true,
  false
]
>>> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>> pc.multiply(x, y)
<pyarrow.DoubleScalar: 72.54>
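
Chunked arrays, such as the columns of a pyarrow.Table, are accepted as well. A minimal sketch of that case, continuing the session above:

>>> chunked = pa.chunked_array([[1, 2], [3, 4]])
>>> pc.sum(chunked)
<pyarrow.Int64Scalar: 10>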

If you are using a compute function which returns more than one value, results will be returned as a StructScalar. You can extract the individual values by calling the pyarrow.StructScalar.values() method:

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> pc.min_max(a)
<pyarrow.StructScalar: [('min', 1), ('max', 3)]>
>>> a, b = pc.min_max(a).values()
>>> a
<pyarrow.Int64Scalar: 1>
>>> b
<pyarrow.Int64Scalar: 3>

These functions can do more than just element-by-element operations. Here is an example of sorting a table:

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> t = pa.table({'x': [1, 2, 3], 'y': [3, 2, 1]})
>>> i = pc.sort_indices(t, sort_keys=[('y', 'ascending')])
>>> i
<pyarrow.lib.UInt64Array object at 0x7fcee5df75e8>
[
  2,
  1,
  0
]
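
The returned indices can then be used to materialize the sorted table, for example with pyarrow.Table.take(). A minimal sketch continuing the session above:

>>> t.take(i)
pyarrow.Table
x: int64
y: int64
----
x: [[3,2,1]]
y: [[1,2,3]]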

For a complete list of the compute functions that PyArrow provides, you can refer to the Compute Functions reference.

Grouped Aggregations#

PyArrow supports grouped aggregations over pyarrow.Table through the pyarrow.Table.group_by() method. The method will return a grouping declaration to which the hash aggregation functions can be applied:

>>> import pyarrow as pa
>>> t = pa.table([
...       pa.array(["a", "a", "b", "b", "c"]),
...       pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([("values", "sum")])
pyarrow.Table
values_sum: int64
keys: string
----
values_sum: [[3,7,5]]
keys: [["a","b","c"]]

The"sum" aggregation passed to theaggregate method in the previousexample is thehash_sum compute function.

Multiple aggregations can be performed at the same time by providing them to the aggregate method:

>>> import pyarrow as pa
>>> t = pa.table([
...       pa.array(["a", "a", "b", "b", "c"]),
...       pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([
...    ("values", "sum"),
...    ("keys", "count")
... ])
pyarrow.Table
values_sum: int64
keys_count: int64
keys: string
----
values_sum: [[3,7,5]]
keys_count: [[2,2,1]]
keys: [["a","b","c"]]

Aggregation options can also be provided for each aggregation function. For example, we can use CountOptions to change how we count null values:

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table_with_nulls = pa.table([
...    pa.array(["a", "a", "a"]),
...    pa.array([1, None, None])
... ], names=["keys", "values"])
>>> table_with_nulls.group_by(["keys"]).aggregate([
...    ("values", "count", pc.CountOptions(mode="all"))
... ])
pyarrow.Table
values_count: int64
keys: string
----
values_count: [[3]]
keys: [["a"]]
>>> table_with_nulls.group_by(["keys"]).aggregate([
...    ("values", "count", pc.CountOptions(mode="only_valid"))
... ])
pyarrow.Table
values_count: int64
keys: string
----
values_count: [[1]]
keys: [["a"]]

Following is a list of all supported grouped aggregation functions, together with the options class each one accepts (shown in parentheses, where applicable). You can use them with or without the "hash_" prefix, as shown in the example after the list.

  • hash_all: Whether all elements in each group evaluate to true (ScalarAggregateOptions)

  • hash_any: Whether any element in each group evaluates to true (ScalarAggregateOptions)

  • hash_approximate_median: Compute approximate medians of values in each group (ScalarAggregateOptions)

  • hash_count: Count the number of null / non-null values in each group (CountOptions)

  • hash_count_all: Count the number of rows in each group

  • hash_count_distinct: Count the distinct values in each group (CountOptions)

  • hash_distinct: Keep the distinct values in each group (CountOptions)

  • hash_first: Compute the first value in each group (ScalarAggregateOptions)

  • hash_first_last: Compute the first and last values in each group (ScalarAggregateOptions)

  • hash_kurtosis: Compute the kurtosis of values in each group

  • hash_last: Compute the last value in each group (ScalarAggregateOptions)

  • hash_list: List all values in each group

  • hash_max: Compute the maximum of values in each group (ScalarAggregateOptions)

  • hash_mean: Compute the mean of values in each group (ScalarAggregateOptions)

  • hash_min: Compute the minimum of values in each group (ScalarAggregateOptions)

  • hash_min_max: Compute the minimum and maximum of values in each group (ScalarAggregateOptions)

  • hash_one: Get one value from each group

  • hash_pivot_wider: Pivot values according to a pivot key column (PivotWiderOptions)

  • hash_product: Compute the product of values in each group (ScalarAggregateOptions)

  • hash_skew: Compute the skewness of values in each group

  • hash_stddev: Compute the standard deviation of values in each group

  • hash_sum: Sum values in each group (ScalarAggregateOptions)

  • hash_tdigest: Compute approximate quantiles of values in each group (TDigestOptions)

  • hash_variance: Compute the variance of values in each group
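
For instance, the earlier sum aggregation can equivalently be spelled with the explicit prefix. A minimal sketch, reusing the table t from the examples above:

# Equivalent to ("values", "sum"); both resolve to the hash_sum kernel.
t.group_by("keys").aggregate([("values", "hash_sum")])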

Table and Dataset Joins#

Both Table and Dataset support join operations through the Table.join() and Dataset.join() methods.

The methods accept a right table or dataset that will be joined to the initial one, and one or more keys that should be used from the two entities to perform the join.

By default a left outer join is performed, but it’s possible to ask for any of the supported join types:

  • left semi

  • right semi

  • left anti

  • right anti

  • inner

  • left outer

  • right outer

  • full outer

A basic join can be performed just by providing a table and a key on which the join should be performed:

import pyarrow as pa

table1 = pa.table({'id': [1, 2, 3],
                   'year': [2020, 2022, 2019]})

table2 = pa.table({'id': [3, 4],
                   'n_legs': [5, 100],
                   'animal': ["Brittle stars", "Centipede"]})

joined_table = table1.join(table2, keys="id")

The result will be a new table created by joining table1 with table2 on the id key with a left outer join:

pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]

We can perform additional types of joins, like a full outer join, by passing them to the join_type argument:

table1.join(table2, keys='id', join_type="full outer")

In that case the result would be:

pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2,4]]
year: [[2019,2020,2022,null]]
n_legs: [[5,null,null,100]]
animal: [["Brittle stars",null,null,"Centipede"]]

It’s also possible to provide additional join keys, so that the join happens on two keys instead of one. For example, we can add a year column to table2 so that we can join on ('id', 'year'):

table2_withyear = table2.append_column("year", pa.array([2019, 2022]))
table1.join(table2_withyear, keys=["id", "year"])

The result will be a table where only the entries with id=3 and year=2019 have data; the rest will be null:

pyarrow.Table
id: int64
year: int64
animal: string
n_legs: int64
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
animal: [["Brittle stars",null,null]]
n_legs: [[5,null,null]]

The same capabilities are available for Dataset.join() too, so you can take two datasets and join them:

import pyarrow.dataset as ds

ds1 = ds.dataset(table1)
ds2 = ds.dataset(table2)

joined_ds = ds1.join(ds2, keys="id")

The resulting dataset will be an InMemoryDataset containing the joined data:

>>> joined_ds.head(5)
pyarrow.Table
id: int64
year: int64
animal: string
n_legs: int64
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
animal: [["Brittle stars",null,null]]
n_legs: [[5,null,null]]

Filtering by Expressions#

Table and Dataset can both be filtered using a boolean Expression.

The expression can be built starting from a pyarrow.compute.field(). Comparisons and transformations can then be applied to one or more fields to build the filter expression you care about.

Most Compute Functions can be used to perform transformations on a field.

For example, we could build a filter to find all rows that are even in column "nums":

import pyarrow.compute as pc

even_filter = (pc.bit_wise_and(pc.field("nums"), pc.scalar(1)) == pc.scalar(0))

Note

The filter finds even numbers by performing a bitwise and operation between each number and 1. As 1 is 00000001 in binary form, only numbers that have the last bit set to 1 will return a non-zero result from the bit_wise_and operation; this identifies all odd numbers. Given that we are interested in the even ones, we then check that the result of the bit_wise_and operation equals 0. Only the numbers whose last bit is 0 return 0 as the result of num & 1, and since all numbers whose last bit is 0 are multiples of 2, we end up filtering for the even numbers only.

Once we have our filter, we can provide it to the Table.filter() method to filter our table down to only the matching rows:

>>> table = pa.table({'nums': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
...                   'chars': ["a", "b", "c", "d", "e", "f", "g", "h", "i", "l"]})
>>> table.filter(even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[2,4,6,8,10]]
chars: [["b","d","f","h","l"]]

Multiple filters can be combined using &, |, and ~ to perform and, or and not operations. For example, using ~even_filter will actually end up filtering for all numbers that are odd:

>>> table.filter(~even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[1,3,5,7,9]]
chars: [["a","c","e","g","i"]]

and we could build a filter that finds all even numbers greater than 5 by combining our even_filter with a pc.field("nums") > 5 filter:

>>> table.filter(even_filter & (pc.field("nums") > 5))
pyarrow.Table
nums: int64
chars: string
----
nums: [[6,8,10]]
chars: [["f","h","l"]]

Dataset can similarly be filtered with the Dataset.filter() method. The method will return an instance of Dataset which will lazily apply the filter as soon as actual data of the dataset is accessed:

>>> dataset = ds.dataset(table)
>>> filtered = dataset.filter(pc.field("nums") < 5).filter(pc.field("nums") > 2)
>>> filtered.to_table()
pyarrow.Table
nums: int64
chars: string
----
nums: [[3,4]]
chars: [["c","d"]]

User-Defined Functions#

Warning

This API is experimental.

PyArrow allows defining and registering custom compute functions. These functions can then be called from Python as well as C++ (and potentially any other implementation wrapping Arrow C++, such as the R arrow package) using their registered function name.

UDF support is limited to scalar functions. A scalar function is a function which executes elementwise operations on arrays or scalars. In general, the output of a scalar function does not depend on the order of values in the arguments. Note that such functions have a rough correspondence to the functions used in SQL expressions, or to NumPy universal functions.

To register a UDF, a function name, function docs, input types and output type need to be defined and passed to pyarrow.compute.register_scalar_function():

import numpy as np

import pyarrow as pa
import pyarrow.compute as pc

function_name = "numpy_gcd"
function_docs = {
    "summary": "Calculates the greatest common divisor",
    "description":
        "Given 'x' and 'y' find the greatest number that divides\n"
        "evenly into both x and y."
}

input_types = {
    "x": pa.int64(),
    "y": pa.int64()
}

output_type = pa.int64()

def to_np(val):
    if isinstance(val, pa.Scalar):
        return val.as_py()
    else:
        return np.array(val)

def gcd_numpy(ctx, x, y):
    np_x = to_np(x)
    np_y = to_np(y)
    return pa.array(np.gcd(np_x, np_y))

pc.register_scalar_function(gcd_numpy,
                            function_name,
                            function_docs,
                            input_types,
                            output_type)

The implementation of a user-defined function always takes a first context parameter (named ctx in the example above) which is an instance of pyarrow.compute.UdfContext. This context exposes several useful attributes, particularly a memory_pool to be used for allocations in the context of the user-defined function.
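
For instance, a user-defined function could route its allocations through that pool by passing it to the compute functions it calls internally. A minimal sketch, where the function name "add_one_udf" is purely illustrative:

import pyarrow as pa
import pyarrow.compute as pc

def add_one(ctx, x):
    # Allocate the result from the pool exposed by the UdfContext
    # instead of the default memory pool.
    return pc.add(x, 1, memory_pool=ctx.memory_pool)

pc.register_scalar_function(
    add_one,
    "add_one_udf",  # hypothetical name, not part of the example above
    {"summary": "Add one", "description": "Adds 1 to every element of 'x'."},
    {"x": pa.int64()},
    pa.int64(),
)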

You can call a user-defined function directly using pyarrow.compute.call_function():

>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.scalar(63)])
<pyarrow.Int64Scalar: 9>
>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.array([81, 12, 5])])
<pyarrow.lib.Int64Array object at 0x7fcfa0e7b100>
[
  27,
  3,
  1
]

Working with Datasets#

More generally, user-defined functions are usable everywhere a compute function can be referred to by its name. For example, they can be called on a dataset's column using Expression._call().

Consider an instance where the data is in a table and we want to compute the GCD of one column with the scalar value 30. We will be re-using the "numpy_gcd" user-defined function that was created above:

>>> import pyarrow.dataset as ds
>>> data_table = pa.table({'category': ['A', 'B', 'C', 'D'],
...                        'value': [90, 630, 1827, 2709]})
>>> dataset = ds.dataset(data_table)
>>> func_args = [pc.scalar(30), ds.field("value")]
>>> dataset.to_table(
...     columns={
...         'gcd_value': ds.field('')._call("numpy_gcd", func_args),
...         'value': ds.field('value'),
...         'category': ds.field('category')
...     })
pyarrow.Table
gcd_value: int64
value: int64
category: string
----
gcd_value: [[30,30,3,3]]
value: [[90,630,1827,2709]]
category: [["A","B","C","D"]]

Note that ds.field('')._call(...) returns a pyarrow.compute.Expression(). The arguments passed to this function call are expressions, not scalar values (notice the difference between pyarrow.scalar() and pyarrow.compute.scalar(); the latter produces an expression). This expression is evaluated when the projection operator executes it.
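
As a small illustration of that difference, a minimal sketch:

import pyarrow as pa
import pyarrow.compute as pc

value = pa.scalar(30)    # a concrete Int64Scalar holding the value 30
literal = pc.scalar(30)  # an Expression wrapping the literal 30, evaluated lazily

# Expressions compose with fields into larger expressions,
# e.g. for use in projections or filters.
expr = pc.field("value") > literal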

Projection Expressions#

In the above example we used an expression to add a new column (gcd_value) to our table. Adding new, dynamically computed, columns to a table is known as “projection”, and there are limitations on what kinds of functions can be used in projection expressions. A projection function must emit a single output value for each input row. That output value should be calculated entirely from the input row and should not depend on any other row. For example, the “numpy_gcd” function that we’ve been using as an example above is a valid function to use in a projection. A “cumulative sum” function would not be a valid function since the result of each input row depends on the rows that came before. A “drop nulls” function would also be invalid because it doesn’t emit a value for some rows.