Acero User’s Guide#
This page describes how to use Acero. It's recommended that you read the overview first and familiarize yourself with the basic concepts.
Using Acero#
The basic workflow for Acero is this:
1. First, create a graph of Declaration objects describing the plan.
2. Call one of the DeclarationToXyz methods to execute the Declaration.
3. A new ExecPlan is created from the graph of Declarations. Each Declaration will correspond to one ExecNode in the plan. In addition, a sink node will be added, depending on which DeclarationToXyz method was used.
4. The ExecPlan is executed. Typically this happens as part of the DeclarationToXyz call, but in DeclarationToReader the reader is returned before the plan is finished executing.
5. Once the plan is finished it is destroyed.
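For example, the whole workflow can be as small as the following sketch. This is a minimal illustration, not part of the example file: it assumes a std::shared_ptr<arrow::Table> named input already exists with an int32 column "a", that the code runs inside a function returning arrow::Status, and it uses the cp/ac namespace aliases defined in the examples later in this guide.

```cpp
// Build a graph of Declarations: a table_source feeding a filter.
// `input` is an assumed, pre-existing std::shared_ptr<arrow::Table>.
ac::Declaration source{"table_source",
                       ac::TableSourceNodeOptions{input, /*max_batch_size=*/1024}};
ac::Declaration filter{
    "filter", {std::move(source)},
    ac::FilterNodeOptions(cp::greater(cp::field_ref("a"), cp::literal(3)))};

// DeclarationToTable creates the ExecPlan, appends a sink node, runs the
// plan to completion, destroys it, and returns the collected result.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> output,
                      ac::DeclarationToTable(std::move(filter)));
```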
Creating a Plan#
Using Substrait#
Substrait is the preferred mechanism for creating a plan (graph of Declaration). There are a few reasons for this:
- Substrait producers spend a lot of time and energy in creating user-friendly APIs for producing complex execution plans in a simple way. For example, the pivot_wider operation can be achieved using a complex series of aggregate nodes. Rather than create all of those aggregate nodes by hand, a producer will give you a much simpler API.
- If you are using Substrait then you can easily switch to any other Substrait-consuming engine should you at some point find that it serves your needs better than Acero.
- We hope that tools will eventually emerge for Substrait-based optimizers and planners. By using Substrait you will be making it much easier to use these tools in the future.
You could create the Substrait plan yourself but you'll probably have a much easier time finding an existing Substrait producer. For example, you could use ibis-substrait to easily create Substrait plans from python expressions. There are a few different tools that are able to create Substrait plans from SQL. Eventually, we hope that C++ based Substrait producers will emerge. However, we are not aware of any at this time.
Detailed instructions on creating an execution plan from Substrait can be found in the Substrait page.
Programmatic Plan Creation#
Creating an execution plan programmatically is simpler than creating a plan from Substrait, though it loses some of the flexibility and future-proofing guarantees. The simplest way to create a Declaration is to instantiate one directly. You will need the name of the declaration, a vector of inputs, and an options object. For example:
```cpp
/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
arrow::Status ScanProjectSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}
```
The above code creates a scan declaration (which has no inputs) and a project declaration (using the scan as input). This is simple enough but we can make it slightly easier. If you are creating a linear sequence of declarations (like in the above example) then you can also use the Declaration::Sequence() function.
```cpp
// Inputs do not have to be passed to the project node when using Sequence
ac::Declaration plan =
    ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
                               {"project", ac::ProjectNodeOptions({a_times_2})}});
```
There are many more examples of programmatic plan creation later in this document.
Executing a Plan#
There are a number of different methods that can be used to execute a declaration. Each one provides the data in a slightly different form. Since all of these methods start with DeclarationTo... this guide will often refer to these methods as the DeclarationToXyz methods.
DeclarationToTable#
The DeclarationToTable() method will accumulate all of the results into a single arrow::Table. This is perhaps the simplest way to collect results from Acero. The main disadvantage to this approach is that it requires accumulating all results into memory.
Note
Acero processes large datasets in small chunks. This is described in more detail in the developer's guide. As a result, you may be surprised to find that a table collected with DeclarationToTable is chunked differently than your input. For example, your input might be a large table with a single chunk with 2 million rows. Your output table might then have 64 chunks with 32Ki rows each. There is a current request to specify the chunk size for the output in GH-15155.
DeclarationToReader#
The DeclarationToReader() method allows you to iteratively consume the results. It will create an arrow::RecordBatchReader which you can read from at your leisure. If you do not read from the reader quickly enough then backpressure will be applied and the execution plan will pause. Closing the reader will cancel the running execution plan and the reader's destructor will wait for the execution plan to finish whatever it is doing and so it may block.
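For instance, a minimal sketch of the read loop, assuming plan is a previously built Declaration and the code runs inside a function returning arrow::Status:

```cpp
// Consume results iteratively; `plan` is an assumed, previously built
// ac::Declaration.
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<arrow::RecordBatchReader> reader,
                      ac::DeclarationToReader(std::move(plan)));
while (true) {
  std::shared_ptr<arrow::RecordBatch> batch;
  ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
  if (!batch) break;  // the plan is exhausted
  // ... process `batch`; reading slowly applies backpressure upstream ...
}
// Destroying (or closing) the reader cancels any still-running plan and
// waits for it to finish, so this point may block.
```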
DeclarationToStatus#
The DeclarationToStatus() method is useful if you want to run the plan but do not actually want to consume the results. For example, this is useful when benchmarking or when the plan has side effects such as a dataset write node. If the plan generates any results then they will be immediately discarded.
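For example (a one-line sketch, again assuming a previously built Declaration named plan):

```cpp
// Run the plan purely for its side effects; any produced batches are
// discarded immediately. `plan` is an assumed, previously built Declaration.
ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(plan)));
```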
Running a Plan Directly#
If one of the DeclarationToXyz methods is not sufficient for some reason then it is possible to run a plan directly. This should only be needed if you are doing something unique, for example, if you have created a custom sink node or if you need a plan that has multiple outputs.
Note
In academic literature and many existing systems there is a general assumption that an execution plan has at most one output. There are some things in Acero, such as the DeclarationToXyz methods, which will expect this. However, there is nothing in the design that strictly prevents having multiple sink nodes.
Detailed instructions on how to do this are out of scope for this guide but the rough steps are:
1. Create a new ExecPlan object.
2. Add sink nodes to your graph of Declaration objects (this is the only time you will need to create declarations for sink nodes).
3. Use Declaration::AddToPlan() to add your declaration to your plan (if you have more than one output then you will not be able to use this method and will need to add your nodes one at a time).
4. Validate the plan with ExecPlan::Validate().
5. Start the plan with ExecPlan::StartProducing().
6. Wait for the future returned by ExecPlan::finished() to complete.
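A rough sketch of those steps, assuming decl is a Declaration whose graph already ends in a sink node that fills an AsyncGenerator named sink_gen (both names are hypothetical), and that the code runs inside a function returning arrow::Status:

```cpp
// `decl` and `sink_gen` are assumed to exist as described above.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan, ac::ExecPlan::Make());
// AddToPlan returns the added ExecNode; here we only care about errors.
ARROW_RETURN_NOT_OK(decl.AddToPlan(plan.get()).status());
ARROW_RETURN_NOT_OK(plan->Validate());
plan->StartProducing();
// ... consume sink_gen here, e.g. via ac::MakeGeneratorReader ...
ARROW_RETURN_NOT_OK(plan->finished().status());
```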
Providing Input#
Input data for an exec plan can come from a variety of sources. It is often read from files stored on some kind of filesystem. It is also common for input to come from in-memory data. In-memory data is typical, for example, in a pandas-like frontend. Input could also come from network streams like a Flight request. Acero can support all of these cases and can even support unique and custom situations not mentioned here.
There are pre-defined source nodes that cover the most common input scenarios. These are listed below. However, if your source data is unique then you will need to use the generic source node. This node expects you to provide an asynchronous stream of batches and is covered in more detail here.
Available ExecNode Implementations#
The following tables quickly summarize the available operators.
Sources#
These nodes can be used as sources of data
| Factory Name | Options | Brief Description |
|---|---|---|
| source | SourceNodeOptions | A generic source node that wraps an asynchronous stream of data (example) |
| table_source | TableSourceNodeOptions | Generates data from an arrow::Table (example) |
| record_batch_source | RecordBatchSourceNodeOptions | Generates data from an iterator of arrow::RecordBatch |
| record_batch_reader_source | RecordBatchReaderSourceNodeOptions | Generates data from an arrow::RecordBatchReader |
| exec_batch_source | ExecBatchSourceNodeOptions | Generates data from an iterator of arrow::compute::ExecBatch |
| array_vector_source | ArrayVectorSourceNodeOptions | Generates data from an iterator of vectors of arrow::Array |
| scan | arrow::dataset::ScanNodeOptions | Generates data from an arrow::dataset::Dataset (requires the datasets module) (example) |
Compute Nodes#
These nodes perform computations on data and may transform or reshape the data
| Factory Name | Options | Brief Description |
|---|---|---|
| filter | FilterNodeOptions | Removes rows that do not match a given filter expression (example) |
| project | ProjectNodeOptions | Creates new columns by evaluating compute expressions. Can also drop and reorder columns (example) |
| aggregate | AggregateNodeOptions | Calculates summary statistics across the entire input stream or on groups of data (example) |
| pivot_longer | PivotLongerNodeOptions | Reshapes data by converting some columns into additional rows |
Arrangement Nodes#
These nodes reorder, combine, or slice streams of data
| Factory Name | Options | Brief Description |
|---|---|---|
| hash_join | HashJoinNodeOptions | Joins two inputs based on common columns (example) |
| asof_join | AsofJoinNodeOptions | Joins multiple inputs to the first input based on a common ordered column (often time) |
| union | N/A | Merges two inputs with identical schemas (example) |
| order_by | OrderByNodeOptions | Reorders a stream |
| fetch | FetchNodeOptions | Slices a range of rows from a stream |
Sink Nodes#
These nodes terminate a plan. Users do not typically create sink nodes as they are selected based on the DeclarationToXyz method used to consume the plan. However, this list may be useful for those developing new sink nodes or using Acero in advanced ways.
| Factory Name | Options | Brief Description |
|---|---|---|
| sink | SinkNodeOptions | Collects batches into a FIFO queue with optional backpressure |
| write | arrow::dataset::WriteNodeOptions | Writes batches to a filesystem (example) |
| consuming_sink | ConsumingSinkNodeOptions | Consumes batches using a user provided callback function |
| table_sink | TableSinkNodeOptions | Collects batches into an arrow::Table (example) |
| order_by_sink | OrderBySinkNodeOptions | Deprecated |
| select_k_sink | SelectKSinkNodeOptions | Deprecated |
Examples#
The rest of this document contains example execution plans. Each example highlights the behavior of a specific execution node.
source#
A source operation can be considered as an entry point to create a streaming execution plan. SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available but it can be quite tricky to configure. First you should review the other source node types to ensure there isn't a simpler choice.
The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<std::optional<arrow::ExecBatch>>. This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The arrow library refers to these functions as arrow::AsyncGenerator and there are a number of utilities for working with these functions. For this example we use a vector of record batches that we've already stored in memory. In addition, the schema of the data must be known up front. Acero must know the schema of the data at each stage of the execution graph before any processing has begun. This means we must supply the schema for a source node separately from the data itself.

Here we define a struct to hold the data generator definition. This includes in-memory batches, schema and a function that serves as a data generator:
```cpp
struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};
```
Generating sample batches for computation:
```cpp
arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}
```
Example of using source (usage of sink is explained in detail in sink):
```cpp
/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed. In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
arrow::Status SourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}
```
table_source#
In the previous example, a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier and more performant to use TableSourceNodeOptions. Here the input data can be passed as a std::shared_ptr<arrow::Table> along with a max_batch_size. The max_batch_size is used to break up large record batches so that they can be processed in parallel. It is important to note that the table batches will not get merged to form larger batches when the source table has a smaller batch size.
Example of using table_source:
```cpp
/// \brief An example showing a table source node
///
/// TableSource-Table Example
/// This example shows how a table_source can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table. This plan simply collects the
/// data back into a table but nodes could be added that modify
/// or transform the data as well (as is shown in later examples)
arrow::Status TableSourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};

  ac::Declaration source{"table_source", std::move(table_source_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}
```
filter#
The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows where the given expression evaluates to true. Filters can be written using arrow::compute::Expression, and the expression should have a return type of boolean. For example, if we wish to keep rows where the value of column a is greater than 3, then we can use the following expression.
Filter example:
```cpp
/// \brief An example showing a filter node
///
/// Source-Filter-Table
/// This example shows how a filter can be used in an execution plan,
/// to filter data from a source. The output from the execution plan
/// is collected into a table.
arrow::Status ScanFilterSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter. This filter keeps rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_expr;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  ac::Declaration filter{
      "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};

  return ExecutePlanAndCollectAsTable(std::move(filter));
}
```
project#
The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. These must be scalar expressions (expressions consisting of scalar literals, field references and scalar functions, i.e. elementwise functions that return one value for each input row independent of the value of all other rows). This is exposed via ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if names are not provided, the string representations of exprs will be used).
Project example:
```cpp
/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
arrow::Status ScanProjectSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}
```
aggregate#
The aggregate node computes various types of aggregates over data.

Arrow supports two types of aggregates: “scalar” aggregates, and “hash” aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.

AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from this list of aggregation functions.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
The aggregation can provide results as a group or scalar. For instance, an operation like hash_count provides the counts per each unique record as a grouped result, while an operation like sum provides a single record.
Scalar Aggregation example:
```cpp
/// \brief An example showing an aggregation node to aggregate an entire table
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on an
/// execution plan resulting in a scalar output. The source node loads the
/// data and the aggregation (summing the values in column 'a')
/// is applied on this data. The output is collected into a table (that will
/// have exactly one row)
arrow::Status SourceScalarAggregateSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}
```
Group Aggregation example:
```cpp
/// \brief An example showing an aggregation node to perform a group-by operation
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on an
/// execution plan resulting in grouped output. The source node loads the
/// data and the aggregation (counting the non-null values in column 'a',
/// grouped by column 'b') is applied on this data. The output is collected
/// into a table that will contain one row for each unique combination of
/// group keys.
arrow::Status SourceGroupAggregateSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
                               /*keys=*/{"b"}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}
```
sink#
The sink operation provides output and is the final node of a streaming execution definition. The SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. It is expected the caller will repeatedly call this function until the generator function is exhausted (returns an empty std::optional). If this function is not called often enough then record batches will accumulate in memory. An execution plan should only have one “terminal” node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can be safely destroyed independently of the sink, which will hold the unconsumed batches by exec_plan->finished().
As a part of the Source Example, the Sink operation is also included:
```cpp
/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed. In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
arrow::Status SourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}
```
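Note that the example above relies on ExecutePlanAndCollectAsTable (and thus DeclarationToTable) to append the sink, so no sink node appears explicitly. A minimal sketch of wiring a sink node by hand, assuming plan is an ac::ExecPlan, source is an ac::ExecNode* already added to it, and schema is the source's output schema (the same consumption pattern appears in the order_by_sink example below):

```cpp
// `plan`, `source`, and `schema` are assumed to exist as described above.
arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
ARROW_RETURN_NOT_OK(
    ac::MakeExecNode("sink", plan.get(), {source}, ac::SinkNodeOptions{&sink_gen}));
// The generator can be polled directly, or converted into a blocking reader:
std::shared_ptr<arrow::RecordBatchReader> sink_reader =
    ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());
```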
consuming_sink#
The consuming_sink operator is a sink operation containing a consuming operation within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes in a callback function that is expected to consume the batch. Once this callback has finished the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough then many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have been completed. Once all batches have been delivered the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consumption function converts batches into async tasks (this is currently done internally for the dataset write node).
Example:
```cpp
// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();
struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}
  // Consumption logic can be written here
  arrow::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return arrow::Status::OK();
  }
  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};
std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

arrow::acero::ExecNode* consuming_sink;
ARROW_ASSIGN_OR_RAISE(consuming_sink,
                      MakeExecNode("consuming_sink", plan.get(), {source},
                                   cp::ConsumingSinkNodeOptions(consumer)));
```
Consuming-Sink example:
```cpp
/// \brief An example showing a consuming sink node
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       ac::BackpressureControl* backpressure_control,
                       ac::ExecPlan* plan) override {
      // This will be called as the plan is started (before the first call to Consume)
      // and provides the schema of the data coming into the node, controls for pausing /
      // resuming input, and a pointer to the plan itself which can be used to access
      // other utilities such as the thread indexer or async task scheduler.
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override {
      // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
      // output file handles and flushing remaining work
      return arrow::Future<>::MakeFinished();
    }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  ac::Declaration consuming_sink{"consuming_sink",
                                 {std::move(source)},
                                 ac::ConsumingSinkNodeOptions(std::move(consumer))};

  // Since we are consuming the data within the plan there is no output and we simply
  // run the plan to completion instead of collecting into a table.
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));

  std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
            << std::endl;
  return arrow::Status::OK();
}
```
order_by_sink#
The order_by_sink operation is an extension to the sink operation. This operation provides the ability to guarantee the ordering of the stream by providing the OrderBySinkNodeOptions. Here the arrow::compute::SortOptions are provided to define which columns are used for sorting and whether to sort by ascending or descending values.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
Order-By-Sink example:
```cpp
arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
    std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // plan mark finished
  auto future = plan->finished();
  return future.status();
}

/// \brief An example showing an order-by node
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(ac::ExecNode* source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(ac::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      ac::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
}
```
select_k_sink#
The select_k_sink option enables selecting the top/bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. SelectKOptions, which is defined using the OrderBySinkNode definition, is used to pass the options. This option returns a sink node that receives the inputs and then computes top_k/bottom_k.
Note
This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
SelectK example:
```cpp
/// \brief An example showing a select-k node
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
arrow::Status SourceKSelectExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      ac::ExecNode* source,
      ac::MakeExecNode("source", plan.get(), {},
                       ac::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
}
```
table_sink#
The table_sink node provides the ability to receive the output as an in-memory table. This is simpler to use than the other sink nodes provided by the streaming execution engine but it only makes sense when the output fits comfortably in memory. The node is created using TableSinkNodeOptions.
Example of using table_sink:
```cpp
/// \brief An example showing a table sink node
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(ac::ExecNode* source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = ac::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // Wait for the plan to finish
  auto finished = plan->finished();
  RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}
```
scan#
scan is an operation used to load and process datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.

This node is capable of applying pushdown filters to the file readers which reduce the amount of data that needs to be read. This means you may supply the same filter expression to the scan node that you also supply to the FilterNode because the filtering is done in two different places.
Scan example:
```cpp
/// \brief An example demonstrating a scan and sink node
///
/// Scan-Table
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
arrow::Status ScanSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(scan));
}
```
write#
The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc., using the Tabular Datasets functionality in Arrow. The write options are provided via the arrow::dataset::WriteNodeOptions which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. arrow::dataset::FileSystemDatasetWriteOptions provides control over the written dataset, including options like the output directory, file naming scheme, and so on.
Write example:
```cpp
/// \brief An example showing a write node
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how scan node can be used to load the data
/// and after processing how it can be written to disk.
arrow::Status ScanFilterWriteExample(const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};

  // Since the write node has no output we simply run the plan to completion and the
  // data should be written
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));

  std::cout << "Dataset written to " << base_path << std::endl;
  return arrow::Status::OK();
}
```
union#
union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.

The following example demonstrates how this can be achieved using two data sources.
Union example:
```cpp
/// \brief An example showing a union node
///
/// Source-Union-Table
/// This example shows how a union operation can be applied on two
/// data sources. The output is collected into a table.
arrow::Status SourceUnionSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ac::Declaration lhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  ac::Declaration rhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  ac::Declaration union_plan{
      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};

  return ExecutePlanAndCollectAsTable(std::move(union_plan));
}
```
hash_join#
The hash_join operation provides the relational algebra operation join using a hash-based algorithm. HashJoinNodeOptions contains the options required in defining a join. The hash_join supports left/right/full semi/anti/outer joins. Also the join-key (i.e. the column(s) to join on) and suffixes (i.e. a suffix term like "_x" which can be appended as a suffix for column names duplicated in both left and right relations) can be set via the join options. Read more on hash-joins.
Hash-Join example:
```cpp
/// \brief An example showing a hash join node
///
/// Source-HashJoin-Table
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is collected into a table.
arrow::Status SourceHashJoinSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());

  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};

  ac::HashJoinNodeOptions join_opts{
      ac::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ac::Declaration hashjoin{
      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
}
```
Summary#
There are examples of these nodes which can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.
Complete Example:
19#include<arrow/array.h> 20#include<arrow/builder.h> 21 22#include<arrow/acero/exec_plan.h> 23#include<arrow/compute/api.h> 24#include<arrow/compute/api_vector.h> 25#include<arrow/compute/cast.h> 26 27#include<arrow/csv/api.h> 28 29#include<arrow/dataset/dataset.h> 30#include<arrow/dataset/file_base.h> 31#include<arrow/dataset/file_parquet.h> 32#include<arrow/dataset/plan.h> 33#include<arrow/dataset/scanner.h> 34 35#include<arrow/io/interfaces.h> 36#include<arrow/io/memory.h> 37 38#include<arrow/result.h> 39#include<arrow/status.h> 40#include<arrow/table.h> 41 42#include<arrow/ipc/api.h> 43 44#include<arrow/util/future.h> 45#include<arrow/util/range.h> 46#include<arrow/util/thread_pool.h> 47#include<arrow/util/vector.h> 48 49#include<iostream> 50#include<memory> 51#include<utility> 52 53// Demonstrate various operators in Arrow Streaming Execution Engine 54 55namespacecp=::arrow::compute; 56namespaceac=::arrow::acero; 57 58constexprcharkSep[]="******"; 59 60voidPrintBlock(conststd::string&msg){ 61std::cout<<"\n\t"<<kSep<<" "<<msg<<" "<<kSep<<"\n"<<std::endl; 62} 63 64template<typenameTYPE, 65typename=typenamestd::enable_if<arrow::is_number_type<TYPE>::value| 66arrow::is_boolean_type<TYPE>::value| 67arrow::is_temporal_type<TYPE>::value>::type> 68arrow::Result<std::shared_ptr<arrow::Array>>GetArrayDataSample( 69conststd::vector<typenameTYPE::c_type>&values){ 70usingArrowBuilderType=typenamearrow::TypeTraits<TYPE>::BuilderType; 71ArrowBuilderTypebuilder; 72ARROW_RETURN_NOT_OK(builder.Reserve(values.size())); 73ARROW_RETURN_NOT_OK(builder.AppendValues(values)); 74returnbuilder.Finish(); 75} 76 77template<classTYPE> 78arrow::Result<std::shared_ptr<arrow::Array>>GetBinaryArrayDataSample( 79conststd::vector<std::string>&values){ 80usingArrowBuilderType=typenamearrow::TypeTraits<TYPE>::BuilderType; 81ArrowBuilderTypebuilder; 82ARROW_RETURN_NOT_OK(builder.Reserve(values.size())); 83ARROW_RETURN_NOT_OK(builder.AppendValues(values)); 84returnbuilder.Finish(); 85} 86 87arrow::Result<std::shared_ptr<arrow::RecordBatch>>GetSampleRecordBatch( 88constarrow::ArrayVectorarray_vector,constarrow::FieldVector&field_vector){ 89std::shared_ptr<arrow::RecordBatch>record_batch; 90ARROW_ASSIGN_OR_RAISE(autostruct_result, 91arrow::StructArray::Make(array_vector,field_vector)); 92returnrecord_batch->FromStructArray(struct_result); 93} 94 95/// \brief Create a sample table 96/// The table's contents will be: 97/// a,b 98/// 1,null 99/// 2,true100/// null,true101/// 3,false102/// null,true103/// 4,false104/// 5,null105/// 6,false106/// 7,false107/// 8,true108/// \return The created 
table109110arrow::Result<std::shared_ptr<arrow::Table>>GetTable(){111autonull_long=std::numeric_limits<int64_t>::quiet_NaN();112ARROW_ASSIGN_OR_RAISE(autoint64_array,113GetArrayDataSample<arrow::Int64Type>(114{1,2,null_long,3,null_long,4,5,6,7,8}));115116arrow::BooleanBuilderboolean_builder;117std::shared_ptr<arrow::BooleanArray>bool_array;118119std::vector<uint8_t>bool_values={false,true,true,false,true,120false,false,false,false,true};121std::vector<bool>is_valid={false,true,true,true,true,122true,false,true,true,true};123124ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));125126ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values,is_valid));127128ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));129130autorecord_batch=131arrow::RecordBatch::Make(arrow::schema({arrow::field("a",arrow::int64()),132arrow::field("b",arrow::boolean())}),13310,{int64_array,bool_array});134ARROW_ASSIGN_OR_RAISE(autotable,arrow::Table::FromRecordBatches({record_batch}));135returntable;136}137138/// \brief Create a sample dataset139/// \return An in-memory dataset based on GetTable()140arrow::Result<std::shared_ptr<arrow::dataset::Dataset>>GetDataset(){141ARROW_ASSIGN_OR_RAISE(autotable,GetTable());142autods=std::make_shared<arrow::dataset::InMemoryDataset>(table);143returnds;144}145146arrow::Result<cp::ExecBatch>GetExecBatchFromVectors(147constarrow::FieldVector&field_vector,constarrow::ArrayVector&array_vector){148std::shared_ptr<arrow::RecordBatch>record_batch;149ARROW_ASSIGN_OR_RAISE(autores_batch,GetSampleRecordBatch(array_vector,field_vector));150cp::ExecBatchbatch{*res_batch};151returnbatch;152}153154// (Doc section: BatchesWithSchema Definition)155structBatchesWithSchema{156std::vector<cp::ExecBatch>batches;157std::shared_ptr<arrow::Schema>schema;158// This method uses internal arrow utilities to159// convert a vector of record batches to an AsyncGenerator of optional batches160arrow::AsyncGenerator<std::optional<cp::ExecBatch>>gen()const{161autoopt_batches=::arrow::internal::MapVector(162[](cp::ExecBatchbatch){returnstd::make_optional(std::move(batch));},163batches);164arrow::AsyncGenerator<std::optional<cp::ExecBatch>>gen;165gen=arrow::MakeVectorGenerator(std::move(opt_batches));166returngen;167}168};169// (Doc section: BatchesWithSchema Definition)170171// (Doc section: MakeBasicBatches Definition)172arrow::Result<BatchesWithSchema>MakeBasicBatches(){173BatchesWithSchemaout;174autofield_vector={arrow::field("a",arrow::int32()),175arrow::field("b",arrow::boolean())};176ARROW_ASSIGN_OR_RAISE(autob1_int,GetArrayDataSample<arrow::Int32Type>({0,4}));177ARROW_ASSIGN_OR_RAISE(autob2_int,GetArrayDataSample<arrow::Int32Type>({5,6,7}));178ARROW_ASSIGN_OR_RAISE(autob3_int,GetArrayDataSample<arrow::Int32Type>({8,9,10}));179180ARROW_ASSIGN_OR_RAISE(autob1_bool,181GetArrayDataSample<arrow::BooleanType>({false,true}));182ARROW_ASSIGN_OR_RAISE(autob2_bool,183GetArrayDataSample<arrow::BooleanType>({true,false,true}));184ARROW_ASSIGN_OR_RAISE(autob3_bool,185GetArrayDataSample<arrow::BooleanType>({false,true,false}));186187ARROW_ASSIGN_OR_RAISE(autob1,188GetExecBatchFromVectors(field_vector,{b1_int,b1_bool}));189ARROW_ASSIGN_OR_RAISE(autob2,190GetExecBatchFromVectors(field_vector,{b2_int,b2_bool}));191ARROW_ASSIGN_OR_RAISE(autob3,192GetExecBatchFromVectors(field_vector,{b3_int,b3_bool}));193194out.batches={b1,b2,b3};195out.schema=arrow::schema(field_vector);196returnout;197}198// (Doc section: MakeBasicBatches 
Definition)199200arrow::Result<BatchesWithSchema>MakeSortTestBasicBatches(){201BatchesWithSchemaout;202autofield=arrow::field("a",arrow::int32());203ARROW_ASSIGN_OR_RAISE(autob1_int,GetArrayDataSample<arrow::Int32Type>({1,3,0,2}));204ARROW_ASSIGN_OR_RAISE(autob2_int,205GetArrayDataSample<arrow::Int32Type>({121,101,120,12}));206ARROW_ASSIGN_OR_RAISE(autob3_int,207GetArrayDataSample<arrow::Int32Type>({10,110,210,121}));208ARROW_ASSIGN_OR_RAISE(autob4_int,209GetArrayDataSample<arrow::Int32Type>({51,101,2,34}));210ARROW_ASSIGN_OR_RAISE(autob5_int,211GetArrayDataSample<arrow::Int32Type>({11,31,1,12}));212ARROW_ASSIGN_OR_RAISE(autob6_int,213GetArrayDataSample<arrow::Int32Type>({12,101,120,12}));214ARROW_ASSIGN_OR_RAISE(autob7_int,215GetArrayDataSample<arrow::Int32Type>({0,110,210,11}));216ARROW_ASSIGN_OR_RAISE(autob8_int,217GetArrayDataSample<arrow::Int32Type>({51,10,2,3}));218219ARROW_ASSIGN_OR_RAISE(autob1,GetExecBatchFromVectors({field},{b1_int}));220ARROW_ASSIGN_OR_RAISE(autob2,GetExecBatchFromVectors({field},{b2_int}));221ARROW_ASSIGN_OR_RAISE(autob3,222GetExecBatchFromVectors({field,field},{b3_int,b8_int}));223ARROW_ASSIGN_OR_RAISE(autob4,224GetExecBatchFromVectors({field,field,field,field},225{b4_int,b5_int,b6_int,b7_int}));226out.batches={b1,b2,b3,b4};227out.schema=arrow::schema({field});228returnout;229}230231arrow::Result<BatchesWithSchema>MakeGroupableBatches(intmultiplicity=1){232BatchesWithSchemaout;233autofields={arrow::field("i32",arrow::int32()),arrow::field("str",arrow::utf8())};234ARROW_ASSIGN_OR_RAISE(autob1_int,GetArrayDataSample<arrow::Int32Type>({12,7,3}));235ARROW_ASSIGN_OR_RAISE(autob2_int,GetArrayDataSample<arrow::Int32Type>({-2,-1,3}));236ARROW_ASSIGN_OR_RAISE(autob3_int,GetArrayDataSample<arrow::Int32Type>({5,3,-8}));237ARROW_ASSIGN_OR_RAISE(autob1_str,GetBinaryArrayDataSample<arrow::StringType>(238{"alpha","beta","alpha"}));239ARROW_ASSIGN_OR_RAISE(autob2_str,GetBinaryArrayDataSample<arrow::StringType>(240{"alpha","gamma","alpha"}));241ARROW_ASSIGN_OR_RAISE(autob3_str,GetBinaryArrayDataSample<arrow::StringType>(242{"gamma","beta","alpha"}));243ARROW_ASSIGN_OR_RAISE(autob1,GetExecBatchFromVectors(fields,{b1_int,b1_str}));244ARROW_ASSIGN_OR_RAISE(autob2,GetExecBatchFromVectors(fields,{b2_int,b2_str}));245ARROW_ASSIGN_OR_RAISE(autob3,GetExecBatchFromVectors(fields,{b3_int,b3_str}));246out.batches={b1,b2,b3};247248size_tbatch_count=out.batches.size();249for(intrepeat=1;repeat<multiplicity;++repeat){250for(size_ti=0;i<batch_count;++i){251out.batches.push_back(out.batches[i]);252}253}254255out.schema=arrow::schema(fields);256returnout;257}258259arrow::StatusExecutePlanAndCollectAsTable(ac::Declarationplan){260// collect sink_reader into a Table261std::shared_ptr<arrow::Table>response_table;262ARROW_ASSIGN_OR_RAISE(response_table,ac::DeclarationToTable(std::move(plan)));263264std::cout<<"Results : "<<response_table->ToString()<<std::endl;265266returnarrow::Status::OK();267}268269// (Doc section: Scan Example)270271/// \brief An example demonstrating a scan and sink node272///273/// Scan-Table274/// This example shows how scan operation can be applied on a dataset.275/// There are operations that can be applied on the scan (project, filter)276/// and the input data can be processed. 
The output is obtained as a table277arrow::StatusScanSinkExample(){278ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset>dataset,GetDataset());279280autooptions=std::make_shared<arrow::dataset::ScanOptions>();281options->projection=cp::project({},{});// create empty projection282283// construct the scan node284autoscan_node_options=arrow::dataset::ScanNodeOptions{dataset,options};285286ac::Declarationscan{"scan",std::move(scan_node_options)};287288returnExecutePlanAndCollectAsTable(std::move(scan));289}290// (Doc section: Scan Example)291292// (Doc section: Source Example)293294/// \brief An example demonstrating a source and sink node295///296/// Source-Table Example297/// This example shows how a custom source can be used298/// in an execution plan. This includes source node using pregenerated299/// data and collecting it into a table.300///301/// This sort of custom source is often not needed. In most cases you can302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,303/// exec_batch_source, or record_batch_source (for in-memory data)304arrow::StatusSourceSinkExample(){305ARROW_ASSIGN_OR_RAISE(autobasic_data,MakeBasicBatches());306307autosource_node_options=ac::SourceNodeOptions{basic_data.schema,basic_data.gen()};308309ac::Declarationsource{"source",std::move(source_node_options)};310311returnExecutePlanAndCollectAsTable(std::move(source));312}313// (Doc section: Source Example)314315// (Doc section: Table Source Example)316317/// \brief An example showing a table source node318///319/// TableSource-Table Example320/// This example shows how a table_source can be used321/// in an execution plan. This includes a table source node322/// receiving data from a table. This plan simply collects the323/// data back into a table but nodes could be added that modify324/// or transform the data as well (as is shown in later examples)325arrow::StatusTableSourceSinkExample(){326ARROW_ASSIGN_OR_RAISE(autotable,GetTable());327328arrow::AsyncGenerator<std::optional<cp::ExecBatch>>sink_gen;329intmax_batch_size=2;330autotable_source_options=ac::TableSourceNodeOptions{table,max_batch_size};331332ac::Declarationsource{"table_source",std::move(table_source_options)};333334returnExecutePlanAndCollectAsTable(std::move(source));335}336// (Doc section: Table Source Example)337338// (Doc section: Filter Example)339340/// \brief An example showing a filter node341///342/// Source-Filter-Table343/// This example shows how a filter can be used in an execution plan,344/// to filter data from a source. The output from the execution plan345/// is collected into a table.346arrow::StatusScanFilterSinkExample(){347ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset>dataset,GetDataset());348349autooptions=std::make_shared<arrow::dataset::ScanOptions>();350// specify the filter. 
This filter removes all rows where the351// value of the "a" column is greater than 3.352cp::Expressionfilter_expr=cp::greater(cp::field_ref("a"),cp::literal(3));353// set filter for scanner : on-disk / push-down filtering.354// This step can be skipped if you are not reading from disk.355options->filter=filter_expr;356// empty projection357options->projection=cp::project({},{});358359// construct the scan node360std::cout<<"Initialized Scanning Options"<<std::endl;361362autoscan_node_options=arrow::dataset::ScanNodeOptions{dataset,options};363std::cout<<"Scan node options created"<<std::endl;364365ac::Declarationscan{"scan",std::move(scan_node_options)};366367// pipe the scan node into the filter node368// Need to set the filter in scan node options and filter node options.369// At scan node it is used for on-disk / push-down filtering.370// At filter node it is used for in-memory filtering.371ac::Declarationfilter{372"filter",{std::move(scan)},ac::FilterNodeOptions(std::move(filter_expr))};373374returnExecutePlanAndCollectAsTable(std::move(filter));375}376377// (Doc section: Filter Example)378379// (Doc section: Project Example)380381/// \brief An example showing a project node382///383/// Scan-Project-Table384/// This example shows how a Scan operation can be used to load the data385/// into the execution plan, how a project operation can be applied on the386/// data stream and how the output is collected into a table387arrow::StatusScanProjectSinkExample(){388ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset>dataset,GetDataset());389390autooptions=std::make_shared<arrow::dataset::ScanOptions>();391// projection392cp::Expressiona_times_2=cp::call("multiply",{cp::field_ref("a"),cp::literal(2)});393options->projection=cp::project({},{});394395autoscan_node_options=arrow::dataset::ScanNodeOptions{dataset,options};396397ac::Declarationscan{"scan",std::move(scan_node_options)};398ac::Declarationproject{399"project",{std::move(scan)},ac::ProjectNodeOptions({a_times_2})};400401returnExecutePlanAndCollectAsTable(std::move(project));402}403404// (Doc section: Project Example)405406// This is a variation of ScanProjectSinkExample introducing how to use the407// Declaration::Sequence function408arrow::StatusScanProjectSequenceSinkExample(){409ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset>dataset,GetDataset());410411autooptions=std::make_shared<arrow::dataset::ScanOptions>();412// projection413cp::Expressiona_times_2=cp::call("multiply",{cp::field_ref("a"),cp::literal(2)});414options->projection=cp::project({},{});415416autoscan_node_options=arrow::dataset::ScanNodeOptions{dataset,options};417418// (Doc section: Project Sequence Example)419// Inputs do not have to be passed to the project node when using Sequence420ac::Declarationplan=421ac::Declaration::Sequence({{"scan",std::move(scan_node_options)},422{"project",ac::ProjectNodeOptions({a_times_2})}});423// (Doc section: Project Sequence Example)424425returnExecutePlanAndCollectAsTable(std::move(plan));426}427428// (Doc section: Scalar Aggregate Example)429430/// \brief An example showing an aggregation node to aggregate an entire table431///432/// Source-Aggregation-Table433/// This example shows how an aggregation operation can be applied on a434/// execution plan resulting in a scalar output. The source node loads the435/// data and the aggregation (counting unique types in column 'a')436/// is applied on this data. 
The output is collected into a table (that will437/// have exactly one row)438arrow::StatusSourceScalarAggregateSinkExample(){439ARROW_ASSIGN_OR_RAISE(autobasic_data,MakeBasicBatches());440441autosource_node_options=ac::SourceNodeOptions{basic_data.schema,basic_data.gen()};442443ac::Declarationsource{"source",std::move(source_node_options)};444autoaggregate_options=445ac::AggregateNodeOptions{/*aggregates=*/{{"sum",nullptr,"a","sum(a)"}}};446ac::Declarationaggregate{447"aggregate",{std::move(source)},std::move(aggregate_options)};448449returnExecutePlanAndCollectAsTable(std::move(aggregate));450}451// (Doc section: Scalar Aggregate Example)452453// (Doc section: Group Aggregate Example)454455/// \brief An example showing an aggregation node to perform a group-by operation456///457/// Source-Aggregation-Table458/// This example shows how an aggregation operation can be applied on a459/// execution plan resulting in grouped output. The source node loads the460/// data and the aggregation (counting unique types in column 'a') is461/// applied on this data. The output is collected into a table that will contain462/// one row for each unique combination of group keys.463arrow::StatusSourceGroupAggregateSinkExample(){464ARROW_ASSIGN_OR_RAISE(autobasic_data,MakeBasicBatches());465466arrow::AsyncGenerator<std::optional<cp::ExecBatch>>sink_gen;467468autosource_node_options=ac::SourceNodeOptions{basic_data.schema,basic_data.gen()};469470ac::Declarationsource{"source",std::move(source_node_options)};471autooptions=std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);472autoaggregate_options=473ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count",options,"a","count(a)"}},474/*keys=*/{"b"}};475ac::Declarationaggregate{476"aggregate",{std::move(source)},std::move(aggregate_options)};477478returnExecutePlanAndCollectAsTable(std::move(aggregate));479}480// (Doc section: Group Aggregate Example)481482// (Doc section: ConsumingSink Example)483484/// \brief An example showing a consuming sink node485///486/// Source-Consuming-Sink487/// This example shows how the data can be consumed within the execution plan488/// by using a ConsumingSink node. There is no data output from this execution plan.489arrow::StatusSourceConsumingSinkExample(){490ARROW_ASSIGN_OR_RAISE(autobasic_data,MakeBasicBatches());491492autosource_node_options=ac::SourceNodeOptions{basic_data.schema,basic_data.gen()};493494ac::Declarationsource{"source",std::move(source_node_options)};495496std::atomic<uint32_t>batches_seen{0};497arrow::Future<>finish=arrow::Future<>::Make();498structCustomSinkNodeConsumer:publicac::SinkNodeConsumer{499CustomSinkNodeConsumer(std::atomic<uint32_t>*batches_seen,arrow::Future<>finish)500:batches_seen(batches_seen),finish(std::move(finish)){}501502arrow::StatusInit(conststd::shared_ptr<arrow::Schema>&schema,503ac::BackpressureControl*backpressure_control,504ac::ExecPlan*plan)override{505// This will be called as the plan is started (before the first call to Consume)506// and provides the schema of the data coming into the node, controls for pausing /507// resuming input, and a pointer to the plan itself which can be used to access508// other utilities such as the thread indexer or async task scheduler.509returnarrow::Status::OK();510}511512arrow::StatusConsume(cp::ExecBatchbatch)override{513(*batches_seen)++;514returnarrow::Status::OK();515}516517arrow::Future<>Finish()override{518// Here you can perform whatever (possibly async) cleanup is needed, e.g. 
// (Doc section: ConsumingSink Example)

/// \brief An example showing a consuming sink node
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       ac::BackpressureControl* backpressure_control,
                       ac::ExecPlan* plan) override {
      // This will be called as the plan is started (before the first call to Consume)
      // and provides the schema of the data coming into the node, controls for pausing /
      // resuming input, and a pointer to the plan itself which can be used to access
      // other utilities such as the thread indexer or async task scheduler.
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override {
      // Here you can perform whatever (possibly async) cleanup is needed,
      // e.g. closing output file handles and flushing remaining work
      return arrow::Future<>::MakeFinished();
    }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  ac::Declaration consuming_sink{"consuming_sink",
                                 {std::move(source)},
                                 ac::ConsumingSinkNodeOptions(std::move(consumer))};

  // Since we are consuming the data within the plan there is no output and we simply
  // run the plan to completion instead of collecting into a table.
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));

  std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
            << std::endl;
  return arrow::Status::OK();
}
// (Doc section: ConsumingSink Example)

// (Doc section: OrderBySink Example)

arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
    std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // wait for the plan to be marked finished
  auto future = plan->finished();
  return future.status();
}

/// \brief An example showing an order-by node
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The sort order
/// (ascending or descending) is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(ac::ExecNode* source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(ac::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      ac::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
}

// (Doc section: OrderBySink Example)
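OrderBySinkNodeOptions belongs to the older, sink-based API, which requires building the ExecPlan by hand as shown above. Recent Arrow releases also ship a declarative "order_by" node that composes with Declaration like the other examples. A minimal sketch, assuming your Arrow version provides ac::OrderByNodeOptions:

  // Sort by column "a" descending using the declarative "order_by" node
  // (availability depends on your Arrow version).
  ac::Declaration source{"source",
                         ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  ac::Declaration order_by{
      "order_by", {std::move(source)},
      ac::OrderByNodeOptions{
          cp::Ordering{{cp::SortKey{"a", cp::SortOrder::Descending}}}}};
  // The sorted result could then be collected with
  // ExecutePlanAndCollectAsTable(std::move(order_by)).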
// (Doc section: HashJoin Example)

/// \brief An example showing a hash join node
///
/// Source-HashJoin-Table
/// This example shows how the source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is collected into a table.
arrow::Status SourceHashJoinSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());

  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};

  ac::HashJoinNodeOptions join_opts{
      ac::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ac::Declaration hashjoin{
      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
}

// (Doc section: HashJoin Example)

// (Doc section: KSelect Example)

/// \brief An example showing a select-k node
///
/// Source-KSelect
/// This example shows how the top (or bottom) K elements can be
/// selected. The output node is a modified sink node where output
/// can be obtained as a table.
arrow::Status SourceKSelectExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      ac::ExecNode* source,
      ac::MakeExecNode("source", plan.get(), {},
                       ac::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
}

// (Doc section: KSelect Example)
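HashJoinNodeOptions controls the join type, the key columns, an optional residual filter expression (cp::literal(true) above, i.e. no filter), and the suffixes used to disambiguate colliding column names in the output. For example, switching the self-join above to a left outer join is a one-line change; a minimal sketch:

  // A left outer join: left-side rows with no match on "str" are kept,
  // with nulls filled in for the right-side columns.
  ac::HashJoinNodeOptions left_outer_opts{
      ac::JoinType::LEFT_OUTER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};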
// (Doc section: Write Example)

/// \brief An example showing a write node
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how the scan node can be used to load the data
/// and, after processing, how it can be written to disk.
arrow::Status ScanFilterWriteExample(const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line if running repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.
  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};

  // Since the write node has no output we simply run the plan to completion and the
  // data should be written
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));

  std::cout << "Dataset written to " << base_path << std::endl;
  return arrow::Status::OK();
}

// (Doc section: Write Example)

// (Doc section: Union Example)

/// \brief An example showing a union node
///
/// Source-Union-Table
/// This example shows how a union operation can be applied on two
/// data sources. The output is collected into a table.
arrow::Status SourceUnionSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ac::Declaration lhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  ac::Declaration rhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  ac::Declaration union_plan{
      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};

  return ExecutePlanAndCollectAsTable(std::move(union_plan));
}

// (Doc section: Union Example)

// (Doc section: Table Sink Example)

/// \brief An example showing a table sink node
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(ac::ExecNode* source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = ac::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // Wait for the plan to finish
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}

// (Doc section: Table Sink Example)
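Instead of uncommenting the DeleteDirContents call when running the write example repeatedly, you can tell the dataset writer how to treat existing data. A minimal sketch using the existing_data_behavior option on FileSystemDatasetWriteOptions:

  // Overwrite any files left over from a previous run instead of erroring
  // because the target directory is not empty.
  write_options.existing_data_behavior =
      arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;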
// (Doc section: RecordBatchReaderSource Example)

/// \brief An example showing the usage of a RecordBatchReader as the data source
///
/// RecordBatchReaderSourceSink Example
/// This example shows how a record_batch_reader_source can be used
/// in an execution plan. This includes the source node
/// receiving data from a TableBatchReader.
arrow::Status RecordBatchReaderSourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
  std::shared_ptr<arrow::RecordBatchReader> reader =
      std::make_shared<arrow::TableBatchReader>(table);
  ac::Declaration reader_source{"record_batch_reader_source",
                                ac::RecordBatchReaderSourceNodeOptions{reader}};
  return ExecutePlanAndCollectAsTable(std::move(reader_source));
}

// (Doc section: RecordBatchReaderSource Example)

enum ExampleMode {
  SOURCE_SINK = 0,
  TABLE_SOURCE_SINK = 1,
  SCAN = 2,
  FILTER = 3,
  PROJECT = 4,
  SCALAR_AGGREGATION = 5,
  GROUP_AGGREGATION = 6,
  CONSUMING_SINK = 7,
  ORDER_BY_SINK = 8,
  HASHJOIN = 9,
  KSELECT = 10,
  WRITE = 11,
  UNION = 12,
  TABLE_SOURCE_TABLE_SINK = 13,
  RECORD_BATCH_READER_SOURCE = 14,
  PROJECT_SEQUENCE = 15
};

int main(int argc, char** argv) {
  int mode = argc > 1 ? std::atoi(argv[1]) : SOURCE_SINK;
  std::string base_save_path = argc > 2 ? argv[2] : "";
  arrow::Status status = arrow::compute::Initialize();
  if (!status.ok()) {
    std::cout << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
  // ensure arrow::dataset node factories are in the registry
  arrow::dataset::internal::Initialize();
  switch (mode) {
    case SOURCE_SINK:
      PrintBlock("Source Sink Example");
      status = SourceSinkExample();
      break;
    case TABLE_SOURCE_SINK:
      PrintBlock("Table Source Sink Example");
      status = TableSourceSinkExample();
      break;
    case SCAN:
      PrintBlock("Scan Example");
      status = ScanSinkExample();
      break;
    case FILTER:
      PrintBlock("Filter Example");
      status = ScanFilterSinkExample();
      break;
    case PROJECT:
      PrintBlock("Project Example");
      status = ScanProjectSinkExample();
      break;
    case PROJECT_SEQUENCE:
      PrintBlock("Project Example (using Declaration::Sequence)");
      status = ScanProjectSequenceSinkExample();
      break;
    case GROUP_AGGREGATION:
      PrintBlock("Group Aggregate Example");
      status = SourceGroupAggregateSinkExample();
      break;
    case SCALAR_AGGREGATION:
      PrintBlock("Scalar Aggregate Example");
      status = SourceScalarAggregateSinkExample();
      break;
    case CONSUMING_SINK:
      PrintBlock("Consuming-Sink Example");
      status = SourceConsumingSinkExample();
      break;
    case ORDER_BY_SINK:
      PrintBlock("OrderBy Example");
      status = SourceOrderBySinkExample();
      break;
    case HASHJOIN:
      PrintBlock("HashJoin Example");
      status = SourceHashJoinSinkExample();
      break;
    case KSELECT:
      PrintBlock("KSelect Example");
      status = SourceKSelectExample();
      break;
    case WRITE:
      PrintBlock("Write Example");
      status = ScanFilterWriteExample(base_save_path);
      break;
    case UNION:
      PrintBlock("Union Example");
      status = SourceUnionSinkExample();
      break;
    case TABLE_SOURCE_TABLE_SINK:
      PrintBlock("TableSink Example");
      status = TableSinkExample();
      break;
    case RECORD_BATCH_READER_SOURCE:
      PrintBlock("RecordBatchReaderSource Example");
      status = RecordBatchReaderSourceSinkExample();
      break;
    default:
      break;
  }

  if (status.ok()) {
    return EXIT_SUCCESS;
  } else {
    std::cout << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
}
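To run a particular example, pass its ExampleMode value as the first argument; the write example additionally takes a destination directory as the second argument. For instance, assuming the example binary is named execution_plan_documentation_examples (the actual name depends on your build setup), running it with the argument 9 executes the hash join example, while the arguments 11 /tmp run the write example and produce a partitioned Parquet dataset under /tmp/parquet_dataset.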

