Gandiva Expression, Projector, and Filter#

Building Expressions#

Gandiva provides a general expression representation where expressions arerepresented by a tree of nodes. The expression trees are built usingTreeExprBuilder. The leaves of the expression tree are typicallyfield references, created byTreeExprBuilder::MakeField(), andliteral values, created byTreeExprBuilder::MakeLiteral(). Nodescan be combined into more complex expression trees using:

  • TreeExprBuilder::MakeFunction() to create a functionnode. (You can callGetRegisteredFunctionSignatures() toget a list of valid function signatures.)

  • TreeExprBuilder::MakeIf() to create if-else logic.

  • TreeExprBuilder::MakeAnd() andTreeExprBuilder::MakeOr()to create boolean expressions. (For “not”, use thenot(bool) function inMakeFunction.)

  • TreeExprBuilder::MakeInExpressionInt32() and the other “in expression”functions to create set membership tests.

Each of these functions create new composite nodes, which contain the leaf nodes(literals and field references) or other composite nodes as children. Bycomposing these, you can create arbitrarily complex expression trees.

Once an expression tree is built, they are wrapped in eitherExpressionorCondition, depending on how they will be used.Expression is used in projections whileCondition is used in filters.

As an example, here is how to create an Expression representingx+3 and aCondition representingx<3:

std::shared_ptr<arrow::Field>field_x_raw=arrow::field("x",arrow::int32());std::shared_ptr<Node>field_x=TreeExprBuilder::MakeField(field_x_raw);std::shared_ptr<Node>literal_3=TreeExprBuilder::MakeLiteral(3);std::shared_ptr<arrow::Field>field_result=arrow::field("result",arrow::int32());std::shared_ptr<Node>add_node=TreeExprBuilder::MakeFunction("add",{field_x,literal_3},arrow::int32());std::shared_ptr<Expression>expression=TreeExprBuilder::MakeExpression(add_node,field_result);std::shared_ptr<Node>less_than_node=TreeExprBuilder::MakeFunction("less_than",{field_x,literal_3},arrow::boolean());std::shared_ptr<Condition>condition=TreeExprBuilder::MakeCondition(less_than_node);

Projectors and Filters#

Gandiva’s two execution kernels areProjector andFilter.Projector consumes a record batch and projectsinto a new record batch.Filter consumes a record batch and produces aSelectionVector containing the indices that matched the condition.

For bothProjector andFilter, optimization of the expression IR happenswhen creating instances. They are compiled against a static schema, so theschema of the record batches must be known at this point.

Continuing with theexpression andcondition created in the previoussection, here is an example of creating a Projector and a Filter:

std::shared_ptr<arrow::Schema>input_schema=arrow::schema({field_x_raw});std::shared_ptr<arrow::Schema>output_schema=arrow::schema({field_result});std::shared_ptr<Projector>projector;Statusstatus;std::vector<std::shared_ptr<Expression>>expressions={expression};status=Projector::Make(input_schema,expressions,&projector);ARROW_RETURN_NOT_OK(status);std::shared_ptr<Filter>filter;status=Filter::Make(input_schema,condition,&filter);ARROW_RETURN_NOT_OK(status);

Once a Projector or Filter is created, it can be evaluated on Arrow record batches.These execution kernels are single-threaded on their own, but are designed to bereused to process distinct record batches in parallel.

Evaluating projections#

Execution is performed withProjector::Evaluate(). This outputsa vector of arrays, which can be passed along with the output schema toarrow::RecordBatch::Make().

autopool=arrow::default_memory_pool();intnum_records=4;arrow::Int32Builderbuilder;int32_tvalues[4]={1,2,3,4};ARROW_RETURN_NOT_OK(builder.AppendValues(values,4));ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array>array,builder.Finish());autoin_batch=arrow::RecordBatch::Make(input_schema,num_records,{array});arrow::ArrayVectoroutputs;status=projector->Evaluate(*in_batch,pool,&outputs);ARROW_RETURN_NOT_OK(status);std::shared_ptr<arrow::RecordBatch>result=arrow::RecordBatch::Make(output_schema,outputs[0]->length(),outputs);

Evaluating filters#

Filter::Evaluate() producesSelectionVector,a vector of row indices that matched the filter condition. The selection vectoris a wrapper around an arrow integer array, parameterized by bitwidth. Whencreating the selection vector (you must initialize itbefore passing toEvaluate()), you must choose the bitwidth, which determines the max indexvalue it can hold, and the max number of slots, which determines how many indicesit may contain. In general, the max number of slots should be set to your batchsize and the bitwidth the smallest integer size that can represent all integersless than the batch size. For example, if your batch size is 100k, set themaximum number of slots to 100k and the bitwidth to 32 (since 2^16 = 64k whichwould be too small).

OnceEvaluate() has been run and theSelectionVector ispopulated, use theSelectionVector::ToArray() method to getthe underlying array and then::arrow::compute::Take() to materialize theoutput record batch.

std::shared_ptr<gandiva::SelectionVector>result_indices;// Use 16-bit integers for indices. Result can be no longer than input size,// so use batch num_rows as max_slots.status=gandiva::SelectionVector::MakeInt16(/*max_slots=*/in_batch->num_rows(),pool,&result_indices);ARROW_RETURN_NOT_OK(status);status=filter->Evaluate(*in_batch,result_indices);ARROW_RETURN_NOT_OK(status);std::shared_ptr<arrow::Array>take_indices=result_indices->ToArray();Datummaybe_batch;ARROW_ASSIGN_OR_RAISE(maybe_batch,arrow::compute::Take(Datum(in_batch),Datum(take_indices),TakeOptions::NoBoundsCheck()));result=maybe_batch.record_batch();

Evaluating projections and filters#

Finally, you can also project while apply a selection vector, withProjector::Evaluate(). To do so, first make sure to initialize theProjector withSelectionVector::GetMode() so that the projectorcompiles with the correct bitwidth. Then you can pass theSelectionVector into theProjector::Evaluate() method.

// Make sure the projector is compiled for the appropriate selection vector modestatus=Projector::Make(input_schema,expressions,result_indices->GetMode(),ConfigurationBuilder::DefaultConfiguration(),&projector);ARROW_RETURN_NOT_OK(status);arrow::ArrayVectoroutputs_filtered;status=projector->Evaluate(*in_batch,result_indices.get(),pool,&outputs_filtered);ARROW_RETURN_NOT_OK(status);result=arrow::RecordBatch::Make(output_schema,outputs[0]->length(),outputs_filtered);