Streaming Execution (Acero)

Creating and running execution plans

enum class UnalignedBufferHandling

How to handle unaligned buffers.

Values:

enumerator kWarn
enumerator kIgnore
enumerator kReallocate
enumerator kError

UnalignedBufferHandling GetDefaultUnalignedBufferHandling()

Get the default behavior of unaligned buffer handling.

This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable, which can be set to “warn”, “ignore”, “reallocate”, or “error”. If the environment variable is not set, or is set to an invalid value, this will return kWarn.
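The fallback rule above can be modeled with the standard library alone. This is a minimal sketch, not Acero's implementation: the enum is a local stand-in with the same enumerator names, ParseAlignmentHandling and DefaultFromEnvironment are hypothetical helpers, and the exact lowercase comparison is an assumption of the sketch.

```cpp
#include <cstdlib>
#include <string>

// Local stand-in for arrow::acero::UnalignedBufferHandling (same enumerator names).
enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kError };

// Hypothetical helper: map a raw setting to a policy.
// A null pointer (variable not set) or an unrecognized value falls back to kWarn.
UnalignedBufferHandling ParseAlignmentHandling(const char* raw) {
  if (raw == nullptr) return UnalignedBufferHandling::kWarn;
  const std::string value(raw);
  if (value == "ignore") return UnalignedBufferHandling::kIgnore;
  if (value == "reallocate") return UnalignedBufferHandling::kReallocate;
  if (value == "error") return UnalignedBufferHandling::kError;
  return UnalignedBufferHandling::kWarn;  // covers "warn" and invalid values
}

// Hypothetical helper: read the environment variable named in the text above.
UnalignedBufferHandling DefaultFromEnvironment() {
  return ParseAlignmentHandling(std::getenv("ACERO_ALIGNMENT_HANDLING"));
}
```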

Result<std::shared_ptr<Schema>> DeclarationToSchema(const Declaration& declaration, FunctionRegistry* function_registry = NULLPTR)

Calculate the output schema of a declaration.

This does not actually execute the plan. This operation may fail if the declaration represents an invalid plan (e.g. a project node with multiple inputs)

Parameters:
  • declaration – A declaration describing an execution plan

  • function_registry – The function registry to use for function execution. If null then the default function registry will be used.

Returns:

the schema that batches would have after going through the execution plan

Result<std::string> DeclarationToString(const Declaration& declaration, FunctionRegistry* function_registry = NULLPTR)

Create a string representation of a plan.

This representation is for debug purposes only.

Conversion to a string may fail if the declaration represents an invalid plan.

Use Substrait for complete serialization of plans

Parameters:
  • declaration – A declaration describing an execution plan

  • function_registry – The function registry to use for function execution. If null then the default function registry will be used.

Returns:

a string representation of the plan suitable for debugging output

Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration, bool use_threads = true, MemoryPool* memory_pool = default_memory_pool(), FunctionRegistry* function_registry = NULLPTR)

Utility method to run a declaration and collect the results into a table.

This method will add a sink node to the declaration to collect results into a table. It will then create an ExecPlan from the declaration, start the exec plan, block until the plan has finished, and return the created table.

Parameters:
  • declaration – A declaration describing the plan to run

  • use_threads – If use_threads is false then all CPU work will be done on the calling thread. I/O tasks will still happen on the I/O executor and may be multi-threaded (but should not use significant CPU resources).

  • memory_pool – The memory pool to use for allocations made while running the plan.

  • function_registry – The function registry to use for function execution. If null then the default function registry will be used.

Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration, QueryOptions query_options)

Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration, bool use_threads = true, MemoryPool* memory_pool = default_memory_pool(), FunctionRegistry* function_registry = NULLPTR)

Asynchronous version of DeclarationToTable.

Parameters:
  • declaration – A declaration describing the plan to run

  • use_threads – The behavior of use_threads is slightly different than the synchronous version since we cannot run synchronously on the calling thread. Instead, if use_threads=false then a new thread pool will be created with a single thread and this will be used for all compute work.

  • memory_pool – The memory pool to use for allocations made while running the plan.

  • function_registry – The function registry to use for function execution. If null then the default function registry will be used.

Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration, ExecContext custom_exec_context)

Overload of DeclarationToTableAsync accepting a custom exec context.

The executor must be specified (cannot be null) and must be kept alive until the returned future finishes.

Result<BatchesWithCommonSchema> DeclarationToExecBatches(Declaration declaration, bool use_threads = true, MemoryPool* memory_pool = default_memory_pool(), FunctionRegistry* function_registry = NULLPTR)

Utility method to run a declaration and collect the results into a vector of ExecBatch.

See also

DeclarationToTable for details on threading & execution

Result<BatchesWithCommonSchema> DeclarationToExecBatches(Declaration declaration, QueryOptions query_options)

Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declaration, bool use_threads = true, MemoryPool* memory_pool = default_memory_pool(), FunctionRegistry* function_registry = NULLPTR)

Asynchronous version of DeclarationToExecBatches.

See also

DeclarationToTableAsync for details on threading & execution

Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declaration, ExecContext custom_exec_context)

Overload of DeclarationToExecBatchesAsync accepting a custom exec context.

See also

DeclarationToTableAsync for details on threading & execution

Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(Declaration declaration, bool use_threads = true, MemoryPool* memory_pool = default_memory_pool(), FunctionRegistry* function_registry = NULLPTR)

Utility method to run a declaration and collect the results into a vector.

See also

DeclarationToTable for details on threading & execution

Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(Declaration declaration, QueryOptions query_options)

Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(Declaration declaration, bool use_threads = true, MemoryPool* memory_pool = default_memory_pool(), FunctionRegistry* function_registry = NULLPTR)

Asynchronous version of DeclarationToBatches.

See also

DeclarationToTableAsync for details on threading & execution

Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(Declaration declaration, ExecContext exec_context)

Overload of DeclarationToBatchesAsync accepting a custom exec context.

See also

DeclarationToTableAsync for details on threading & execution

Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration, bool use_threads = true, MemoryPool* memory_pool = default_memory_pool(), FunctionRegistry* function_registry = NULLPTR)

Utility method to run a declaration and return results as a RecordBatchReader.

If an exec context is not provided then a default exec context will be used based on the value of use_threads. If use_threads is false then the CPU executor will be a serial executor and all CPU work will be done on the calling thread. I/O tasks will still happen on the I/O executor and may be multi-threaded.

If use_threads is false then all CPU work will happen during the calls to RecordBatchReader::Next and no CPU work will happen in the background. If use_threads is true then CPU work will happen on the CPU thread pool and tasks may run in between calls to RecordBatchReader::Next. If the returned reader is not consumed quickly enough then the plan will eventually pause as the backpressure queue fills up.

If a custom exec context is provided then the value of use_threads will be ignored.

The returned RecordBatchReader can be closed early to cancel the computation of record batches. In this case, only errors encountered by the computation may be reported. In particular, no cancellation error may be reported.

Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration, QueryOptions query_options)

Status DeclarationToStatus(Declaration declaration, bool use_threads = true, MemoryPool* memory_pool = default_memory_pool(), FunctionRegistry* function_registry = NULLPTR)

Utility method to run a declaration and ignore results.

This can be useful when the data are consumed as part of the plan itself, for example, when the plan ends with a write node.

See also

DeclarationToTable for details on threading & execution

Status DeclarationToStatus(Declaration declaration, QueryOptions query_options)

Future<> DeclarationToStatusAsync(Declaration declaration, bool use_threads = true, MemoryPool* memory_pool = default_memory_pool(), FunctionRegistry* function_registry = NULLPTR)

Asynchronous version of DeclarationToStatus.

This can be useful when the data are consumed as part of the plan itself, for example, when the plan ends with a write node.

See also

DeclarationToTableAsync for details on threading & execution

Future<> DeclarationToStatusAsync(Declaration declaration, ExecContext exec_context)

Overload of DeclarationToStatusAsync accepting a custom exec context.

See also

DeclarationToTableAsync for details on threading & execution

struct Declaration
#include <arrow/acero/exec_plan.h>

Helper class for declaring execution nodes.

A Declaration represents an unconstructed ExecNode (and potentially an entire graph since its inputs may also be Declarations).

A Declaration can be converted to a plan and executed using one of the DeclarationToXyz methods.

For more direct control, a Declaration can be added to an existing execution plan with Declaration::AddToPlan, which will recursively construct any inputs as necessary.

Public Types

using Input = std::variant<ExecNode*, Declaration>

Public Functions

inline Declaration()

inline Declaration(std::string factory_name, std::vector<Input> inputs, std::shared_ptr<ExecNodeOptions> options, std::string label)

Construct a declaration.

Parameters:
  • factory_name – the name of the exec node to construct. The node must have been added to the exec node registry with this name.

  • inputs – the inputs to the node, these should be other declarations

  • options – options that control the behavior of the node. You must use the appropriate subclass. For example, if factory_name is “project” then options should be ProjectNodeOptions.

  • label – a label to give the node. Can be used to distinguish it from other nodes of the same type in the plan.

template <typename Options>
inline Declaration(std::string factory_name, std::vector<Input> inputs, Options options, std::string label)

template <typename Options>
inline Declaration(std::string factory_name, std::vector<Input> inputs, Options options)

template <typename Options>
inline Declaration(std::string factory_name, Options options)

template <typename Options>
inline Declaration(std::string factory_name, Options options, std::string label)

Result<ExecNode*> AddToPlan(ExecPlan* plan, ExecFactoryRegistry* registry = default_exec_factory_registry()) const

Add the declaration to an already created execution plan.

This method will recursively call AddToPlan on all of the declaration’s inputs. This method is only for advanced use when the DeclarationToXyz methods are not sufficient.

Parameters:
  • plan – the plan to add the node to

  • registry – the registry to use to lookup the node factory

Returns:

the instantiated execution node

bool IsValid(ExecFactoryRegistry* registry = default_exec_factory_registry()) const

Public Members

std::string factory_name

the name of the factory to use when creating a node

std::vector<Input> inputs

the declaration's inputs

std::shared_ptr<ExecNodeOptions> options

options to control the behavior of the node

std::string label

a label to give the node in the plan

Public Static Functions

static Declaration Sequence(std::vector<Declaration> decls)

Convenience factory for the common case of a simple sequence of nodes.

Each of decls will be appended to the inputs of the subsequent declaration, and the final modified declaration will be returned.

Without this convenience factory, constructing a sequence would require explicit, difficult-to-read nesting:

Declaration{"n3",
            {
                Declaration{"n2",
                            {
                                Declaration{"n1",
                                            {
                                                Declaration{"n0", N0Opts{}},
                                            },
                                            N1Opts{}},
                            },
                            N2Opts{}},
            },
            N3Opts{}};

An equivalent Declaration can be constructed more tersely using Sequence:

Declaration::Sequence({
    {"n0", N0Opts{}},
    {"n1", N1Opts{}},
    {"n2", N2Opts{}},
    {"n3", N3Opts{}},
});
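The chaining rule ("each of decls will be appended to the inputs of the subsequent declaration") can be illustrated with a toy model. This is a sketch under stated assumptions, not the library's code: MiniDecl is a hypothetical stand-in that carries only a factory name and nested inputs, and this Sequence is a free function written for the illustration.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical miniature of Declaration: a factory name plus nested inputs.
struct MiniDecl {
  std::string factory_name;
  std::vector<MiniDecl> inputs;
};

// Chain decls so that each one becomes an input of the declaration after it,
// and return the final (outermost) declaration. Assumes decls is non-empty.
MiniDecl Sequence(std::vector<MiniDecl> decls) {
  MiniDecl out = std::move(decls.front());
  for (std::size_t i = 1; i < decls.size(); ++i) {
    MiniDecl next = std::move(decls[i]);
    next.inputs.push_back(std::move(out));  // previous node feeds the next one
    out = std::move(next);
  }
  return out;
}
```

In this model, Sequence applied to n0, n1, n2 returns n2, whose single input is n1, whose single input is n0, which matches the nesting order of the hand-written form above.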

struct QueryOptions
#include <arrow/acero/exec_plan.h>

plan-wide options that can be specified when executing an execution plan

Public Members

bool use_legacy_batching = false

Should the plan use a legacy batching strategy.

This is currently in place only to support the Scanner::ToTable method. This method relies on batch indices from the scanner remaining consistent. This is impractical in the ExecPlan, which might slice batches as needed (e.g. for a join).

However, it still works for simple plans and this is the only way we have at the moment for maintaining implicit order.

std::optional<bool> sequence_output = std::nullopt

If the output has a meaningful order then sequence the output of the plan.

The default behavior (std::nullopt) will sequence output batches if there is a meaningful ordering in the final node and will emit batches immediately otherwise.

If explicitly set to true then plan execution will fail if there is no meaningful ordering. This can be useful to validate a query that should be emitting ordered results.

If explicitly set to false then batches will be emitted immediately even if there is a meaningful ordering. This could cause batches to be emitted out of order but may offer a small decrease in latency.
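The three cases above can be summarized as a small decision function. This is a standalone sketch, not Acero's code: OutputMode and ResolveSequenceOutput are hypothetical names, and has_meaningful_ordering stands in for whatever the final node of the plan reports.

```cpp
#include <optional>

// Hypothetical outcome of the sequencing decision.
enum class OutputMode { kSequenced, kImmediate, kFail };

// Model of the sequence_output rules described above.
OutputMode ResolveSequenceOutput(std::optional<bool> sequence_output,
                                 bool has_meaningful_ordering) {
  if (!sequence_output.has_value()) {
    // Default: sequence only when there is an ordering to respect.
    return has_meaningful_ordering ? OutputMode::kSequenced : OutputMode::kImmediate;
  }
  if (*sequence_output) {
    // Explicit true: an unordered plan is treated as an error.
    return has_meaningful_ordering ? OutputMode::kSequenced : OutputMode::kFail;
  }
  // Explicit false: emit immediately, even if an ordering exists.
  return OutputMode::kImmediate;
}
```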

bool use_threads = true

should the plan use multiple background threads for CPU-intensive work

If this is false then all CPU work will be done on the calling thread. I/O tasks will still happen on the I/O executor and may be multi-threaded (but should not use significant CPU resources).

Will be ignored if custom_cpu_executor is set.

::arrow::internal::Executor* custom_cpu_executor = NULLPTR

custom executor to use for CPU-intensive work

Must be null or remain valid for the duration of the plan. If this is null then a default thread pool will be chosen whose behavior will be controlled by the use_threads option.

::arrow::internal::Executor* custom_io_executor = NULLPTR

custom executor to use for IO work

Must be null or remain valid for the duration of the plan. If this is null then the global I/O thread pool will be chosen, whose behavior is controlled by the “ARROW_IO_THREADS” environment variable.

MemoryPool* memory_pool = default_memory_pool()

a memory pool to use for allocations

Must remain valid for the duration of the plan.

FunctionRegistry* function_registry = GetFunctionRegistry()

a function registry to use for the plan

Must remain valid for the duration of the plan.

std::vector<std::string> field_names

the names of the output columns

If this is empty then names will be generated based on the input columns.

If set then the number of names must equal the number of output columns.

std::optional<UnalignedBufferHandling> unaligned_buffer_handling

Policy for unaligned buffers in source data.

Various compute functions and Acero internals will type pun array buffers from uint8_t* to some kind of value type (e.g. we might cast to int32_t* to add two int32 arrays).

If the buffer is poorly aligned (e.g. an int32 array is not aligned on a 4-byte boundary) then this is technically undefined behavior in C++. However, most modern compilers and CPUs are fairly tolerant of this behavior and nothing bad (beyond a small hit to performance) is likely to happen.

Note that this only applies to source buffers. All buffers allocated internally by Acero will be suitably aligned.

If this field is set to kWarn then Acero will check if any buffers are unaligned and, if they are, will emit a warning.

If this field is set to kReallocate then Acero will allocate a new, suitably aligned buffer and copy the contents from the old buffer into this new buffer.

If this field is set to kError then Acero will gracefully abort the plan instead.

If this field is set to kIgnore then Acero will not even check if the buffers are unaligned.

If this field is not set then it will be treated as kWarn unless overridden by the ACERO_ALIGNMENT_HANDLING environment variable.
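To make the alignment check and the kReallocate remedy concrete, here is a standard-library-only sketch. It is an illustration of the idea, not Acero's implementation: IsAligned and CopyToAligned are hypothetical helpers, and int32 is used only because it is the example type in the text above.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical helper: true if ptr is suitably aligned for the given alignment.
bool IsAligned(const void* ptr, std::size_t alignment) {
  return reinterpret_cast<std::uintptr_t>(ptr) % alignment == 0;
}

// Hypothetical helper modeling the "reallocate" policy: copy num_values int32
// values out of a possibly unaligned byte buffer into freshly allocated,
// suitably aligned storage. memcpy is used so no unaligned load is performed.
std::vector<std::int32_t> CopyToAligned(const std::uint8_t* bytes,
                                        std::size_t num_values) {
  std::vector<std::int32_t> aligned(num_values);
  if (num_values > 0) {
    std::memcpy(aligned.data(), bytes, num_values * sizeof(std::int32_t));
  }
  return aligned;
}
```

A caller following the kWarn or kError policy would only need the first helper; the second models the extra allocation and copy that kReallocate pays for.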

struct BatchesWithCommonSchema
#include <arrow/acero/exec_plan.h>

a collection of exec batches with a common schema

Public Members

std::vector<ExecBatch> batches
std::shared_ptr<Schema> schema

Configuration for execution nodes

enum class JoinType

Values:

enumerator LEFT_SEMI
enumerator RIGHT_SEMI
enumerator LEFT_ANTI
enumerator RIGHT_ANTI
enumerator INNER
enumerator LEFT_OUTER
enumerator RIGHT_OUTER
enumerator FULL_OUTER

enum class JoinKeyCmp

Values:

enumerator EQ
enumerator IS

using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>

a source node that reads from an iterator of array vectors

using ExecBatchIteratorMaker = std::function<Iterator<std::shared_ptr<ExecBatch>>()>

a source node that reads from an iterator of ExecBatch

using RecordBatchIteratorMaker = std::function<Iterator<std::shared_ptr<RecordBatch>>()>

constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30

a default value at which backpressure will be applied

constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28

a default value at which backpressure will be removed

std::string ToString(JoinType t)

class ExecNodeOptions
#include <arrow/acero/options.h>

A base class for all options objects.

The only time this is used directly is when a node has no configuration.

Subclassed by arrow::acero::SchemaSourceNodeOptions< ArrayVectorIteratorMaker >, arrow::acero::SchemaSourceNodeOptions< ExecBatchIteratorMaker >, arrow::acero::SchemaSourceNodeOptions< RecordBatchIteratorMaker >, arrow::acero::AggregateNodeOptions, arrow::acero::AsofJoinNodeOptions, arrow::acero::ConsumingSinkNodeOptions, arrow::acero::FetchNodeOptions, arrow::acero::FilterNodeOptions, arrow::acero::HashJoinNodeOptions, arrow::acero::NamedTableNodeOptions, arrow::acero::OrderByNodeOptions, arrow::acero::PivotLongerNodeOptions, arrow::acero::ProjectNodeOptions, arrow::acero::RecordBatchReaderSourceNodeOptions, arrow::acero::SchemaSourceNodeOptions< ItMaker >, arrow::acero::SinkNodeOptions, arrow::acero::SourceNodeOptions, arrow::acero::TableSinkNodeOptions, arrow::acero::TableSourceNodeOptions, arrow::dataset::ScanNodeOptions, arrow::dataset::ScanV2Options, arrow::dataset::WriteNodeOptions

Public Functions

virtual ~ExecNodeOptions() = default

Public Members

std::shared_ptr<DebugOptions> debug_opts

This must not be used in release-mode.

class SourceNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

A node representing a generic source of data for Acero.

The source node will start calling generator during StartProducing. An initial task will be created that will call generator. It will not call generator reentrantly. If the source can be read in parallel then those details should be encapsulated within generator.

For each batch received a new task will be created to push that batch downstream. This task will slice smaller units of size ExecPlan::kMaxBatchSize from the parent batch and call InputReceived. Thus, if the generator yields a large batch it may result in several calls to InputReceived.

The SourceNode will, by default, assign an implicit ordering to outgoing batches. This is valid as long as the generator generates batches in a deterministic fashion. Currently, the only way to override this is to subclass the SourceNode.

This node is not generally used directly but can serve as the basis for various specialized nodes.
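The slicing step described above (one large batch turning into several InputReceived calls) can be modeled as follows. This is a sketch only: SliceBatch is a hypothetical helper, rows are represented by (offset, length) pairs rather than real batches, and max_batch_size is a parameter standing in for ExecPlan::kMaxBatchSize, whose value is not stated in this section.

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical helper: return (offset, length) slices that cover num_rows rows,
// each slice holding at most max_batch_size rows. Assumes max_batch_size > 0.
std::vector<std::pair<std::int64_t, std::int64_t>> SliceBatch(
    std::int64_t num_rows, std::int64_t max_batch_size) {
  std::vector<std::pair<std::int64_t, std::int64_t>> slices;
  for (std::int64_t offset = 0; offset < num_rows; offset += max_batch_size) {
    // The last slice may be shorter than max_batch_size.
    slices.emplace_back(offset, std::min(max_batch_size, num_rows - offset));
  }
  return slices;
}
```

In this model each returned pair corresponds to one downstream call, so a batch of 10 rows with a limit of 4 yields three slices, the last one holding 2 rows.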

Public Functions

inline SourceNodeOptions(std::shared_ptr<Schema> output_schema, std::function<Future<std::optional<ExecBatch>>()> generator, Ordering ordering = Ordering::Unordered())

Create an instance from values.

Public Members

std::shared_ptr<Schema> output_schema

the schema for batches that will be generated by this source

std::function<Future<std::optional<ExecBatch>>()> generator

an asynchronous stream of batches ending with std::nullopt

Ordering ordering

the order of the data, defaults to Ordering::Unordered

class TableSourceNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node that generates data from a table already loaded in memory

The table source node will slice off chunks, defined by max_batch_size, for parallel processing. The table source node extends source node and so these chunks will be iteratively processed in small batches.

See also

SourceNodeOptions for details.

Public Functions

inline TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t max_batch_size = kDefaultMaxBatchSize)

Create an instance from values.

Public Members

std::shared_ptr<Table> table

a table which acts as the data source

int64_t max_batch_size

size of batches to emit from this node. If the table is larger, the node will emit multiple batches from the table to be processed in parallel.

Public Static Attributes

static constexpr int64_t kDefaultMaxBatchSize = 1 << 20

class NamedTableNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

define a lazily resolved Arrow table.

The table uniquely identified by the names can typically be resolved at the time when the plan is to be consumed.

This node is for serialization purposes only and can never be executed.

Public Functions

inline NamedTableNodeOptions(std::vector<std::string> names, std::shared_ptr<Schema> schema)

Create an instance from values.

Public Members

std::vector<std::string> names

the names to put in the serialized plan

std::shared_ptr<Schema> schema

the output schema of the table

template <typename ItMaker>
class SchemaSourceNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a source node which feeds data from a synchronous iterator of batches

ItMaker is a maker of an iterator of tabular data.

The node can be configured to use an I/O executor. If set then each time the iterator is polled a new I/O thread task will be created to do the polling. This allows a blocking iterator to stay off the CPU thread pool.

Public Functions

inline SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker, arrow::internal::Executor* io_executor)

Create an instance that will create a new task on io_executor for each iteration.

inline SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker, bool requires_io = false)

Create an instance that will either iterate synchronously or use the default I/O executor.

Public Members

std::shared_ptr<Schema> schema

The schema of the record batches from the iterator.

ItMaker it_maker

A maker of an iterator which acts as the data source.

arrow::internal::Executor* io_executor

The executor to use for scanning the iterator.

Defaults to the default I/O executor. Only used if requires_io is true. If requires_io is false then this MUST be nullptr.

bool requires_io

If true then items will be fetched from the iterator on a dedicated I/O thread to keep I/O off the CPU thread.

class RecordBatchReaderSourceNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a source node that reads from a RecordBatchReader

Each iteration of the RecordBatchReader will be run on a new thread task created on the I/O thread pool.

Public Functions

inline RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader, arrow::internal::Executor* io_executor = NULLPTR)

Create an instance from values.

Public Members

std::shared_ptr<RecordBatchReader> reader

The RecordBatchReader which acts as the data source.

arrow::internal::Executor* io_executor

The executor to use for the reader.

Defaults to the default I/O executor.

class ArrayVectorSourceNodeOptions : public arrow::acero::SchemaSourceNodeOptions<ArrayVectorIteratorMaker>
#include <arrow/acero/options.h>

An extended Source node which accepts a schema and array-vectors.

class ExecBatchSourceNodeOptions : public arrow::acero::SchemaSourceNodeOptions<ExecBatchIteratorMaker>
#include <arrow/acero/options.h>

An extended Source node which accepts a schema and exec-batches.

Public Functions

ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema, std::vector<ExecBatch> batches, ::arrow::internal::Executor* io_executor)

ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema, std::vector<ExecBatch> batches, bool requires_io = false)

inline SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker, arrow::internal::Executor* io_executor)

Create an instance that will create a new task on io_executor for each iteration.

inline SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker, bool requires_io = false)

Create an instance that will either iterate synchronously or use the default I/O executor.

class RecordBatchSourceNodeOptions : public arrow::acero::SchemaSourceNodeOptions<RecordBatchIteratorMaker>
#include <arrow/acero/options.h>

a source node that reads from an iterator of RecordBatch

class FilterNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node which excludes some rows from batches passed through it

filter_expression will be evaluated against each batch which is pushed to this node. Any rows for which filter_expression does not evaluate to true will be excluded in the batch emitted by this node.

This node will emit empty batches if all rows are excluded. This is done to avoid gaps in the ordering.
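The phrase "does not evaluate to true" covers both false and null results. The following sketch models that rule with plain vectors; it is an illustration, not the node's implementation. FilterRows is a hypothetical helper, integers stand in for rows, and std::optional<bool> stands in for a nullable boolean column (std::nullopt meaning null).

```cpp
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical helper: keep rows[i] only when mask[i] is present and true.
// Assumes rows and mask have the same length.
std::vector<int> FilterRows(const std::vector<int>& rows,
                            const std::vector<std::optional<bool>>& mask) {
  std::vector<int> kept;
  for (std::size_t i = 0; i < rows.size(); ++i) {
    if (mask[i].has_value() && *mask[i]) {
      kept.push_back(rows[i]);
    }
  }
  // An empty result is still returned, mirroring the empty batch emitted
  // by the node when every row is excluded.
  return kept;
}
```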

Public Functions

inline explicit FilterNodeOptions(Expression filter_expression)

create an instance from values

Public Members

Expression filter_expression

the expression to filter batches

The return type of this expression must be boolean.

class FetchNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node which selects a specified subset from the input

Public Functions

inline FetchNodeOptions(int64_t offset, int64_t count)

create an instance from values

Public Members

int64_t offset

the number of rows to skip

int64_t count

the number of rows to keep (not counting skipped rows)

Public Static Attributes

static constexpr std::string_view kName = "fetch"
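The interaction of offset and count can be shown on a plain vector. This is a sketch of the semantics described above, not the node's code: Fetch is a hypothetical helper, and it assumes both arguments are non-negative.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical helper: skip `offset` rows, then keep up to `count` rows.
// Assumes offset >= 0 and count >= 0.
std::vector<int> Fetch(const std::vector<int>& rows, std::int64_t offset,
                       std::int64_t count) {
  const auto size = static_cast<std::int64_t>(rows.size());
  const std::int64_t begin = std::min(offset, size);       // clamp the skip
  const std::int64_t end = std::min(begin + count, size);  // clamp the take
  return std::vector<int>(rows.begin() + begin, rows.begin() + end);
}
```

With five input rows, an offset of 1 and a count of 3 keeps the second through fourth rows; an offset past the end yields an empty result.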
class ProjectNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node which executes expressions on input batches, producing batches of the same length with new columns.

Each expression will be evaluated against each batch which is pushed to this node to produce a corresponding output column.

If names are not provided, the string representations of exprs will be used.

Public Functions

inline explicit ProjectNodeOptions(std::vector<Expression> expressions, std::vector<std::string> names = {})

create an instance from values

Public Members

std::vector<Expression> expressions

the expressions to run on the batches

The output will have one column for each expression. If you wish to keep any of the columns from the input then you should create a simple field_ref expression for that column.

std::vector<std::string> names

the names of the output columns

If this is not specified then the result of calling ToString on the expression will be used instead.

This list should either be empty or have the same length as expressions.

class AggregateNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node which aggregates input batches and calculates summary statistics

The node can summarize the entire input or it can group the input with grouping keys and segment keys.

By default, the aggregate node is a pipeline breaker. It must accumulate all input before any output is produced. Segment keys are a performance optimization. If you know your input is already partitioned by one or more columns then you can specify these as segment keys. At each change in the segment keys the node will emit values for all data seen so far.

Segment keys are currently limited to single-threaded mode.

Both keys and segment-keys determine the group. However segment-keys are also used for determining grouping segments, which should be large, and allow streaming a partial aggregation result after processing each segment. One common use-case for segment-keys is ordered aggregation, in which the segment-key attribute specifies a column with non-decreasing values or a lexicographically-ordered set of such columns.

If the keys attribute is a non-empty vector, then each aggregate in aggregates is expected to be a HashAggregate function. If the keys attribute is an empty vector, then each aggregate is assumed to be a ScalarAggregate function.

If the segment_keys attribute is a non-empty vector, then segmented aggregation, as described above, applies.

The keys and segment_keys vectors must be disjoint.

If no aggregates (measures) are provided then you will simply get the list of unique keys.

This node outputs segment keys first, followed by regular keys, followed by one column for each aggregate.
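The streaming effect of segment keys can be sketched with a running sum that is flushed whenever the segment key changes. This is a simplified single-threaded model, not Acero's aggregation code: Row, SegmentTotal, and SegmentedSum are hypothetical names, there is one integer segment key and no regular grouping key, and the input is assumed to be already partitioned by the segment key.

```cpp
#include <functional>
#include <vector>

// Hypothetical input row: one segment key column and one value column.
struct Row {
  int segment_key;
  int value;
};

// Hypothetical output row: the segment key followed by the aggregate.
struct SegmentTotal {
  int segment_key;
  long long total;
};

// Sum `value` per segment, calling `emit` as soon as a segment ends instead of
// waiting for the whole input. Assumes rows are partitioned by segment_key.
void SegmentedSum(const std::vector<Row>& rows,
                  const std::function<void(const SegmentTotal&)>& emit) {
  if (rows.empty()) return;
  SegmentTotal current{rows.front().segment_key, 0};
  for (const Row& row : rows) {
    if (row.segment_key != current.segment_key) {
      emit(current);  // segment key changed: flush what was seen so far
      current = SegmentTotal{row.segment_key, 0};
    }
    current.total += row.value;
  }
  emit(current);  // flush the final segment at end of input
}
```

Without segment keys, the equivalent model would hold every partial result until the last row arrived, which is the pipeline-breaking behavior described above.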

Public Functions

inline explicit AggregateNodeOptions(std::vector<Aggregate> aggregates, std::vector<FieldRef> keys = {}, std::vector<FieldRef> segment_keys = {})

create an instance from values

Public Members

std::vector<Aggregate> aggregates
std::vector<FieldRef> keys
std::vector<FieldRef> segment_keys

class BackpressureMonitor
#include <arrow/acero/options.h>

an interface that can be queried for backpressure statistics

Public Functions

virtual ~BackpressureMonitor() = default

virtual uint64_t bytes_in_use() = 0

fetches the number of bytes currently queued up

virtual bool is_paused() = 0

checks to see if backpressure is currently applied

struct BackpressureOptions
#include <arrow/acero/options.h>

Options to control backpressure behavior.

Public Functions

inline BackpressureOptions()

Create default options that perform no backpressure.

inline BackpressureOptions(uint64_t resume_if_below, uint64_t pause_if_above)

Create options that will perform backpressure.

Parameters:
  • resume_if_below – The producer should resume producing if the backpressure queue holds fewer than resume_if_below bytes.

  • pause_if_above – The producer should pause producing if the backpressure queue holds more than pause_if_above bytes.

inline bool should_apply_backpressure() const

Helper method to determine whether backpressure is enabled.

Returns:

true if pause_if_above is greater than zero, false otherwise

Public Members

uint64_t resume_if_below

the number of bytes at which the producer should resume producing

uint64_t pause_if_above

the number of bytes at which the producer should pause producing

If this is 0 then backpressure will be disabled.

Public Static Functions

static inline BackpressureOptions DefaultBackpressure()

create an instance using default values for backpressure limits
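The two thresholds form a hysteresis band: the producer pauses above one limit and resumes only after usage drops below the other. The following is a minimal model of that behavior, not Acero's implementation. BackpressureGate is a hypothetical class, and it treats a pause_if_above of 0 as "disabled", following the member documentation above.

```cpp
#include <cstdint>

// Hypothetical model of pause/resume decisions driven by queued bytes.
class BackpressureGate {
 public:
  BackpressureGate(std::uint64_t resume_if_below, std::uint64_t pause_if_above)
      : resume_if_below_(resume_if_below), pause_if_above_(pause_if_above) {}

  // Report the current queue size; returns true while the producer should be paused.
  bool Update(std::uint64_t bytes_in_use) {
    if (pause_if_above_ == 0) return false;  // backpressure disabled
    if (!paused_ && bytes_in_use > pause_if_above_) {
      paused_ = true;   // crossed the high mark: pause
    } else if (paused_ && bytes_in_use < resume_if_below_) {
      paused_ = false;  // dropped below the low mark: resume
    }
    return paused_;     // between the marks the previous state is kept
  }

 private:
  std::uint64_t resume_if_below_;
  std::uint64_t pause_if_above_;
  bool paused_ = false;
};
```

Keeping the two marks apart avoids rapid toggling when the queue size hovers near a single limit.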

class SinkNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a sink node which collects results in a queue

Emitted batches will only be ordered if there is a meaningful ordering and sequence_output is not set to false.

Subclassed by arrow::acero::OrderBySinkNodeOptions, arrow::acero::SelectKSinkNodeOptions

Public Functions

inline explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator, std::shared_ptr<Schema>* schema, BackpressureOptions backpressure = {}, BackpressureMonitor** backpressure_monitor = NULLPTR, std::optional<bool> sequence_output = std::nullopt)

inline explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator, BackpressureOptions backpressure = {}, BackpressureMonitor** backpressure_monitor = NULLPTR, std::optional<bool> sequence_output = std::nullopt)

Public Members

std::function<Future<std::optional<ExecBatch>>()>* generator

A pointer to a generator of batches.

This will be set when the node is added to the plan and should be used to consume data from the plan. If this function is not called frequently enough then the sink node will start to accumulate data and may apply backpressure.

std::shared_ptr<Schema>* schema

A pointer which will be set to the schema of the generated batches.

This is optional; if nullptr is passed in then it will be ignored. This will be set when the node is added to the plan, before StartProducing is called.

BackpressureOptions backpressure

Options to control when to apply backpressure.

This is optional; the default is to never apply backpressure. If the plan is not consumed quickly enough the system may eventually run out of memory.

BackpressureMonitor** backpressure_monitor

A pointer to a backpressure monitor.

This will be set when the node is added to the plan. This can be used to inspect the amount of data currently queued in the sink node. This is an optional utility and backpressure can be applied even if this is not used.

std::optional<bool> sequence_output

Controls whether batches should be emitted immediately or sequenced in order.

See also

QueryOptions for more details

classBackpressureControl#
#include <arrow/acero/options.h>

Control used by a SinkNodeConsumer to pause & resume.

Callers should ensure that they do not call Pause and Resume simultaneously and they should sequence things so that a call to Pause() is always followed by an eventual call to Resume().

Public Functions

virtual~BackpressureControl()=default#
virtualvoidPause()=0#

Ask the input to pause.

This is best effort; batches may continue to arrive. Must eventually be followed by a call to Resume() or deadlock will occur.

virtualvoidResume()=0#

Ask the input to resume.

classSinkNodeConsumer#
#include <arrow/acero/options.h>

a sink node that consumes the data as part of the plan using callbacks

Subclassed by arrow::acero::NullSinkNodeConsumer, arrow::acero::TableSinkNodeConsumer

Public Functions

virtual~SinkNodeConsumer()=default#
virtualStatusInit(conststd::shared_ptr<Schema>&schema,BackpressureControl*backpressure_control,ExecPlan*plan)=0#

Prepare any consumer state.

This will be run once the schema is finalized as the plan is starting and before any calls to Consume. A common use is to save off the schema so that batches can be interpreted.

virtualStatusConsume(ExecBatchbatch)=0#

Consume a batch of data.

virtualFutureFinish()=0#

Signal to the consumer that the last batch has been delivered.

The returned future should only finish when all outstanding tasks have completed

If the plan is ended early or aborts due to an error then this will not be called.

classConsumingSinkNodeOptions:publicarrow::acero::ExecNodeOptions#
#include <arrow/acero/options.h>

Add a sink node which consumes data within the exec plan run.

Public Functions

inlineexplicitConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer>consumer,std::vector<std::string>names={},std::optional<bool>sequence_output=std::nullopt)#

Public Members

std::shared_ptr<SinkNodeConsumer>consumer#
std::vector<std::string>names#

Names to rename the sink’s schema fields to.

If specified then names must be provided for all fields. Currently, only a flat schema is supported (see GH-31875).

If not specified then names will be generated based on the source data.

std::optional<bool>sequence_output#

Controls whether batches should be emitted immediately or sequenced in order.

See also

QueryOptions for more details

classOrderBySinkNodeOptions:publicarrow::acero::SinkNodeOptions#
#include <arrow/acero/options.h>

Make a node which sorts rows passed through it.

All batches pushed to this node will be accumulated and then sorted by the given fields. The sorted batches will then be forwarded to the generator in sorted order.

Public Functions

inlineexplicitOrderBySinkNodeOptions(SortOptionssort_options,std::function<Future<std::optional<ExecBatch>>()>*generator)#

create an instance from values

Public Members

SortOptionssort_options#

options describing which columns and direction to sort

classOrderByNodeOptions:publicarrow::acero::ExecNodeOptions#
#include <arrow/acero/options.h>

Apply a new ordering to data.

Currently this node works by accumulating all data, sorting, and then emitting the new data with an updated batch index.

Larger-than-memory sort is not currently supported.

Public Functions

inlineexplicitOrderByNodeOptions(Orderingordering)#

Public Members

Orderingordering#

The new ordering to apply to outgoing data.

Public Static Attributes

staticconstexprstd::string_viewkName="order_by"#
classHashJoinNodeOptions:publicarrow::acero::ExecNodeOptions#
#include <arrow/acero/options.h>

a node which implements a join operation using a hash table

Public Functions

inlineHashJoinNodeOptions(JoinTypein_join_type,std::vector<FieldRef>in_left_keys,std::vector<FieldRef>in_right_keys,Expressionfilter=literal(true),std::stringoutput_suffix_for_left=default_output_suffix_for_left,std::stringoutput_suffix_for_right=default_output_suffix_for_right,booldisable_bloom_filter=false)#

create an instance from values that outputs all columns

inlineHashJoinNodeOptions(std::vector<FieldRef>in_left_keys,std::vector<FieldRef>in_right_keys)#

create an instance from keys

This will create an inner join that outputs all columns and has no post join filter

in_left_keys should have the same length and types as in_right_keys

Parameters:
  • in_left_keys – the keys in the left input

  • in_right_keys – the keys in the right input

inlineHashJoinNodeOptions(JoinTypejoin_type,std::vector<FieldRef>left_keys,std::vector<FieldRef>right_keys,std::vector<FieldRef>left_output,std::vector<FieldRef>right_output,Expressionfilter=literal(true),std::stringoutput_suffix_for_left=default_output_suffix_for_left,std::stringoutput_suffix_for_right=default_output_suffix_for_right,booldisable_bloom_filter=false)#

create an instance from values using JoinKeyCmp::EQ for all comparisons

inlineHashJoinNodeOptions(JoinTypejoin_type,std::vector<FieldRef>left_keys,std::vector<FieldRef>right_keys,std::vector<FieldRef>left_output,std::vector<FieldRef>right_output,std::vector<JoinKeyCmp>key_cmp,Expressionfilter=literal(true),std::stringoutput_suffix_for_left=default_output_suffix_for_left,std::stringoutput_suffix_for_right=default_output_suffix_for_right,booldisable_bloom_filter=false)#

create an instance from values

HashJoinNodeOptions()=default#

Public Members

JoinTypejoin_type=JoinType::INNER#
std::vector<FieldRef>left_keys#
std::vector<FieldRef>right_keys#
booloutput_all=false#
std::vector<FieldRef>left_output#
std::vector<FieldRef>right_output#
std::vector<JoinKeyCmp>key_cmp#
std::stringoutput_suffix_for_left#
std::stringoutput_suffix_for_right#
Expressionfilter=literal(true)#
booldisable_bloom_filter=false#

Public Static Attributes

staticconstexprconstchar*default_output_suffix_for_left=""#
staticconstexprconstchar*default_output_suffix_for_right=""#
classAsofJoinNodeOptions:publicarrow::acero::ExecNodeOptions#
#include <arrow/acero/options.h>

a node which implements the asof join operation

Note, this API is experimental and will change in the future

This node takes one left table and any number of right tables, and asof joins them together. Batches produced by each input must be ordered by the “on” key. This node will output one row for each row in the left table.

Public Functions

inlineAsofJoinNodeOptions(std::vector<Keys>input_keys,int64_ttolerance)#

Public Members

std::vector<Keys>input_keys#

AsofJoin keys per input table.

At least two keys must be given. The first key corresponds to a left table and all other keys correspond to right tables for the as-of-join.

See also

Keys for details.

int64_ttolerance#

Tolerance for inexact “on” key matching.

A right row is considered a match with a left row if right.on - left.on is in the range [min(0, tolerance), max(0, tolerance)]. tolerance may be:

  • negative, in which case a past-as-of-join occurs (match iff tolerance <= right.on - left.on <= 0);

  • or positive, in which case a future-as-of-join occurs (match iff 0 <= right.on - left.on <= tolerance);

  • or zero, in which case an exact-as-of-join occurs (match iff right.on == left.on).

The tolerance is interpreted in the same units as the “on” key.
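
The matching rule above can be written as a small predicate. This is a minimal sketch using only the standard library; AsofMatches is a hypothetical helper name, not part of the Acero API, and it assumes the “on” keys have already been converted to int64_t values in the same units as the tolerance (overflow of the subtraction is not handled).

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical helper (not an Arrow function): returns true when a right row
// matches a left row under the documented tolerance rule.
bool AsofMatches(int64_t left_on, int64_t right_on, int64_t tolerance) {
  const int64_t diff = right_on - left_on;
  // The accepted range is [min(0, tolerance), max(0, tolerance)].
  const int64_t lo = std::min<int64_t>(0, tolerance);
  const int64_t hi = std::max<int64_t>(0, tolerance);
  return lo <= diff && diff <= hi;
}
```

With a tolerance of -10, a right row at 95 matches a left row at 100 (a past match), while a right row at 105 does not.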

structKeys#
#include <arrow/acero/options.h>

Keys for one input table of the AsofJoin operation.

The keys must be consistent across the input tables: Each “on” key must refer to a field of the same type and units across the tables. Each “by” key must refer to a list of fields of the same types across the tables.

Public Members

FieldRefon_key#

“on” key for the join.

The input table must be sorted by the “on” key. Must be a single field of a common type. An inexact match is used on the “on” key, i.e. a row is considered a match if and only if right.on - left.on is in the range [min(0, tolerance), max(0, tolerance)]. Currently, the “on” key must be of an integer, date, or timestamp type.

std::vector<FieldRef>by_key#

“by” key for the join.

Each input table must have each field of the “by” key. Exact equality is used for each field of the “by” key. Currently, each field of the “by” key must be of an integer, date, timestamp, or base-binary type.

classSelectKSinkNodeOptions:publicarrow::acero::SinkNodeOptions#
#include <arrow/acero/options.h>

a node which selects top_k/bottom_k rows passed through it

All batches pushed to this node will be accumulated and then selected by the given fields. The selected batches will then be forwarded to the generator in sorted order.

Public Functions

inlineexplicitSelectKSinkNodeOptions(SelectKOptionsselect_k_options,std::function<Future<std::optional<ExecBatch>>()>*generator)#

Public Members

SelectKOptionsselect_k_options#

SelectK options.

classTableSinkNodeOptions:publicarrow::acero::ExecNodeOptions#
#include <arrow/acero/options.h>

a sink node which accumulates all output into a table

Public Functions

inlineexplicitTableSinkNodeOptions(std::shared_ptr<Table>*output_table,std::optional<bool>sequence_output=std::nullopt)#

create an instance from values

Public Members

std::shared_ptr<Table>*output_table#

an “out parameter” specifying the table that will be created

Must not be null and must remain valid for the entirety of the plan execution. After the plan has completed this will be set to point to the result table.

std::optional<bool>sequence_output#

Controls whether batches should be emitted immediately or sequenced in order.

See also

QueryOptions for more details

std::vector<std::string>names#

Custom names to use for the columns.

If specified then names must be provided for all fields. Currently, only a flat schema is supported (see GH-31875).

If not specified then names will be generated based on the source data.

structPivotLongerRowTemplate#
#include <arrow/acero/options.h>

a row template that describes one row that will be generated for each input row

Public Functions

inlinePivotLongerRowTemplate(std::vector<std::string>feature_values,std::vector<std::optional<FieldRef>>measurement_values)#

Public Members

std::vector<std::string>feature_values#

A (typically unique) set of feature values for the template, usually derived from a column name.

These will be used to populate the feature columns

std::vector<std::optional<FieldRef>>measurement_values#

The fields containing the measurements to use for this row.

These will be used to populate the measurement columns. If nullopt then nulls will be inserted for the given value.

classPivotLongerNodeOptions:publicarrow::acero::ExecNodeOptions#
#include <arrow/acero/options.h>

Reshape a table by turning some columns into additional rows.

This operation is sometimes also referred to as UNPIVOT

This is typically done when there are multiple observations in each row in order to transform to a table containing a single observation per row.

For example:

| time | left_temp | right_temp |
| ---- | --------- | ---------- |
| 1    | 10        | 20         |
| 2    | 15        | 18         |

The above table contains two observations per row. There is an implicit feature “location” (left vs right) and a measurement “temp”. What we really want is:

| time | location | temp |
| ---- | -------- | ---- |
| 1    | left     | 10   |
| 1    | right    | 20   |
| 2    | left     | 15   |
| 2    | right    | 18   |

For a more complex example consider:

| time | ax1 | ay1 | bx1 | ay2 |
| ---- | --- | --- | --- | --- |
| 0    | 1   | 2   | 3   | 4   |

We can pretend a vs b and x vs y are features while 1 and 2 are two different kinds of measurements. We thus want to pivot to

| time | a/b | x/y | f1 | f2   |
| ---- | --- | --- | -- | ---- |
| 0    | a   | x   | 1  | null |
| 0    | a   | y   | 2  | 4    |
| 0    | b   | x   | 3  | null |

To do this we create a row template for each combination of features. One should be able to do this purely by looking at the column names. For example, given the above columns “ax1”, “ay1”, “bx1”, and “ay2” we know we have three feature combinations (a, x), (a, y), and (b, x). Similarly, we know we have two possible measurements, “1” and “2”.

For each combination of features we create a row template. In each row template we describe the combination and then list which columns to use for the measurements. If a measurement doesn’t exist for a given combination then we use nullopt.

So, for our above example, we have:

(a, x): names={“a”, “x”}, values={“ax1”, nullopt}
(a, y): names={“a”, “y”}, values={“ay1”, “ay2”}
(b, x): names={“b”, “x”}, values={“bx1”, nullopt}

Finishing it off we name our new columns:
feature_field_names={“a/b”, “x/y”}
measurement_field_names={“f1”, “f2”}
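
To make the template mechanics concrete, the following sketch models the more complex example with standard-library types only. RowTemplate, LongRow, PivotLongerRow and ExampleTemplates are hypothetical names introduced for illustration; they mirror PivotLongerRowTemplate loosely (plain column names stand in for FieldRef) and do not use the Acero API. Pass-through columns such as time are left out for brevity.

```cpp
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for PivotLongerRowTemplate; column names replace FieldRef.
struct RowTemplate {
  std::vector<std::string> feature_values;
  std::vector<std::optional<std::string>> measurement_values;
};

// One generated output row: feature columns followed by measurement columns.
struct LongRow {
  std::vector<std::string> features;
  std::vector<std::optional<int64_t>> measurements;
};

// Emits one output row per template for a single input row.
std::vector<LongRow> PivotLongerRow(const std::map<std::string, int64_t>& input,
                                    const std::vector<RowTemplate>& templates) {
  std::vector<LongRow> out;
  for (const RowTemplate& tmpl : templates) {
    LongRow row{tmpl.feature_values, {}};
    for (const auto& column : tmpl.measurement_values) {
      if (column.has_value()) {
        row.measurements.push_back(input.at(*column));
      } else {
        row.measurements.push_back(std::nullopt);  // no such column: emit null
      }
    }
    out.push_back(std::move(row));
  }
  return out;
}

// The three templates from the example: (a, x), (a, y) and (b, x).
std::vector<RowTemplate> ExampleTemplates() {
  return {
      {{"a", "x"}, {std::string("ax1"), std::nullopt}},
      {{"a", "y"}, {std::string("ay1"), std::string("ay2")}},
      {{"b", "x"}, {std::string("bx1"), std::nullopt}},
  };
}
```

Applied to the single input row (ax1=1, ay1=2, bx1=3, ay2=4), the sketch yields three rows, one per template, which matches the output row count of input rows multiplied by the number of row templates.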

Public Members

std::vector<PivotLongerRowTemplate>row_templates#

One or more row templates to create new output rows.

Normally there are at least two row templates. The number of output rows will be the number of input rows multiplied by the number of row templates.

std::vector<std::string>feature_field_names#

The names of the columns which describe the new features.

std::vector<std::string>measurement_field_names#

The names of the columns which represent the measurements.

Public Static Attributes

staticconstexprstd::string_viewkName="pivot_longer"#

Internals for creating custom nodes#

ExecFactoryRegistry*default_exec_factory_registry()#

The default registry, which includes built-in factories.

inlineResult<ExecNode*>MakeExecNode(conststd::string&factory_name,ExecPlan*plan,std::vector<ExecNode*>inputs,constExecNodeOptions&options,ExecFactoryRegistry*registry=default_exec_factory_registry())#

Construct an ExecNode using the named factory.

inlinebooloperator==(constExecBatch&l,constExecBatch&r)#
inlinebooloperator!=(constExecBatch&l,constExecBatch&r)#
voidPrintTo(constExecBatch&,std::ostream*)#
classExecPlan:publicstd::enable_shared_from_this<ExecPlan>#
#include <arrow/acero/exec_plan.h>

Public Types

usingNodeVector=std::vector<ExecNode*>#

Public Functions

virtual~ExecPlan()=default#
QueryContext*query_context()#
constNodeVector&nodes()const#

retrieve the nodes in the plan

ExecNode*AddNode(std::unique_ptr<ExecNode>node)#
template<typenameNode,typename...Args>
inlineNode*EmplaceNode(Args&&...args)#
StatusValidate()#
voidStartProducing()#

Start producing on all nodes.

Nodes are started in reverse topological order, such that any node is started before all of its inputs.

voidStopProducing()#

Stop producing on all nodes.

Triggers all sources to stop producing new data. In order to stop cleanly, the plan will continue to run any tasks that are already in progress. The caller should still wait for finished to complete before destroying the plan.

Futurefinished()#

A future which will be marked finished when all tasks have finished.

boolHasMetadata()const#

Return whether the plan has non-empty metadata.

std::shared_ptr<constKeyValueMetadata>metadata()const#

Return the plan’s attached metadata.

std::stringToString()const#

Public Static Functions

staticResult<std::shared_ptr<ExecPlan>>Make(QueryOptionsoptions,ExecContextexec_context=*threaded_exec_context(),std::shared_ptr<constKeyValueMetadata>metadata=NULLPTR)#

Make an empty exec plan.

staticResult<std::shared_ptr<ExecPlan>>Make(ExecContextexec_context=*threaded_exec_context(),std::shared_ptr<constKeyValueMetadata>metadata=NULLPTR)#
staticResult<std::shared_ptr<ExecPlan>>Make(QueryOptionsoptions,ExecContext*exec_context,std::shared_ptr<constKeyValueMetadata>metadata=NULLPTR)#
staticResult<std::shared_ptr<ExecPlan>>Make(ExecContext*exec_context,std::shared_ptr<constKeyValueMetadata>metadata=NULLPTR)#

Public Static Attributes

staticconstuint32_tkMaxBatchSize=1<<15#
classExecNode#
#include <arrow/acero/exec_plan.h>

Subclassed by arrow::acero::MapNode

Public Types

usingNodeVector=std::vector<ExecNode*>#

Public Functions

virtual~ExecNode()=default#
virtualconstchar*kind_name()const=0#
inlineintnum_inputs()const#
inlineconstNodeVector&inputs()const#

This node’s predecessors in the exec plan.

inlineboolis_sink()const#

True if the node has no output schema (is a sink)

inlineconststd::vector<std::string>&input_labels()const#

Labels identifying the function of each input.

inlineconstExecNode*output()const#

This node’s successor in the exec plan.

inlineconststd::shared_ptr<Schema>&output_schema()const#

The datatypes for batches produced by this node.

inlineExecPlan*plan()#

This node’s exec plan.

inlineconststd::string&label()const#

An optional label, for display and debugging.

There is no guarantee that this value is non-empty or unique.

inlinevoidSetLabel(std::stringlabel)#
virtualStatusValidate()const#
virtualconstOrdering&ordering()const#

the ordering of the output batches

This does not guarantee the batches will be emitted by this node in order. Instead it guarantees that the batches will have their ExecBatch::index property set in a way that respects this ordering.

In other words, given the ordering {{“x”, SortOrder::Ascending}} we know that all values of x in a batch with index N will be less than or equal to all values of x in a batch with index N+k (assuming k > 0). Furthermore, we also know that values will be sorted within a batch. Any row N will have a value of x that is less than or equal to the value for any row N+k.

Note that an ordering can also be one of the special values Ordering::Unordered or Ordering::Implicit. A node’s output should be marked Ordering::Unordered if the order is non-deterministic. For example, a hash-join has no predictable output order.

If the ordering is Ordering::Implicit then there is a meaningful order but that ordering is not represented by any column in the data. The most common case for this is when reading data from an in-memory table. The data has an implicit “row order” which is not necessarily represented in the data set.

A filter or project node will not modify the ordering. Nothing needs to be done other than ensure the index assigned to output batches is the same as the input batch that was mapped.

Other nodes may introduce order. For example, an order-by node will emit a brand new ordering independent of the input ordering.

Finally, as described above, nodes such as a hash-join or aggregation may destroy ordering (although these nodes could also choose to establish a new ordering based on the hash keys).

Some nodes will require an ordering. For example, a fetch node or an asof join node will only function if the input data is ordered (for fetch it is enough to be implicitly ordered; for an asof join the ordering must be explicit and compatible with the on key).

Nodes that maintain ordering should be careful to avoid introducing gaps in the batch index. This may require emitting empty batches in order to maintain continuity.

virtualStatusInputReceived(ExecNode*input,ExecBatchbatch)=0#

Upstream API: These functions are called by input nodes that want to inform this node about an updated condition (a new input batch or an impending end of stream).

Implementation rules:

A node will typically perform some kind of operation on the batch and then call InputReceived on its outputs with the result.

Other nodes may need to accumulate some number of inputs before any output can be produced. These nodes will add the batch to some kind of in-memory accumulation queue and return.

virtualStatusInputFinished(ExecNode*input,inttotal_batches)=0#

Mark the inputs finished after the given number of batches.

This may be called before all inputs are received. This simply fixes the total number of incoming batches for an input, so that the ExecNode knows when it has received all input, regardless of order.
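
One way to picture this bookkeeping is a counter that tracks both the batches seen so far and the announced total, and reports completion exactly once, whichever arrives last. The class below is a hypothetical, single-threaded sketch (InputCounter and its method names are illustrative, not Arrow API); a real node would need thread-safe state.

```cpp
#include <optional>

// Hypothetical bookkeeping helper, not an Arrow class.
class InputCounter {
 public:
  // Call once per arriving batch. Returns true if this call completed the input.
  bool OnInputReceived() {
    ++received_;
    return CheckDone();
  }

  // Call once with the total. It may arrive before, between, or after batches.
  bool OnInputFinished(int total_batches) {
    total_ = total_batches;
    return CheckDone();
  }

 private:
  // Completion is reported a single time, when the count reaches the total.
  bool CheckDone() {
    if (done_ || !total_.has_value() || received_ != *total_) return false;
    done_ = true;
    return true;
  }

  int received_ = 0;
  std::optional<int> total_;
  bool done_ = false;
};
```

If the total of 3 is announced after the first batch, the third call to OnInputReceived is the one that returns true.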

virtualStatusInit()#

Perform any needed initialization.

This hook performs any actions in between creation of ExecPlan and the call to StartProducing. An example could be Bloom filter pushdown. The order in which ExecNodes execute this method is undefined, but the calls are made synchronously.

At this point a node can rely on all inputs & outputs (and the input schemas) being well defined.

virtualStatusStartProducing()=0#

Lifecycle API:

  • start / stop to initiate and terminate production

  • pause / resume to apply backpressure

Implementation rules:

StopProducing may be called due to an error, by the user (e.g. cancel), or because a node has all the data it needs (e.g. limit, top-k on sorted data). This means the method may be called multiple times and we have the following additional rules:

  • StopProducing() must be idempotent

  • StopProducing() must be forwarded to inputs (this is needed for the limit/top-k case because we may not be stopping the entire plan)

Start producing

This must only be called once.

This is typically called automatically by ExecPlan::StartProducing().

virtualvoidPauseProducing(ExecNode*output,int32_tcounter)=0#

Pause producing temporarily.

This call is a hint that an output node is currently not willing to receive data.

This may be called any number of times. However, the node is still free to produce data (which may be difficult to prevent anyway if data is produced using multiple threads).

Parameters:
  • output – Pointer to the output that is full

  • counter – Counter used to sequence calls to pause/resume

virtualvoidResumeProducing(ExecNode*output,int32_tcounter)=0#

Resume producing after a temporary pause.

This call is a hint that an output node is willing to receive data again.

This may be called any number of times.

Parameters:
  • output – Pointer to the output that is now free

  • counter – Counter used to sequence calls to pause/resume

virtualStatusStopProducing()#

Stop producing new data.

If this node is a source then the source should stop generating data as quickly as possible. If this node is not a source then there is typically nothing that needs to be done although a node may choose to start ignoring incoming data.

This method will be called when an error occurs in the plan. This method may also be called by the user if they wish to end a plan early. Finally, this method may be called if a node determines it no longer needs any more input (for example, a limit node).

This method may be called multiple times.

This is not a pause. There will be no way to start the source again after this has been called.
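
The two implementation rules for StopProducing (it must be idempotent, and it must be forwarded to inputs) can be illustrated with an atomic flag. This is a sketch under assumptions: SketchNode is a hypothetical stand-in and does not implement the ExecNode interface, and stop_actions_ exists only to make the effect observable.

```cpp
#include <atomic>
#include <utility>
#include <vector>

// Hypothetical stand-in for a node; not the Arrow ExecNode interface.
class SketchNode {
 public:
  explicit SketchNode(std::vector<SketchNode*> inputs = {})
      : inputs_(std::move(inputs)) {}

  void StopProducing() {
    // exchange() returns the previous value, so only the first caller proceeds.
    if (stopped_.exchange(true)) return;
    ++stop_actions_;  // node-specific shutdown work would go here
    // Forward to inputs so that upstream sources also stop.
    for (SketchNode* input : inputs_) input->StopProducing();
  }

  bool stopped() const { return stopped_.load(); }
  int stop_actions() const { return stop_actions_; }

 private:
  std::vector<SketchNode*> inputs_;
  std::atomic<bool> stopped_{false};
  int stop_actions_ = 0;
};
```

Calling StopProducing twice on a downstream node stops its input as well, and each node performs its shutdown work only once.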

std::stringToString(intindent=0)const#
classExecFactoryRegistry#
#include <arrow/acero/exec_plan.h>

An extensible registry for factories of ExecNodes.

Public Types

usingFactory=std::function<Result<ExecNode*>(ExecPlan*,std::vector<ExecNode*>,constExecNodeOptions&)>#

Public Functions

virtual~ExecFactoryRegistry()=default#
virtualResult<Factory>GetFactory(conststd::string&factory_name)=0#

Get the named factory from this registry.

Returns an error if factory_name is not found.

virtualStatusAddFactory(std::stringfactory_name,Factoryfactory)=0#

Add a factory to this registry with the provided name.

Returns an error if factory_name is already in the registry.
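
The lookup and registration semantics can be modeled with a map keyed by factory name. The sketch below is an assumption-laden illustration: SketchRegistry and SketchFactory are hypothetical names, the factory signature is simplified to return a string, and bool / std::optional stand in for the Status / Result error reporting of the real interface.

```cpp
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Simplified factory type for illustration only.
using SketchFactory = std::function<std::string()>;

// Hypothetical model of a name-to-factory registry; not the Arrow class.
class SketchRegistry {
 public:
  // Returns false (an error in the real API) if the name is already taken.
  bool AddFactory(std::string name, SketchFactory factory) {
    return factories_.emplace(std::move(name), std::move(factory)).second;
  }

  // Returns std::nullopt (an error in the real API) if the name is unknown.
  std::optional<SketchFactory> GetFactory(const std::string& name) const {
    auto it = factories_.find(name);
    if (it == factories_.end()) return std::nullopt;
    return it->second;
  }

 private:
  std::map<std::string, SketchFactory> factories_;
};
```

Registering the same name twice fails on the second attempt and leaves the first factory in place.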

structExecBatch#
#include <arrow/compute/exec.h>

Public Functions

ExecBatch()=default#
inlineExecBatch(std::vector<Datum>values,int64_tlength)#
explicitExecBatch(constRecordBatch&batch)#
Result<std::shared_ptr<RecordBatch>>ToRecordBatch(std::shared_ptr<Schema>schema,MemoryPool*pool=default_memory_pool())const#
int64_tTotalBufferSize()const#

The sum of bytes in each buffer referenced by the batch.

Note: Scalars are not counted.

Note: Some values may reference only part of a buffer, for example, an array with an offset. The actual data visible to this batch will be smaller than the total buffer size in this case.

template<typenameindex_type>
inlineconstDatum&operator[](index_typei)const#

Return the value at the i-th index.

boolEquals(constExecBatch&other)const#
inlineintnum_values()const#

A convenience for the number of values / arguments.

ExecBatchSlice(int64_toffset,int64_tlength)const#
Result<ExecBatch>SelectValues(conststd::vector<int>&ids)const#
inlinestd::vector<TypeHolder>GetTypes()const#

A convenience for returning the types from the batch.

std::stringToString()const#

Public Members

std::vector<Datum>values#

The values representing positional arguments to be passed to a kernel’s exec function for processing.

std::shared_ptr<SelectionVector>selection_vector#

A deferred filter represented as an array of indices into the values.

For example, the filter [true, true, false, true] would be represented as the selection vector [0, 1, 3]. When the selection vector is set, ExecBatch::length is equal to the length of this array.
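
The relationship between a boolean filter and its selection vector can be sketched as follows. FilterToSelection is a hypothetical helper written against the standard library only; it does not construct an Arrow SelectionVector, and the int32_t index type is an assumption made for the illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: converts a boolean filter into the indices it keeps.
std::vector<int32_t> FilterToSelection(const std::vector<bool>& filter) {
  std::vector<int32_t> selection;
  for (std::size_t i = 0; i < filter.size(); ++i) {
    if (filter[i]) selection.push_back(static_cast<int32_t>(i));
  }
  return selection;
}
```

For the filter [true, true, false, true] this produces the indices 0, 1 and 3, so the batch length under that selection would be 3.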

Expressionguarantee=literal(true)#

A predicate Expression guaranteed to evaluate to true for all rows in this batch.

int64_tlength=0#

The semantic length of the ExecBatch.

When the values are all scalars, the length should be set to 1 for non-aggregate kernels; otherwise the length is taken from the array values, except when there is a selection vector. When there is a selection vector set, the length of the batch is the length of the selection. Aggregate kernels can have an ExecBatch formed by projecting just the partition columns from a batch, in which case it would have scalar rows with length greater than 1.

If the array values are of length 0 then the length is 0 regardless of whether any values are Scalar.

int64_tindex=kUnsequencedIndex#

index of this batch in a sorted stream of batches

This index must be strictly monotonic starting at 0 without gaps or it can be set to kUnsequencedIndex if there is no meaningful order
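
Because indices start at 0 and have no gaps, a consumer can restore the order of batches that arrive out of sequence by holding back any batch whose predecessors have not been seen yet. The class below is a minimal single-threaded sketch of that idea; SketchSequencer is a hypothetical name and a string payload stands in for an ExecBatch.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reordering buffer keyed by batch index; not an Arrow class.
class SketchSequencer {
 public:
  // Accepts a payload tagged with its batch index and returns every payload
  // that can now be released in index order.
  std::vector<std::string> Insert(int64_t index, std::string payload) {
    pending_.emplace(index, std::move(payload));
    std::vector<std::string> ready;
    auto it = pending_.begin();
    while (it != pending_.end() && it->first == next_index_) {
      ready.push_back(std::move(it->second));
      it = pending_.erase(it);
      ++next_index_;
    }
    return ready;
  }

 private:
  std::map<int64_t, std::string> pending_;
  int64_t next_index_ = 0;  // indices start at 0
};
```

The sketch also shows why gaps matter: if some index were never delivered, every later batch would stay buffered indefinitely.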

Public Static Functions

staticResult<int64_t>InferLength(conststd::vector<Datum>&values)#

Infer the ExecBatch length from values.

staticResult<ExecBatch>Make(std::vector<Datum>values,int64_tlength=-1)#

Creates an ExecBatch with length-validation.

If any value is given, then all values must have a common length. If the given length is negative, then the length of the ExecBatch is set to this common length, or to 1 if no values are given. Otherwise, the given length must equal the common length, if any value is given.
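
The validation rule can be restated as a small function over the lengths of the supplied values. This sketch follows only the description above: ResolveBatchLength is a hypothetical name, each value is reduced to its length, scalar values are not modeled, and std::nullopt stands in for the error that Result would carry.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical model of the length rule; returns std::nullopt on a mismatch.
std::optional<int64_t> ResolveBatchLength(const std::vector<int64_t>& value_lengths,
                                          int64_t length = -1) {
  if (value_lengths.empty()) {
    // No values: a negative length defaults to 1, otherwise it is kept.
    return length < 0 ? 1 : length;
  }
  const int64_t common = value_lengths.front();
  for (int64_t value_length : value_lengths) {
    if (value_length != common) return std::nullopt;  // no common length
  }
  if (length < 0) return common;                      // infer from the values
  if (length != common) return std::nullopt;          // explicit length disagrees
  return length;
}
```

Two values of length 4 resolve to 4 when no length is given, while passing an explicit length of 5 for the same values is rejected.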
