Compute Functions#
Arrow supports logical compute operations over inputs of possibly varying types.
The standard compute operations are provided by the pyarrow.compute module and can be used directly:
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> pc.sum(a)
<pyarrow.Int64Scalar: 7>
The grouped aggregation functions raise an exception instead and need to be used through the pyarrow.Table.group_by() capabilities. See Grouped Aggregations for more details.
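As a minimal sketch of the difference, a plain aggregation such as sum can be invoked directly, while its grouped counterpart is applied through Table.group_by() (the grouped form is covered in detail below):

import pyarrow as pa
import pyarrow.compute as pc

a = pa.array([1, 1, 2, 3])
pc.sum(a)  # the scalar aggregation can be called directly

t = pa.table({"keys": ["a", "a", "b"], "values": [1, 2, 3]})
# The grouped ("hash_") variant is not called directly;
# it is applied through the group_by() machinery instead.
t.group_by("keys").aggregate([("values", "sum")])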
Standard Compute Functions#
Many compute functions support both array (chunked or not) and scalar inputs, but some will mandate either. For example, sort_indices requires its first and only input to be an array.
Below are a few simple examples:
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> b = pa.array([4, 1, 2, 8])
>>> pc.equal(a, b)
<pyarrow.lib.BooleanArray object at 0x7f686e4eef30>
[
  false,
  true,
  true,
  false
]
>>> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>> pc.multiply(x, y)
<pyarrow.DoubleScalar: 72.54>
If you are using a compute function which returns more than one value, results will be returned as a StructScalar. You can extract the individual values by calling the pyarrow.StructScalar.values() method:
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> pc.min_max(a)
<pyarrow.StructScalar: [('min', 1), ('max', 3)]>
>>> a, b = pc.min_max(a).values()
>>> a
<pyarrow.Int64Scalar: 1>
>>> b
<pyarrow.Int64Scalar: 3>
These functions can do more than just element-by-element operations. Here is an example of sorting a table:
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> t = pa.table({'x': [1, 2, 3], 'y': [3, 2, 1]})
>>> i = pc.sort_indices(t, sort_keys=[('y', 'ascending')])
>>> i
<pyarrow.lib.UInt64Array object at 0x7fcee5df75e8>
[
  2,
  1,
  0
]
For a complete list of the compute functions that PyArrow provides you can refer to the Compute Functions reference.
Grouped Aggregations#
PyArrow supports grouped aggregations over pyarrow.Table through the pyarrow.Table.group_by() method. The method will return a grouping declaration to which the hash aggregation functions can be applied:
>>> import pyarrow as pa
>>> t = pa.table([
...       pa.array(["a", "a", "b", "b", "c"]),
...       pa.array([1, 2, 3, 4, 5]),
...       ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([("values", "sum")])
pyarrow.Table
values_sum: int64
keys: string
----
values_sum: [[3,7,5]]
keys: [["a","b","c"]]
The"sum" aggregation passed to theaggregate method in the previousexample is thehash_sum compute function.
Multiple aggregations can be performed at the same time by providing them to the aggregate method:
>>> import pyarrow as pa
>>> t = pa.table([
...       pa.array(["a", "a", "b", "b", "c"]),
...       pa.array([1, 2, 3, 4, 5]),
...       ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([
...    ("values", "sum"),
...    ("keys", "count")
... ])
pyarrow.Table
values_sum: int64
keys_count: int64
keys: string
----
values_sum: [[3,7,5]]
keys_count: [[2,2,1]]
keys: [["a","b","c"]]
Aggregation options can also be provided for each aggregation function. For example, we can use CountOptions to change how we count null values:
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table_with_nulls = pa.table([
...    pa.array(["a", "a", "a"]),
...    pa.array([1, None, None])
...    ], names=["keys", "values"])
>>> table_with_nulls.group_by(["keys"]).aggregate([
...    ("values", "count", pc.CountOptions(mode="all"))
... ])
pyarrow.Table
values_count: int64
keys: string
----
values_count: [[3]]
keys: [["a"]]
>>> table_with_nulls.group_by(["keys"]).aggregate([
...    ("values", "count", pc.CountOptions(mode="only_valid"))
... ])
pyarrow.Table
values_count: int64
keys: string
----
values_count: [[1]]
keys: [["a"]]
Following is a list of all supported grouped aggregation functions. You can use them with or without the "hash_" prefix (an example follows the list).
hash_all | Whether all elements in each group evaluate to true
hash_any | Whether any element in each group evaluates to true
hash_approximate_median | Compute approximate medians of values in each group
hash_count | Count the number of null / non-null values in each group
hash_count_all | Count the number of rows in each group
hash_count_distinct | Count the distinct values in each group
hash_distinct | Keep the distinct values in each group
hash_first | Compute the first value in each group
hash_first_last | Compute the first and last values in each group
hash_kurtosis | Compute the kurtosis of values in each group
hash_last | Compute the last value in each group
hash_list | List all values in each group
hash_max | Compute the maximum of values in each group
hash_mean | Compute the mean of values in each group
hash_min | Compute the minimum of values in each group
hash_min_max | Compute the minimum and maximum of values in each group
hash_one | Get one value from each group
hash_pivot_wider | Pivot values according to a pivot key column
hash_product | Compute the product of values in each group
hash_skew | Compute the skewness of values in each group
hash_stddev | Compute the standard deviation of values in each group
hash_sum | Sum values in each group
hash_tdigest | Compute approximate quantiles of values in each group
hash_variance | Compute the variance of values in each group
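Since the prefix is optional, the grouped sum from the earlier example could equivalently be requested with the full kernel name. A minimal sketch (the exact name of the produced column may differ between the two spellings):

import pyarrow as pa

t = pa.table([
    pa.array(["a", "a", "b", "b", "c"]),
    pa.array([1, 2, 3, 4, 5]),
], names=["keys", "values"])

# Both spellings refer to the same grouped aggregation kernel.
t.group_by("keys").aggregate([("values", "sum")])
t.group_by("keys").aggregate([("values", "hash_sum")])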
Table and Dataset Joins#
Both Table and Dataset support join operations through the Table.join() and Dataset.join() methods.
The methods accept a right table or dataset that will be joined to the initial one and one or more keys that should be used from the two entities to perform the join.
By default a left outer join is performed, but it’s possible to ask for any of the supported join types:
left semi
right semi
left anti
right anti
inner
left outer
right outer
full outer
A basic join can be performed just by providing a table and a key on which the join should be performed:
import pyarrow as pa

table1 = pa.table({'id': [1, 2, 3],
                   'year': [2020, 2022, 2019]})
table2 = pa.table({'id': [3, 4],
                   'n_legs': [5, 100],
                   'animal': ["Brittle stars", "Centipede"]})

joined_table = table1.join(table2, keys="id")
The result will be a new table created by joining table1 with table2 on the id key with a left outer join:
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]
We can perform additional types of joins, like a full outer join, by passing them to the join_type argument:
table1.join(table2, keys='id', join_type="full outer")
In that case the result would be:
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2,4]]
year: [[2019,2020,2022,null]]
n_legs: [[5,null,null,100]]
animal: [["Brittle stars",null,null,"Centipede"]]
It’s also possible to provide additional join keys, so that the join happens on two keys instead of one. For example, we can add a year column to table2 so that we can join on ('id', 'year'):
table2_withyear = table2.append_column("year", pa.array([2019, 2022]))
table1.join(table2_withyear, keys=["id", "year"])
The result will be a table where only entries with id=3 and year=2019 have data, the rest will be null:
pyarrow.Table
id: int64
year: int64
animal: string
n_legs: int64
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
animal: [["Brittle stars",null,null]]
n_legs: [[5,null,null]]
The same capabilities are available for Dataset.join() too, so you can take two datasets and join them:
import pyarrow.dataset as ds

ds1 = ds.dataset(table1)
ds2 = ds.dataset(table2)

joined_ds = ds1.join(ds2, keys="id")
The resulting dataset will be an InMemoryDataset containing the joined data:
>>> joined_ds.head(5)
pyarrow.Table
id: int64
year: int64
animal: string
n_legs: int64
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
animal: [["Brittle stars",null,null]]
n_legs: [[5,null,null]]
Filtering by Expressions#
Table and Dataset can both be filtered using a boolean Expression.
The expression can be built starting from a pyarrow.compute.field(). Comparisons and transformations can then be applied to one or more fields to build the filter expression you care about.
Most Compute Functions can be used to perform transformations on a field.
For example, we could build a filter to find all rows that are even in column "nums":
import pyarrow.compute as pc

even_filter = (pc.bit_wise_and(pc.field("nums"), pc.scalar(1)) == pc.scalar(0))
Note
The filter finds even numbers by performing a bitwise and operation between the number and 1. As 1 is 00000001 in binary form, only numbers that have the last bit set to 1 will return a non-zero result from the bit_wise_and operation; this identifies all odd numbers. Since we are interested in the even ones, we then check that the value returned by the bit_wise_and operation equals 0. Only numbers whose last bit is 0 will return 0 as the result of num & 1, and since all numbers whose last bit is 0 are multiples of 2, we end up filtering for the even numbers only.
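To make the parity check concrete, here is a minimal sketch of the intermediate values (expected results are noted in the comments):

import pyarrow as pa
import pyarrow.compute as pc

nums = pa.array([1, 2, 3, 4])

# num & 1 keeps only the last bit: 1 for odd numbers, 0 for even numbers.
pc.bit_wise_and(nums, 1)               # -> [1, 0, 1, 0]

# Comparing the result against 0 therefore selects the even rows.
pc.equal(pc.bit_wise_and(nums, 1), 0)  # -> [false, true, false, true]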
Once we have our filter, we can provide it to the Table.filter() method to filter our table only for the matching rows:
>>> table = pa.table({'nums': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
...                   'chars': ["a", "b", "c", "d", "e", "f", "g", "h", "i", "l"]})
>>> table.filter(even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[2,4,6,8,10]]
chars: [["b","d","f","h","l"]]
Multiple filters can be joined using &, |, ~ to perform and, or and not operations. For example, using ~even_filter will actually end up filtering for all numbers that are odd:
>>> table.filter(~even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[1,3,5,7,9]]
chars: [["a","c","e","g","i"]]
and we could build a filter that finds all even numbers greater than 5 by combining our even_filter with a pc.field("nums") > 5 filter:
>>> table.filter(even_filter & (pc.field("nums") > 5))
pyarrow.Table
nums: int64
chars: string
----
nums: [[6,8,10]]
chars: [["f","h","l"]]
Dataset can similarly be filtered with the Dataset.filter() method. The method will return an instance of Dataset which will lazily apply the filter as soon as actual data of the dataset is accessed:
>>> dataset = ds.dataset(table)
>>> filtered = dataset.filter(pc.field("nums") < 5).filter(pc.field("nums") > 2)
>>> filtered.to_table()
pyarrow.Table
nums: int64
chars: string
----
nums: [[3,4]]
chars: [["c","d"]]
User-Defined Functions#
Warning
This API is experimental.
PyArrow allows defining and registering custom compute functions. These functions can then be called from Python as well as C++ (and potentially any other implementation wrapping Arrow C++, such as the R arrow package) using their registered function name.
UDF support is limited to scalar functions. A scalar function is a function which executes elementwise operations on arrays or scalars. In general, the output of a scalar function does not depend on the order of values in the arguments. Note that such functions have a rough correspondence to the functions used in SQL expressions, or to NumPy universal functions.
To register a UDF, a function name, function docs, input types and output type need to be defined, using pyarrow.compute.register_scalar_function():
import numpy as np

import pyarrow as pa
import pyarrow.compute as pc

function_name = "numpy_gcd"
function_docs = {
    "summary": "Calculates the greatest common divisor",
    "description":
        "Given 'x' and 'y' find the greatest number that divides\n"
        "evenly into both x and y."
}

input_types = {
    "x": pa.int64(),
    "y": pa.int64()
}

output_type = pa.int64()

def to_np(val):
    if isinstance(val, pa.Scalar):
        return val.as_py()
    else:
        return np.array(val)

def gcd_numpy(ctx, x, y):
    np_x = to_np(x)
    np_y = to_np(y)
    return pa.array(np.gcd(np_x, np_y))

pc.register_scalar_function(gcd_numpy,
                            function_name,
                            function_docs,
                            input_types,
                            output_type)
The implementation of a user-defined function always takes a first context parameter (named ctx in the example above) which is an instance of pyarrow.compute.UdfContext. This context exposes several useful attributes, particularly a memory_pool to be used for allocations in the context of the user-defined function.
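For example, a user-defined function can forward the context's memory pool to the compute calls or array constructors it uses internally. This is only a minimal sketch of the idea; the add_one function and its registration details here are made up for illustration:

import pyarrow as pa
import pyarrow.compute as pc

def add_one(ctx, x):
    # Allocate the result from the memory pool provided by the UDF context.
    return pc.add(x, 1, memory_pool=ctx.memory_pool)

pc.register_scalar_function(
    add_one,
    "add_one",
    {"summary": "Add one", "description": "Adds 1 to 'x'."},
    {"x": pa.int64()},
    pa.int64(),
)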
You can call a user-defined function directly using pyarrow.compute.call_function():
>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.scalar(63)])
<pyarrow.Int64Scalar: 9>
>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.array([81, 12, 5])])
<pyarrow.lib.Int64Array object at 0x7fcfa0e7b100>
[
  27,
  3,
  1
]
Working with Datasets#
More generally, user-defined functions are usable everywhere a compute function can be referred to by its name. For example, they can be called on a dataset's column using Expression._call().
Consider an instance where the data is in a table and we want to compute the GCD of one column with the scalar value 30. We will be re-using the "numpy_gcd" user-defined function that was created above:
>>> import pyarrow.dataset as ds
>>> data_table = pa.table({'category': ['A', 'B', 'C', 'D'],
...                        'value': [90, 630, 1827, 2709]})
>>> dataset = ds.dataset(data_table)
>>> func_args = [pc.scalar(30), ds.field("value")]
>>> dataset.to_table(
...     columns={
...         'gcd_value': ds.field('')._call("numpy_gcd", func_args),
...         'value': ds.field('value'),
...         'category': ds.field('category')
...     })
pyarrow.Table
gcd_value: int64
value: int64
category: string
----
gcd_value: [[30,30,3,3]]
value: [[90,630,1827,2709]]
category: [["A","B","C","D"]]
Note that ds.field('')._call(...) returns a pyarrow.compute.Expression. The arguments passed to this function call are expressions, not scalar values (notice the difference between pyarrow.scalar() and pyarrow.compute.scalar(), the latter produces an expression). This expression is evaluated when the projection operator executes it.
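The difference is easy to see by constructing both. A minimal sketch, assuming the imports used above:

import pyarrow as pa
import pyarrow.compute as pc

concrete = pa.scalar(30)   # an Int64Scalar holding an actual value
symbolic = pc.scalar(30)   # an Expression, only evaluated when a projection or filter runs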
Projection Expressions#
In the above example we used an expression to add a new column (gcd_value) to our table. Adding new, dynamically computed, columns to a table is known as "projection", and there are limitations on what kinds of functions can be used in projection expressions. A projection function must emit a single output value for each input row. That output value should be calculated entirely from the input row and should not depend on any other row. For example, the "numpy_gcd" function that we've been using as an example above is a valid function to use in a projection. A "cumulative sum" function would not be a valid function since the result of each input row depends on the rows that came before. A "drop nulls" function would also be invalid because it doesn't emit a value for some rows.
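As a minimal sketch of a valid projection (reusing the dataset from the previous example and the built-in multiply function through the same _call pattern shown above), each output row below is computed from its own input row only:

import pyarrow.compute as pc
import pyarrow.dataset as ds

# Valid projection: "multiply" emits exactly one output value per input row.
doubled = ds.field('')._call("multiply", [ds.field('value'), pc.scalar(2)])

dataset.to_table(columns={
    'value': ds.field('value'),
    'value_doubled': doubled,
})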

