Developing Acero#
This page goes into more detail about the design of Acero. It discusses how to create custom exec nodes and describes some of the philosophies behind Acero’s design and implementation. Finally, it gives an overview of how to extend Acero with new behaviors and how this new behavior can be upstreamed into the core Arrow repository.
Understanding ExecNode#
ExecNode is an abstract class with several pure virtual methods that control how the node operates:
ExecNode::StartProducing()#
This method is called once at the start of the plan. Most nodes ignore this method (any necessary initialization should happen in the constructor or Init). However, source nodes will typically provide a custom implementation. Source nodes should schedule whatever tasks are needed to start reading and providing the data. Source nodes are usually the primary creator of tasks in a plan.
Note
The ExecPlan operates on a push-based model. Sources are often pull-based. For example, your source may be an iterator. The source node will typically then schedule tasks to pull one item from the source and push that item into the source’s output node (via InputReceived).
Examples#
- In the table_source node the input table is divided into batches. A task is created for each batch and that task calls InputReceived on the node’s output.
- In the scan node a task is created to start listing fragments from the dataset. Each listing task then creates tasks to read batches from the fragment, asynchronously. When a batch is fully read in, a continuation schedules a new task with the exec plan. This task calls InputReceived on the scan node’s output.
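As a rough illustration of this pattern, here is a minimal sketch of a pull-based source wrapped in a push-based node. All the types and members here (ExecBatch, ScheduleTask, source_, output_) are simplified stand-ins rather than Acero’s actual interfaces:

```cpp
#include <functional>
#include <optional>

// Simplified stand-ins for Acero's actual classes.
struct ExecBatch {};
struct ExecNode {
  virtual void InputReceived(ExecNode* input, ExecBatch batch) = 0;
  virtual ~ExecNode() = default;
};

class IteratorSourceNode : public ExecNode {
 public:
  // Called once at the start of the plan; seeds the plan with tasks.
  void StartProducing() {
    ScheduleTask([this] { PullAndPush(); });
  }

  // Never called on a source node.
  void InputReceived(ExecNode*, ExecBatch) override {}

 private:
  void PullAndPush() {
    if (std::optional<ExecBatch> batch = source_()) {   // pull one item
      output_->InputReceived(this, std::move(*batch));  // push it downstream
      ScheduleTask([this] { PullAndPush(); });          // schedule the next pull
    }
  }

  // Placeholder: a real source would submit the task to the plan's scheduler.
  void ScheduleTask(std::function<void()> task) { task(); }

  std::function<std::optional<ExecBatch>()> source_;  // the pull-based iterator
  ExecNode* output_ = nullptr;
};
```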
ExecNode::InputReceived()#
This method is called many times during the execution of a plan. It is how nodes pass data to each other. An input node will call InputReceived on its output. Acero’s execution model is push-based. Each node pushes data into its output by calling InputReceived and passing in a batch of data.
The InputReceived method is often where the actual work happens for the node. For example, a project node will execute its expressions and create a new expanded output batch. It will then call InputReceived on its output. InputReceived will never be called on a source node. Sink nodes will never call InputReceived. All other nodes will experience both.
Some nodes (often called “pipeline breakers”) must accumulate input before they can generate any output. For example, a sort node must accumulate all input before it can sort the data and generate output. In these nodes the InputReceived method will typically place the data into some kind of accumulation queue. If the node doesn’t have enough data to operate then it will not call InputReceived on its output. This will then be the end of the current task.
Examples#
- The project node runs its expressions, using the received batch as input for the expressions. A new batch is created from the input batch and the result of the expressions. The new batch is given the same order index as the input batch and the node then calls InputReceived on its output.
- The order_by node inserts the batch into an accumulation queue. If this was the last batch then the node will sort everything in the accumulation queue. The node will then call InputReceived on the output for each batch in the sorted result. A new batch index will be assigned to each batch. Note that this final output step might also occur as a result of a call to InputFinished (described below).
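The accumulating pattern can be sketched as follows. This is a hypothetical, simplified node (Acero’s real ExecNode signatures differ); note the mutex, which is needed because, as discussed under pipeline parallelism below, these methods may be called concurrently:

```cpp
#include <algorithm>
#include <cstdint>
#include <mutex>
#include <optional>
#include <vector>

struct ExecBatch {
  int64_t index = 0;
};

// Sketch of a pipeline breaker (e.g. a sort node) accumulating input.
class AccumulatingNode {
 public:
  void InputReceived(ExecBatch batch) {
    std::lock_guard<std::mutex> lock(mutex_);  // calls may arrive concurrently
    accumulated_.push_back(std::move(batch));
    ++seen_;
    if (total_ && seen_ == *total_) SortAndEmit();
    // Otherwise: not enough data yet, so just return and end the task.
  }

  void InputFinished(int64_t total_batches) {
    std::lock_guard<std::mutex> lock(mutex_);
    total_ = total_batches;
    if (seen_ == *total_) SortAndEmit();  // the input may already be complete
  }

 private:
  void SortAndEmit() {
    std::sort(accumulated_.begin(), accumulated_.end(),
              [](const ExecBatch& a, const ExecBatch& b) { return a.index < b.index; });
    // ...assign new batch indices and call InputReceived on the output...
  }

  std::mutex mutex_;
  std::vector<ExecBatch> accumulated_;
  int64_t seen_ = 0;
  std::optional<int64_t> total_;  // set by InputFinished
};
```

Tracking seen_ against total_ is also what makes it safe for InputFinished to arrive before the final call to InputReceived (see the warning below).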
ExecNode::InputFinished()#
This method will be called once per input. A node will call InputFinished on its output once it knows how many batches it will be sending to that output. Normally this happens when the node is finished working. For example, a scan node will call InputFinished once it has finished reading its files. However, it could call it earlier if it knows (maybe from file metadata) how many batches will be created.
Some nodes will use this signal to trigger some processing. For example, a sort node needs to wait until it has received all of its input before it can sort the data. It relies on the InputFinished call to know this has happened.
Even if a node is not doing any special processing when finished (e.g. a project node or filter node doesn’t need to do any end-of-stream processing) that node will still call InputFinished on its output.
Warning
The InputFinished call might arrive before the final call to InputReceived. In fact, it could even be sent out before any calls to InputReceived begin. For example, the table source node always knows exactly how many batches it will be producing. It could choose to call InputFinished before it ever calls InputReceived. If a node needs to do “end-of-stream” processing then it typically uses an AtomicCounter, which is a helper class, to figure out when all of the data has arrived.
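The pattern can be hand-rolled with atomics. The sketch below illustrates the idea (Acero’s actual AtomicCounter helper is similar in spirit, but consult the Acero sources for its real interface):

```cpp
#include <atomic>
#include <cstdint>

// Both InputReceived and InputFinished race to observe "all batches seen and
// the total known"; exactly one caller wins the right to run end-of-stream work.
class EndOfStreamCounter {
 public:
  // Call once per received batch; true means "run end-of-stream work now".
  bool Increment() {
    int64_t seen = count_.fetch_add(1) + 1;
    return seen == total_.load() && !done_.exchange(true);
  }

  // Call when the total becomes known; true means "run end-of-stream work now".
  bool SetTotal(int64_t total) {
    total_.store(total);
    return count_.load() == total && !done_.exchange(true);
  }

 private:
  std::atomic<int64_t> count_{0};
  std::atomic<int64_t> total_{-1};  // -1: total not yet known
  std::atomic<bool> done_{false};
};
```

A node would then write if (counter_.Increment()) Finalize(); inside InputReceived and if (counter_.SetTotal(n)) Finalize(); inside InputFinished, and exactly one of the two will trigger the finalization.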
Examples#
- The order_by node checks to see if it has already received all its batches. If it has then it performs the sorting step described in the InputReceived example. Before it starts sending output data it checks to see how many output batches it has (it’s possible the batch size changed as part of the accumulating or sorting) and calls InputFinished on the node’s output.
- The fetch node, during a call to InputReceived, may realize it has received all the rows it was asked for. It then calls InputFinished on its output immediately (even though its own InputFinished method has not yet been called).
ExecNode::PauseProducing() / ExecNode::ResumeProducing()#
These methods control backpressure. Some nodes may need to pause their input to avoid accumulating too much data. For example, when the user is consuming the plan with a RecordBatchReader we use a SinkNode. The SinkNode places data in a queue that the RecordBatchReader pulls from (this is a conversion from a push model to a pull model). If the user is reading the RecordBatchReader slowly then it is possible this queue will start to fill up. For another example we can consider the write node. This node writes data to a filesystem. If the writes are slow then data might accumulate at the write node. As a result, the write node would need to apply backpressure.
When a node realizes that it needs to apply some backpressure it will call PauseProducing on its input. Once the node has enough space to continue it will then call ResumeProducing on its input. For example, the SinkNode would pause when its queue gets too full. As the user continues to read from the RecordBatchReader we can expect the queue to slowly drain. Once the queue has drained enough then the SinkNode can call ResumeProducing.
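A minimal sketch of this queue-with-watermarks pattern (the watermark values and the pause/resume callbacks are illustrative; SinkNode’s real implementation differs):

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Pause the input when the queue passes a high watermark; resume once it has
// drained below a low watermark. The gap between the two avoids thrashing.
template <typename Batch>
class BackpressureQueue {
 public:
  BackpressureQueue(std::function<void()> pause, std::function<void()> resume)
      : pause_(std::move(pause)), resume_(std::move(resume)) {}

  void Push(Batch batch) {  // called from InputReceived
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push_back(std::move(batch));
    if (!paused_ && queue_.size() >= kHighWatermark) {
      paused_ = true;
      pause_();  // calls PauseProducing on the input
    }
  }

  bool Pop(Batch* out) {  // called by the RecordBatchReader
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return false;
    *out = std::move(queue_.front());
    queue_.pop_front();
    if (paused_ && queue_.size() <= kLowWatermark) {
      paused_ = false;
      resume_();  // calls ResumeProducing on the input
    }
    return true;
  }

 private:
  static constexpr std::size_t kHighWatermark = 32;
  static constexpr std::size_t kLowWatermark = 16;
  std::mutex mutex_;
  std::deque<Batch> queue_;
  bool paused_ = false;
  std::function<void()> pause_, resume_;
};
```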
Source nodes typically need to provide special behavior for PauseProducing and ResumeProducing. For example, a scan node that is reading from a file can pause reading the file. However, some source nodes may not be able to pause in any meaningful way. There is not much point in a table source node pausing because its data is already in memory.
Nodes that are neither source nor sink should still forward backpressure signals. For example, when PauseProducing is called on a project node it should call PauseProducing on its input. If a node has multiple inputs then it should forward the signal to every input.
Examples#
- The write node, in its InputReceived method, adds a batch to a dataset writer’s queue. If the dataset writer is then full it will return an unfinished future that will complete when it has more room. The write node then calls PauseProducing on its input. It then adds a continuation to the future that will call ResumeProducing on its input.
- The scan node uses an AsyncTaskScheduler to keep track of all the tasks it schedules. This scheduler is throttled to limit how much concurrent I/O the scan node is allowed to perform. When PauseProducing is called the node will pause the scheduler. This means that any tasks queued behind the throttle will not be submitted. However, any ongoing I/O will continue (backpressure can’t take effect immediately). When ResumeProducing is called the scan node will unpause the scheduler.
ExecNode::StopProducing()#
StopProducing is called when a plan needs to end early. This can happen because the user cancelled the plan or because an error occurred. Most nodes do not need to do anything here. There is no expectation or requirement that a node sends any remaining data it has. Any node that schedules tasks (e.g. a source node) should stop producing new data.

In addition to plan-wide cancellation, a node may call this method on its input if it has decided that it has received all the data that it needs. However, because of parallelism, a node may still receive a few calls to InputReceived after it has stopped its input.
If any external resources are used then cleanup should happen as part of this call.
Examples#
- The asof join node has a dedicated processing thread that communicates with the main Acero threads using a queue. When StopProducing is called the node inserts a poison pill into the queue. This tells the processing thread to stop immediately. Once the processing thread stops it marks its external task (described below) as completed, which allows the plan to finish.
- The fetch node, in InputReceived, may decide that it has all the data it needs. It can then call StopProducing on its input.
Initialization / Construction / Destruction#
Simple initialization logic (that cannot error) can be done in the constructor. If the initialization logic may return an invalid status then it can either be done in the exec node’s factory method or the Init method. The factory method is preferred for simple validation. The Init method is preferred if the initialization might do expensive allocation or other resource consumption. Init will always be called before StartProducing is called. Initialization could also be done in StartProducing, but keep in mind that other nodes may have started by that point.
In addition, there is a Validate method that can be overridden to provide custom validation. This method is normally called before Init but after all inputs and outputs have been added.
Finalization happens today in the destructor. There are a few cases where that might be slow. For example, in the write node, if there was an error during the plan, then we might close out some open files here. Should there be significant finalization that is either asynchronous or could potentially trigger an error then we could introduce a Finalize method to the ExecNode lifecycle. It hasn’t been done yet only because it hasn’t been needed.
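A brief sketch of how the factory / Init split usually looks (the node and option types here are hypothetical, and the real ExecNode lifecycle methods take more arguments):

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

#include "arrow/result.h"
#include "arrow/status.h"

struct MyNodeOptions {
  int64_t batch_size = 32768;
};

class MyNode {
 public:
  // Factory method: simple validation that can fail belongs here.
  static arrow::Result<std::unique_ptr<MyNode>> Make(MyNodeOptions options) {
    if (options.batch_size <= 0) {
      return arrow::Status::Invalid("batch_size must be positive");
    }
    return std::unique_ptr<MyNode>(new MyNode(options));
  }

  // Init: expensive allocation belongs here; it runs before StartProducing.
  arrow::Status Init() {
    scratch_.resize(static_cast<std::size_t>(options_.batch_size));
    return arrow::Status::OK();
  }

 private:
  explicit MyNode(MyNodeOptions options) : options_(options) {}

  MyNodeOptions options_;
  std::vector<uint8_t> scratch_;
};
```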
Summary#
| Method Name | This is called when… | A node calls this when… |
|---|---|---|
| StartProducing | The plan is starting | N/A |
| InputReceived | Data is received from the input | To send data to the output |
| InputFinished | The input knows how many batches there are | The node can tell its output how many batches there are |
| StopProducing | A plan is aborted or an output has enough data | A node has all the data it needs |
Extending Acero#
Acero instantiates a singleton ExecFactoryRegistry which maps between names and exec node factories (methods which create an ExecNode from options). To create a new ExecNode you can register the node with this registry and your node will now be usable by Acero. If you would like to be able to use this node with Substrait plans you will also need to configure the Substrait registry so that it knows how to map Substrait to your custom node.
This means that you can create and add new nodes to Acero without recompiling Acero from source.
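Registration looks roughly like the following. MakeMyCustomNode is a hypothetical factory you would provide, and the factory signature is abbreviated from the one in arrow/acero/exec_plan.h:

```cpp
#include <vector>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/result.h"
#include "arrow/status.h"

namespace ac = arrow::acero;

// Your factory: builds an instance of your node from generic options.
arrow::Result<ac::ExecNode*> MakeMyCustomNode(ac::ExecPlan* plan,
                                              std::vector<ac::ExecNode*> inputs,
                                              const ac::ExecNodeOptions& options);

arrow::Status RegisterMyCustomNode() {
  // After this call, "my_custom_node" can be referenced by name, e.g. from a
  // Declaration, just like the built-in nodes.
  return ac::default_exec_factory_registry()->AddFactory("my_custom_node",
                                                         MakeMyCustomNode);
}
```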
Scheduling and Parallelism#
There are many ways in which data engines can utilize multiple compute resources (e.g. multiple cores). Before we get into the details of Acero’s scheduling we will cover a few high-level topics.
Parallel Execution of Plans#
Users may want to execute multiple plans concurrently and they are welcome to do so. However, Acero has no concept of inter-plan scheduling. Each plan will attempt to maximize its usage of compute resources and there will likely be contention for CPU, memory, and disk resources. If plans are using the default CPU & I/O thread pools this will be mitigated somewhat since they will share the same thread pool.
Locally Distributed Plans#
A common way to tackle multi-threading is to split the input into partitions, create a plan for each partition, and then merge the results from these plans in some way. For example, let’s assume you have 20 files and 10 cores and you want to read and sort all the data. You could create a plan for every 2 files that reads and sorts those files. Then you could create one extra plan that takes the input from these 10 child plans and merges the 10 input streams in a sorted fashion.
This approach is popular because it is how queries are distributed across multiple servers and so it is widely supported and well understood. Acero does not do this today but there is no reason to prevent it. Adding shuffle & partition nodes to Acero should be a high priority and would enable Acero to be used by distributed systems. Once that has been done then it should be possible to do a local shuffle (local meaning exchanging between multiple exec plan instances on a single system) if desired.
Figure: A distributed plan can provide parallelism even if the plans themselves run serially
Pipeline Parallelism#
Acero attempts to maximize parallelism using pipeline parallelism. As each batch of data arrives from the source we immediately create a task and start processing it. This means we will likely start processing batch X before the processing of batch X-1 has completed. This is very flexible and powerful. However, it also means that properly implementing an ExecNode is difficult.
For example, an ExecNode’s InputReceived method should be reentrant. In other words, it should be expected that InputReceived will be called again before a previous call to InputReceived has completed. This means that nodes with any kind of mutable state will need mutexes or similar mechanisms to protect that state from race conditions. It also means that tasks can easily get out of order and nodes should not expect any particular ordering of their input (more on this later).
Figure: An example of pipeline parallelism on a system with 3 CPU threads and 2 I/O threads
Asynchronicity#
Some operations take a long time and may not require the CPU. Reading data from the filesystem is one example. If we only have one thread per core then time will be wasted while we wait for these operations to complete. There are two common solutions to this problem. A synchronous solution is often to create more threads than there are cores with the expectation that some of them will be blocked and that is ok. This approach tends to be simpler but it can lead to excess thread contention and requires fine-tuning.
Another solution is to make the slow operations asynchronous. When the slow operation starts the caller gives up the thread and allows other tasks to run in the meantime. Once the slow operation finishes then a new task is created to take the result and continue processing. This helps to minimize thread contention but tends to be more complex to implement.
Due to a lack of standard C++ async APIs, Acero uses a combination of the two approaches. Acero has two thread pools. The first is the CPU thread pool. This thread pool has one thread per core. Tasks in this thread pool should never block (beyond minor delays for synchronization) and should generally be actively using CPU as much as possible. Threads on the I/O thread pool are expected to spend most of the time idle. They should avoid doing any CPU-intensive work. Their job is basically to wait for data to be available and schedule follow-up tasks on the CPU thread pool.
Figure: Arrow achieves asynchronous execution by combining CPU & I/O thread pools
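The following sketch shows the shape of the hand-off using Arrow’s thread pool and future utilities (ReadBlockFromDisk and DecodeBlock are hypothetical; the ThreadPool and Future APIs are abbreviated from arrow/util/thread_pool.h and arrow/util/future.h):

```cpp
#include <string>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"

std::string ReadBlockFromDisk();       // blocking I/O (hypothetical)
void DecodeBlock(const std::string&);  // CPU-bound work (hypothetical)

arrow::Status RunOneBlock(arrow::internal::ThreadPool* io_pool,
                          arrow::internal::ThreadPool* cpu_pool) {
  // Submit the blocking read to the I/O pool; we immediately get a future.
  ARROW_ASSIGN_OR_RAISE(arrow::Future<std::string> read_fut,
                        io_pool->Submit(ReadBlockFromDisk));
  // When the read finishes, schedule the CPU-bound decode on the CPU pool
  // instead of running it on the I/O thread.
  read_fut.AddCallback([cpu_pool](const arrow::Result<std::string>& block) {
    if (block.ok()) {
      (void)cpu_pool->Spawn([b = *block] { DecodeBlock(b); });
    }
  });
  return arrow::Status::OK();
}
```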
Note
Most nodes in Acero do not need to worry about asynchronicity. They are fully synchronous and do not spawn tasks.
Task per Pipeline (and sometimes beyond)#
An engine could choose to create a thread task for every execution of a node. However, without careful scheduling, this leads to problems with cache locality. For example, let’s assume we have a basic plan consisting of three exec nodes: scan, project, and then filter (this is a very common use case). Now let’s assume there are 100 batches. In a task-per-operator model we would have tasks like “Scan Batch 5”, “Project Batch 5”, and “Filter Batch 5”. Each of those tasks is potentially going to access the same data. For example, maybe the project and filter nodes need to read the same column, a column which is initially created in a decode phase of the scan node. To maximize cache utilization we would need to carefully schedule our tasks to ensure that all three of those tasks are run consecutively and assigned to the same CPU core.
To avoid this problem we design tasks that run through as many nodes as possible before the task ends. This sequence of nodes is often referred to as a “pipeline” and the nodes that end the pipeline (and thus end the task) are often called “pipeline breakers”. Some nodes might even fall somewhere in between. For example, in a hash join node, when we receive a batch on the probe side, and the hash table has been built, we do not need to end the task and instead keep on running. This means that tasks might sometimes end at the join node and might sometimes continue past the join node.
Figure: A logical view of pipelines in a plan and two tasks, showing that pipeline boundaries may vary during a plan
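Conceptually, a pipeline task looks like the following sketch (purely illustrative types; Acero’s internal representation is different):

```cpp
#include <vector>

struct Batch { /* ... */ };

struct PipelineStage {
  // Returns false when the stage absorbs the batch (a pipeline breaker).
  virtual bool Push(Batch* batch) = 0;
  virtual ~PipelineStage() = default;
};

// One task carries one batch through consecutive stages until a pipeline
// breaker absorbs it (e.g. a sort, or a hash-join build side).
void RunPipelineTask(Batch batch, const std::vector<PipelineStage*>& stages) {
  for (PipelineStage* stage : stages) {
    if (!stage->Push(&batch)) {
      return;  // pipeline breaker: the task ends here
    }
    // Otherwise the (possibly transformed) batch stays hot in cache and
    // flows straight into the next stage on the same core.
  }
}
```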
Thread Pools and Schedulers#
The CPU and I/O thread pools are part of the core Arrow-C++ library. They contain a FIFO queue of tasks and will execute them as a thread is available. For Acero we need additional capabilities. For this we use the AsyncTaskScheduler. In the simplest mode of operation the scheduler simply submits tasks to an underlying thread pool. However, it is also capable of creating sub-schedulers which can apply throttling, prioritization, and task tracking:
- A throttled scheduler associates a cost with each task. Tasks are only submitted to the underlying scheduler if there is room. If there is not then the tasks are placed in a queue. The write node uses a throttle of size 1 to avoid reentrantly calling the dataset writer (the dataset writer does its own internal scheduling). A throttled scheduler can be manually paused and unpaused. When paused, all tasks are queued and queued tasks will not be submitted even if there is room. This can be useful in source nodes to implement PauseProducing and ResumeProducing (a sketch of the idea follows this list).
- Priority can be applied to throttled schedulers to control the order in which queued tasks are submitted. If there is room, a task is submitted immediately (regardless of priority). However, if the throttle is full then the task is queued and subject to prioritization. The scan node throttles how many read requests it generates and prioritizes reading a dataset in order, if possible.
- A task group can be used to keep track of a collection of tasks and run a finalization task when all of the tasks have completed. This is useful for fork-join style problems. The write node uses a task group to close a file once all outstanding write tasks for the file have completed.
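To make the throttling idea concrete, here is a hand-rolled sketch of a throttled scheduler with pause/resume support (an illustration of the concept only; see AsyncTaskScheduler in the Arrow sources for the real implementation):

```cpp
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

class ThrottleSketch {
 public:
  explicit ThrottleSketch(int capacity) : capacity_(capacity) {}

  void Submit(int cost, std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (paused_ || in_flight_ + cost > capacity_) {
        queue_.push_back({cost, std::move(task)});  // no room (or paused)
        return;
      }
      in_flight_ += cost;
    }
    SubmitToThreadPool(std::move(task));
  }

  // Called when a task finishes: release its cost and drain the queue.
  void OnTaskFinished(int cost) {
    std::vector<std::function<void()>> ready;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      in_flight_ -= cost;
      while (!paused_ && !queue_.empty() &&
             in_flight_ + queue_.front().cost <= capacity_) {
        in_flight_ += queue_.front().cost;
        ready.push_back(std::move(queue_.front().task));
        queue_.pop_front();
      }
    }
    for (auto& task : ready) SubmitToThreadPool(std::move(task));
  }

  void Pause() {
    std::lock_guard<std::mutex> lock(mutex_);
    paused_ = true;  // queued tasks stay queued, even if there is room
  }

  void Resume() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      paused_ = false;
    }
    OnTaskFinished(/*cost=*/0);  // drain whatever queued up while paused
  }

 private:
  struct Queued {
    int cost;
    std::function<void()> task;
  };

  // Placeholder: a real scheduler submits to a thread pool.
  void SubmitToThreadPool(std::function<void()> task) { task(); }

  std::mutex mutex_;
  std::deque<Queued> queue_;
  const int capacity_;
  int in_flight_ = 0;
  bool paused_ = false;
};
```

Prioritization amounts to replacing the deque with a priority queue so that, when the throttle has room again, the most important queued task is submitted first.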
There is existing research, and there are examples, of different ways to prioritize tasks in an execution engine. Acero has not yet had to address this problem. Let’s go through some common situations:

- Engines will often prioritize reading from the build side of a join node before reading from the probe side. This would be more easily handled in Acero by applying backpressure.
- Another common use case is to control memory accumulation. Engines will prioritize tasks which are closer to the sink node in an effort to relieve memory pressure. However, Acero currently assumes that spilling will be added at pipeline breakers and that memory usage in a plan will be more or less static (per core) and well below the limits of the hardware. This might change if Acero needs to be used in an environment where there are many compute resources and limited memory (e.g. a GPU).
- Engines will often use work-stealing algorithms to prioritize running tasks on the same core to improve cache locality. However, since Acero uses a task-per-pipeline model there isn’t much lost opportunity for cache locality that a scheduler could reclaim. Tasks only end when there is no more work that can be done with the data.
While there is not much prioritization in place in Acero today we do have the tools to apply it should we need to.
Note
In addition to the AsyncTaskScheduler there is another class called the TaskScheduler. This class predates the AsyncTaskScheduler and was designed to offer task tracking for highly efficient synchronous fork-join workloads. If this specialized purpose meets your needs then you may consider using it. It would be interesting to profile this against the AsyncTaskScheduler and see how closely the two compare.
Intra-node Parallelism#
Some nodes can potentially exploit parallelism within a task. For example, in the scan node we can decode columns in parallel. In the hash join node, parallelism is sometimes exploited for complex tasks such as building the hash table. This sort of parallelism is less common but not necessarily discouraged. Profiling should be done first though to ensure that this extra parallelism will be helpful in your workload.
All Work Happens in Tasks#
All work in Acero happens as part of a task. When a plan is started the AsyncTaskScheduler is created and given an initial task. This initial task calls StartProducing on the nodes. Tasks may schedule additional tasks. For example, source nodes will usually schedule tasks during the call to StartProducing. Pipeline breakers will often schedule tasks when they have accumulated all the data they need. Once all tasks in a plan are finished then the plan is considered done.
Some nodes use external threads. These threads must be registered as external tasks using the BeginExternalTask method. For example, the asof join node uses a dedicated processing thread to achieve serial execution. This dedicated thread is registered as an external task. External tasks should be avoided where possible because they require careful handling to avoid deadlock in error situations.
Ordered Execution#
Some nodes either establish an ordering to their outgoing batches or they need to be able to process batches in order. Acero handles ordering using the batch_index property on an ExecBatch. If a node has a deterministic output order then it should apply a batch index on batches that it emits. For example, the OrderByNode applies a new ordering to batches (regardless of the incoming ordering). The scan node is able to attach an implicit ordering to batches which reflects the order of the rows in the files being scanned.
If a node needs to process data in order then it is a bit more complicated. Because of the parallel nature of execution we cannot guarantee that batches will arrive at a node in order. However, they can generally be expected to be “mostly ordered”. As a result, we can insert the batches into a sequencing queue. The sequencing queue is given a callback which is guaranteed to run on the batches, serially, in order. For example, the fetch node uses a sequencing queue. The callback checks to see if we need to include part or all of the batch, and then slices the batch if needed.
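The idea can be sketched with a map keyed by batch index (a hand-rolled illustration, not Acero’s actual sequencing queue):

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <utility>

// Batches may be inserted in any order, but the callback runs serially and
// in batch-index order.
template <typename Batch>
class SequencingQueue {
 public:
  explicit SequencingQueue(std::function<void(Batch)> process)
      : process_(std::move(process)) {}

  void Insert(int64_t batch_index, Batch batch) {
    std::lock_guard<std::mutex> lock(mutex_);
    pending_.emplace(batch_index, std::move(batch));
    // Deliver every batch that is now next in line. Holding the lock across
    // process_ keeps delivery serial as well as ordered.
    while (!pending_.empty() && pending_.begin()->first == next_) {
      Batch ready = std::move(pending_.begin()->second);
      pending_.erase(pending_.begin());
      ++next_;
      process_(std::move(ready));
    }
  }

 private:
  std::function<void(Batch)> process_;
  std::mutex mutex_;
  std::map<int64_t, Batch> pending_;  // out-of-order arrivals wait here
  int64_t next_ = 0;                  // index of the next batch to deliver
};
```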
Even if a node does not care about order it should try and maintain the batch index if it can. The project and filter nodes do not care about order but they ensure that output batches keep the same index as their input batches. The filter node will even emit empty batches if it needs to so that it can maintain the batch order without gaps.
Figure: An example of ordered execution
Partitioned Execution#
A stream is partitioned (or sometimes called segmented) if rows are grouped together in some way. Currently there is not a formal notion of partitioning. However, one is starting to develop (e.g. segmented aggregation) and we may end up introducing a more formal notion of partitions to Acero at some point as well.
Spillover#
Spillover has not yet been implemented in Acero.
Distributed Execution#
There are certain exec nodes which are useful when an engine is used in a distributed environment. The terminology can vary so we will use the Substrait terminology. An exchange node sends data to different workers. Often this is a partitioned exchange so that Acero is expected to partition each batch and distribute partitions across N different workers. On the other end we have the capture node. This node receives data from different workers.
These nodes do not exist in Acero today. However, they would be in scope and we hope to have such nodes someday.
Profiling & Tracing#
Acero’s tracing is currently half-implemented and there are major gaps in profiling tools. However, there has been some effort at tracing with OpenTelemetry and most of the necessary pieces are in place. The main thing currently lacking is some kind of effective visualization of the tracing results.

In order to use the tracing that is present today you will need to build Arrow with ARROW_WITH_OPENTELEMETRY=ON. Then you will need to set the environment variable ARROW_TRACING_BACKEND=otlp_http. This will configure OpenTelemetry to export trace results (as OTLP) to the HTTP endpoint http://localhost:4318/v1/traces. You will need to configure an OpenTelemetry collector to collect results on that endpoint and you will need to configure a trace viewer of some kind such as Jaeger: https://www.jaegertracing.io/docs/1.21/opentelemetry/
Benchmarking#
The most complete macro benchmarking for Acero is provided by voltrondata-labs/arrowbench. These include a set of TPC-H benchmarks, executed from the R-dplyr integration, which are run on every Arrow commit and reported to Conbench at https://conbench.ursa.dev/

In addition to these TPC-H benchmarks there are a number of micro-benchmarks for various nodes (hash-join, asof-join, etc.). Finally, the compute functions themselves should mostly have micro-benchmarks. For more on micro-benchmarks you can refer to https://arrow.apache.org/docs/developers/benchmarks.html
Any new functionality should include micro benchmarks to avoid regressions.
Bindings#
Public API#
The public API for Acero consists of Declaration and the various DeclarationToXyz methods. In addition, the options classes for each node are part of the public API. However, nodes are extensible and so this API is extensible.
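For example, a small plan built and run entirely through the public API might look like this (the input table and the column name "x" are assumptions for illustration; see arrow/acero/options.h for the full set of node options):

```cpp
#include <memory>
#include <utility>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/compute/expression.h"
#include "arrow/result.h"
#include "arrow/table.h"

namespace ac = arrow::acero;
namespace cp = arrow::compute;

// Keep only the rows of `table` where column "x" is positive.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPositive(
    std::shared_ptr<arrow::Table> table) {
  ac::Declaration plan = ac::Declaration::Sequence(
      {{"table_source", ac::TableSourceNodeOptions(std::move(table))},
       {"filter", ac::FilterNodeOptions(
                      cp::greater(cp::field_ref("x"), cp::literal(0)))}});
  // DeclarationToTable creates the exec plan, runs it, and collects output.
  return ac::DeclarationToTable(std::move(plan));
}
```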
R (dplyr)#
Dplyr is an R library for programmatically building queries. The arrow-r package has dplyr bindings which adapt the dplyr API to create Acero execution plans. In addition, there is a dplyr-substrait backend in development which could eventually replace the Acero-aware bindings.
Python#
The pyarrow library binds to Acero in two different ways. First, there is a direct binding in pyarrow.acero which directly binds to the public API. Second, there are a number of compute utilities, like pyarrow.Table.group_by, which use Acero, though this is invisible to the user.
Java#
The Java implementation exposes some capabilities from Arrow datasets. These use Acero implicitly. There are no direct bindings to Acero or Substrait in the Java implementation today.
Design Philosophies#
Engine Independent Compute#
If a node requires complex computation then it should encapsulate that work in abstractions that don’t depend on any particular engine design. For example, the hash join node uses utilities such as a row encoder, a hash table, and an exec batch builder. Other places share implementations of sequencing queues and row segmenters. The node itself should be kept minimal and should simply map from Acero to the abstraction.
This helps to decouple designs from Acero’s design details and allows them to be more resilient to changes in the engine. It also helps to promote these abstractions as capabilities on their own, either for use in other engines or as potential new additions to pyarrow as compute utilities.
Make Tasks not Threads#
If you need to run something in parallel then you should use thread tasks and not dedicated threads.
- This keeps the thread count down (reducing thread contention and context switches).
- This prevents deadlock (tasks get cancelled automatically in the event of a failure).
- This simplifies profiling (tasks can be easily measured; it is easier to know where all the work is).
- This makes it possible to run without threads (sometimes users are doing their own threading and sometimes we need to run in thread-restricted environments like emscripten).
Note: we do not always follow this advice currently. There is a dedicated processing thread in the asof join node. Dedicated threads are “ok” for experimental use but we’d like to migrate away from them.
Don’t Block on CPU Threads#
If you need to run a potentially long-running activity that is not actively using CPU resources (e.g. reading from disk, network I/O, waiting on an external library using its own threads) then you should use asynchronous utilities to ensure that you do not block CPU threads.
Don’t Reinvent the Wheel#
Each node should not be a standalone island of utilities. Where possible, computation should be pushed either into compute functions or into common shared utilities. This is the only way a project as large as this can hope to be maintained.
Avoid Query Optimization#
Writing an efficient Acero plan can be challenging. For example, filter expressions and column selection should be pushed down into the scan node so that the data isn’t read from disk. Expressions should be simplified and common sub-expressions factored out. The build side of a hash join node should be the smaller of the two inputs.

However, figuring these problems out is a challenge reserved for a query planner or a query optimizer. Creating a query optimizer is a challenging task beyond the scope of Acero. With the adoption of Substrait we hope utilities will eventually emerge that solve these problems. As a result, we generally avoid doing any kind of query optimization within Acero. Acero should interpret declarations as literally as possible. This helps reduce maintenance and avoids surprises.

We also realize that this is not always possible. For example, the hash join node currently detects if there is a chain of hash join operators and, if there is, it configures bloom filters between the operators. This is technically a task that could be left to a query optimizer. However, this behavior is rather specific to Acero and fairly niche, and so it is unlikely it will be introduced to an optimizer anytime soon.
Performance Guidelines#
Batch Size#
Perhaps the most discussed performance criterion is batch size. Acero was originally designed based on research to follow a morsel-batch model. Tasks are created based on a large batch of rows (a morsel). The goal is for the morsel to be large enough to justify the overhead of a task. Within a task the data is further subdivided into batches. Each batch should be small enough to fit comfortably into CPU cache (often the L2 cache).
This sets up two loops. The outer loop is parallel and the inner loop is not:
```
for morsel in dataset:   # parallel
    for batch in morsel:
        run_pipeline(batch)
```
The advantage of this style of execution is that successive nodes (or successive operations within an exec node) that access the same column are likely to benefit from cache. It is also essential for functions that require random access to data. It maximizes parallelism while minimizing the data transfer from main memory to CPU cache.
Figure: If multiple passes through the data are needed (or random access) and the batch is much bigger than the cache then performance suffers. Breaking the task into smaller batches helps improve task locality.
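In code, the inner loop often amounts to zero-copy slicing, roughly like this sketch (RunPipelineOnMorsel and run_pipeline are hypothetical; the 32Ki figure matches the source node behavior listed below):

```cpp
#include <cstdint>
#include <memory>

#include "arrow/record_batch.h"

// Slice a large morsel into cache-sized batches; RecordBatch::Slice is
// zero-copy, so this only adjusts offsets into the underlying buffers.
void RunPipelineOnMorsel(const std::shared_ptr<arrow::RecordBatch>& morsel) {
  constexpr int64_t kBatchSize = 32 * 1024;
  for (int64_t offset = 0; offset < morsel->num_rows(); offset += kBatchSize) {
    std::shared_ptr<arrow::RecordBatch> batch = morsel->Slice(offset, kBatchSize);
    // run_pipeline(batch): push the small batch through the node pipeline
  }
}
```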
The morsel/batch model is reflected in a few places in Acero:
- In most source nodes we will try and grab batches of 1Mi rows. This is often configurable.
- In the source node we then iterate and slice off batches of 32Ki rows. This is not currently configurable.
- The hash join node currently requires that batches contain at most 32Ki rows as it uses 16-bit signed integers as row indices in some places.
However, this guidance is debatable. Profiling has shown that we do not get any real benefit from moving to a smaller batch size. It seems any advantage we do get is lost in per-batch overhead. Most of this overhead appears to be due to various per-batch allocations. In addition, depending on your hardware, it’s not clear that CPU cache<->RAM transfer will always be the bottleneck. A combination of linear access, pre-fetch, and high CPU<->RAM bandwidth can alleviate the penalty of cache misses.
As a result, this section is included in the guide to provide historical context, but should not be considered binding.
Ongoing & Deprecated Work#
The following efforts are ongoing. They are described here to explain certain duplication in the code base as well as explain types that are going away.
Scanner v2#
The scanner is currently a node in the datasets module registered with the factory registry as “scan”. This node was written prior to Acero and made extensive use of AsyncGenerator to scan multiple files in parallel. Unfortunately, the use of AsyncGenerator made the scan difficult to profile, difficult to debug, and impossible to cancel. A new scan node is in progress. It is currently registered with the name “scan2”. The new scan node uses the AsyncTaskScheduler instead of AsyncGenerator and should provide additional features such as the ability to skip rows and handle nested column projection (for formats that support it).
OrderBySink and SelectKSink#
These two exec nodes provided custom sink implementations. They were written before ordered execution was added to Acero and were the only way to generate ordered output. However, they had to be placed at the end of a plan and the fact that they were custom sink nodes made them difficult to describe with Declaration. The OrderByNode and FetchNode replace these. They are kept for the moment until existing bindings move away from them.
Upstreaming Changes#
Acero is designed so that it can be extended without recompilation. You can easily add new compute functions and exec nodes without creating a fork or compiling Acero. However, as you develop new features that are generally useful, we hope you will make time to upstream your changes.
Even though we welcome these changes we have to admit that there is a cost to this process. Upstreaming code requires that the new module behave correctly, but that is typically the easier part to review. More importantly, upstreaming code is a process of transferring the maintenance burden from yourself to the wider Arrow C++ project maintainers. This requires a deep understanding of the code by maintainers, it requires the code be consistent with the style of the project, and it requires that the code be well tested with unit tests to guard against regressions.
Because of this, we highly recommend taking the following steps:
- As you are starting out you should send a message to the mailing list announcing your intentions and design. This will help you determine if there is wider interest in the feature and others may have ideas or suggestions to contribute early on in the process.
- If there is not much interest in the feature then keep in mind that it may be difficult to eventually upstream the change. The maintenance capacity of the team is limited and we try to prioritize features that are in high demand.
- We recommend developing and testing the change on your own fork until you get it to a point where you are fairly confident things are working correctly. If the change is large then you might also think about how you can break up the change into smaller pieces. As you do this you can share both the larger PR (as a draft PR or a branch on your fork) and the smaller PRs. This way we can see the context of the smaller PRs. However, if you do break things up, smaller PRs should still ideally stand on their own.
Any PR will need to have the following:
- Unit tests covering the new functionality
- Microbenchmarks if there is any significant compute work going on
- Examples demonstrating how to use the new feature
- Updates to the API reference and this guide
- Passing CI (you can enable GitHub Actions on your fork and that will allow most CI jobs to run before you create your PR)