Arrow IPC
IPC options
- struct IpcReadOptions
Options for reading Arrow IPC messages.
Public Members
- int max_recursion_depth = kMaxNestingDepth
The maximum permitted schema nesting depth.
- MemoryPool* memory_pool = default_memory_pool()
The memory pool to use for allocations made during IPC reading.
While Arrow IPC is predominantly zero-copy, it may have to allocate memory in some cases (for example if compression is enabled).
- std::vector<int> included_fields
Top-level schema fields to include when deserializing a RecordBatch.
If empty (the default), return all deserialized fields. If non-empty, the values are the indices of fields in the top-level schema.
- bool use_threads = true
Use the global CPU thread pool to parallelize any computational tasks like decompression.
- bool ensure_native_endian = true
Whether to convert incoming data to platform-native endianness.
If the endianness of the received schema is not equal to platform-native endianness, then all buffers with endian-sensitive data will be byte-swapped. This includes the value buffers of numeric types, temporal types, decimal types, as well as the offset buffers of variable-sized binary and list-like types.
Endianness conversion is performed by the RecordBatchFileReader, RecordBatchStreamReader and StreamDecoder classes.
- Alignment ensure_alignment = Alignment::kAnyAlignment
How to align data if mis-aligned.
Data is copied to aligned memory locations allocated via the MemoryPool configured as arrow::ipc::IpcReadOptions::memory_pool. Some use cases might require data to have a specific alignment, for example, for the data buffer of an Int32 array to be aligned on a 4-byte boundary.
Default (kAnyAlignment) keeps the alignment as is, so no copy of data occurs.
- io::CacheOptions pre_buffer_cache_options = io::CacheOptions::LazyDefaults()
Options to control caching behavior when pre-buffering is requested.
The lazy property will always be reset to true to deliver the expected behavior.
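The byte-swapping that ensure_native_endian describes can be pictured with a small standalone sketch. This is not Arrow's implementation: SwapInt32Buffer is a hypothetical helper, and it only covers a buffer of 32-bit values (such as the value buffer of an Int32 array or a 32-bit offset buffer).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>

// Hypothetical helper (not part of Arrow): reverse the byte order of every
// 32-bit element in a buffer, in place.
inline void SwapInt32Buffer(std::uint8_t* data, std::size_t size_in_bytes) {
  // A 32-bit value buffer always holds a whole number of 4-byte elements.
  assert(size_in_bytes % 4 == 0);
  for (std::size_t i = 0; i < size_in_bytes; i += 4) {
    std::swap(data[i], data[i + 3]);      // outer bytes
    std::swap(data[i + 1], data[i + 2]);  // inner bytes
  }
}
```

Buffers that are not endian-sensitive (for example validity bitmaps or raw binary data) would be left untouched by such a pass.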
- struct IpcWriteOptions
Options for writing Arrow IPC messages.
Public Members
- bool allow_64bit = false
If true, allow field lengths that don’t fit in a signed 32-bit int.
Some implementations may not be able to parse streams created with this option.
- int max_recursion_depth = kMaxNestingDepth
The maximum permitted schema nesting depth.
- int32_t alignment = 8
Write padding after memory buffers up to this multiple of bytes.
- bool write_legacy_ipc_format = false
Write the pre-0.15.0 IPC message format.
The legacy format uses a 4-byte message prefix instead of an 8-byte one.
- MemoryPool* memory_pool = default_memory_pool()
The memory pool to use for allocations made during IPC writing.
While Arrow IPC is predominantly zero-copy, it may have to allocate memory in some cases (for example if compression is enabled).
- std::shared_ptr<util::Codec> codec
Compression codec to use for record batch body buffers.
Must be one of UNCOMPRESSED, LZ4_FRAME or ZSTD.
- std::optional<double> min_space_savings
Minimum space savings, expressed as a fraction in [0, 1], required for compression to be applied.
Space savings is calculated as (1.0 - compressed_size / uncompressed_size).
For example, if min_space_savings = 0.1, a 100-byte body buffer won’t undergo compression if its expected compressed size exceeds 90 bytes. If this option is unset, compression will be used indiscriminately. If no codec was supplied, this option is ignored.
Values outside of the range [0,1] are handled as errors.
Note that enabling this option may result in unreadable data for Arrow C++ versions prior to 12.0.0.
- bool use_threads = true
Use the global CPU thread pool to parallelize any computational tasks like compression.
- bool emit_dictionary_deltas = false
Whether to emit dictionary deltas.
If false, a changed dictionary for a given field will emit a full dictionary replacement. If true, a changed dictionary will be compared against the previous version. If possible, a dictionary delta will be emitted, otherwise a full dictionary replacement.
Default is false to maximize stream compatibility.
Also, note that if a changed dictionary is a nested dictionary, then a delta is never emitted, for compatibility with the read path.
- bool unify_dictionaries = false
Whether to unify dictionaries for the IPC file format.
The IPC file format doesn’t support dictionary replacements. Therefore, chunks of a column with a dictionary type must have the same dictionary in each record batch (or an extended dictionary + delta).
If this option is true, RecordBatchWriter::WriteTable will attempt to unify dictionaries across each table column. If this option is false, incompatible dictionaries across a table column will simply raise an error.
Note that enabling this option has a runtime cost. Also, not all types currently support dictionary unification.
This option is ignored for IPC streams, which support dictionary replacement and deltas.
- MetadataVersion metadata_version = MetadataVersion::V5
Format version to use for IPC messages and their metadata.
The default is V5 (readable by 1.0.0 and later). V4 is also available (readable by 0.8.0 and later).
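The min_space_savings rule described for IpcWriteOptions comes down to a size comparison. The sketch below is only an illustration of that arithmetic, not Arrow's code: ShouldCompress is a hypothetical function name, and rounding the size limit down with std::floor is an assumption made here.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <optional>

// Hypothetical helper (not part of Arrow): decide whether a body buffer is
// written compressed, given its raw and compressed sizes in bytes.
inline bool ShouldCompress(std::int64_t uncompressed_size,
                           std::int64_t compressed_size,
                           std::optional<double> min_space_savings) {
  assert(uncompressed_size > 0 && compressed_size >= 0);
  // Option unset: compression is applied indiscriminately.
  if (!min_space_savings.has_value()) return true;
  // Values outside [0, 1] are errors; this sketch simply asserts on them.
  assert(*min_space_savings >= 0.0 && *min_space_savings <= 1.0);
  // savings = 1 - compressed / uncompressed >= min_space_savings
  // is rearranged to: compressed <= (1 - min_space_savings) * uncompressed.
  const auto max_compressed_size = static_cast<std::int64_t>(std::floor(
      (1.0 - *min_space_savings) * static_cast<double>(uncompressed_size)));
  return compressed_size <= max_compressed_size;
}
```

With min_space_savings = 0.1 and a 100-byte buffer, a 91-byte compressed result is rejected while an 80-byte one is accepted, matching the example in the option's description.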
Reading IPC streams and files
Blocking API
Use either of these two classes, depending on which IPC format you want to read. The file format requires a random-access file, while the stream format only requires a sequential input stream.
- class RecordBatchStreamReader : public arrow::RecordBatchReader
Synchronous batch stream reader that reads from io::InputStream.
This class reads the schema (plus any dictionaries) as the first messages in the stream, followed by record batches. For more granular zero-copy reads see the ReadRecordBatch functions.
Public Static Functions
- static Result<std::shared_ptr<RecordBatchStreamReader>> Open(std::unique_ptr<MessageReader> message_reader, const IpcReadOptions& options = IpcReadOptions::Defaults())
Create batch reader from generic MessageReader.
This will take ownership of the given MessageReader.
- Parameters:
message_reader –[in] a MessageReader implementation
options –[in] any IPC reading options (optional)
- Returns:
the created batch reader
- static Result<std::shared_ptr<RecordBatchStreamReader>> Open(io::InputStream* stream, const IpcReadOptions& options = IpcReadOptions::Defaults())
Create a record batch stream reader from an InputStream.
- Parameters:
stream –[in] an input stream instance. It must stay alive throughout the lifetime of the stream reader
options –[in] any IPC reading options (optional)
- Returns:
the created batch reader
- static Result<std::shared_ptr<RecordBatchStreamReader>> Open(const std::shared_ptr<io::InputStream>& stream, const IpcReadOptions& options = IpcReadOptions::Defaults())
Open stream and retain ownership of stream object.
- Parameters:
stream –[in] the input stream
options –[in] any IPC reading options (optional)
- Returns:
the created batch reader
- class RecordBatchFileReader : public std::enable_shared_from_this<RecordBatchFileReader>
Reads the record batch file format.
Public Functions
- virtual int num_record_batches() const = 0
Returns the number of record batches in the file.
- virtual MetadataVersion version() const = 0
Return the metadata version from the file metadata.
- virtual std::shared_ptr<const KeyValueMetadata> metadata() const = 0
Return the contents of the custom_metadata field from the file’s Footer.
- virtual Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) = 0
Read a particular record batch from the file.
Does not copy memory if the input source supports zero-copy.
- Parameters:
i –[in] the index of the record batch to return
- Returns:
the read batch
- virtual Result<RecordBatchWithMetadata> ReadRecordBatchWithCustomMetadata(int i) = 0
Read a particular record batch along with its custom metadata from the file.
Does not copy memory if the input source supports zero-copy.
- Parameters:
i –[in] the index of the record batch to return
- Returns:
a struct containing the read batch and its custom metadata
- virtual Status PreBufferMetadata(const std::vector<int>& indices) = 0
Begin loading metadata for the desired batches into memory.
This method will also begin loading all dictionary messages into memory.
For a regular file this will immediately begin disk I/O in the background on a thread on the IOContext’s thread pool. If the file is memory mapped this will ensure the memory needed for the metadata is paged from disk into memory.
- Parameters:
indices – Indices of the batches to prefetch. If empty, all batches will be prefetched.
- virtual Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(const bool coalesce = false, const io::IOContext& io_context = io::default_io_context(), const io::CacheOptions cache_options = io::CacheOptions::LazyDefaults(), arrow::internal::Executor* executor = NULLPTR) = 0
Get a reentrant generator of record batches.
- Parameters:
coalesce –[in] If true, enable I/O coalescing.
io_context –[in] The IOContext to use (controls which thread pool is used for I/O).
cache_options –[in] Options for coalescing (if enabled).
executor –[in] Optionally, an executor to use for decoding record batches. This is generally only a benefit for very wide and/or compressed batches.
- Result<std::shared_ptr<Table>> ToTable()
Collect all batches and concatenate as arrow::Table.
Public Static Functions
- static Result<std::shared_ptr<RecordBatchFileReader>> Open(io::RandomAccessFile* file, const IpcReadOptions& options = IpcReadOptions::Defaults())
Open a RecordBatchFileReader.
Open a file-like object that is assumed to be self-contained; i.e., the end of the file interface is the end of the Arrow file. Note that there can be any amount of data preceding the Arrow-formatted data, because we need only locate the end of the Arrow file stream to discover the metadata and then proceed to read the data into memory.
- static Result<std::shared_ptr<RecordBatchFileReader>> Open(io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& options = IpcReadOptions::Defaults())
Open a RecordBatchFileReader.
If the file is embedded within some larger file or memory region, you can pass the absolute memory offset to the end of the file (which contains the metadata footer).
The metadata must have been written with memory offsets relative to the start of the containing file.
- Parameters:
file –[in] the data source
footer_offset –[in] the position of the end of the Arrow file
options –[in] options for IPC reading
- Returns:
the returned reader
- static Result<std::shared_ptr<RecordBatchFileReader>> Open(const std::shared_ptr<io::RandomAccessFile>& file, const IpcReadOptions& options = IpcReadOptions::Defaults())
Version of Open that retains ownership of file.
- Parameters:
file –[in] the data source
options –[in] options for IPC reading
- Returns:
the returned reader
- static Result<std::shared_ptr<RecordBatchFileReader>> Open(const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset, const IpcReadOptions& options = IpcReadOptions::Defaults())
Version of Open that retains ownership of file.
- Parameters:
file –[in] the data source
footer_offset –[in] the position of the end of the Arrow file
options –[in] options for IPC reading
- Returns:
the returned reader
- static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(const std::shared_ptr<io::RandomAccessFile>& file, const IpcReadOptions& options = IpcReadOptions::Defaults())
Open a file asynchronously (owns the file).
- static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(io::RandomAccessFile* file, const IpcReadOptions& options = IpcReadOptions::Defaults())
Open a file asynchronously (borrows the file).
- static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset, const IpcReadOptions& options = IpcReadOptions::Defaults())
Open a file asynchronously (owns the file).
- static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& options = IpcReadOptions::Defaults())
Open a file asynchronously (borrows the file).
Event-driven API
To read an IPC stream in event-driven fashion, you must implement a Listener subclass that you will pass to StreamDecoder.
- class Listener
A general listener class to receive events.
You must implement callback methods for the events you are interested in.
This API is EXPERIMENTAL.
- Since
0.17.0
Subclassed by arrow::ipc::CollectListener
Public Functions
- virtual Status OnEOS()
Called when end-of-stream is received.
The default implementation just returns arrow::Status::OK().
- Returns:
Status
- virtual Status OnRecordBatchDecoded(std::shared_ptr<RecordBatch> record_batch)
Called when a record batch is decoded and OnRecordBatchWithMetadataDecoded() isn’t overridden.
The default implementation just returns arrow::Status::NotImplemented().
- Parameters:
record_batch –[in] the decoded record batch
- Returns:
Status
- virtual Status OnRecordBatchWithMetadataDecoded(RecordBatchWithMetadata record_batch_with_metadata)
Called when a record batch with custom metadata is decoded.
The default implementation just calls OnRecordBatchDecoded() without custom metadata.
- Since
13.0.0
- Parameters:
record_batch_with_metadata –[in] the decoded record batch with its custom metadata
- Returns:
Status
- virtual Status OnSchemaDecoded(std::shared_ptr<Schema> schema)
Called when a schema is decoded.
The default implementation just returns arrow::Status::OK().
- Parameters:
schema –[in] the decoded schema
- Returns:
Status
- virtual Status OnSchemaDecoded(std::shared_ptr<Schema> schema, std::shared_ptr<Schema> filtered_schema)
Called when a schema is decoded.
The default implementation just calls OnSchemaDecoded(schema) (without filtered_schema) to keep backward compatibility.
- Since
13.0.0
- Parameters:
schema –[in] the decoded schema
filtered_schema –[in] a filtered schema that contains only the fields that are read
- Returns:
Status
- class StreamDecoder
Push-style stream decoder that receives data from the user.
This class decodes the Apache Arrow IPC streaming format data.
This API is EXPERIMENTAL.
- Since
0.17.0
Public Functions
- StreamDecoder(std::shared_ptr<Listener> listener, IpcReadOptions options = IpcReadOptions::Defaults())
Construct a stream decoder.
- Parameters:
listener –[in] a Listener that must implement Listener::OnRecordBatchDecoded() to receive decoded record batches
options –[in] any IPC reading options (optional)
- Status Consume(const uint8_t* data, int64_t size)
Feed raw data to the decoder.
Whenever the decoder can read a record batch from the data, it calls listener->OnRecordBatchDecoded() with the decoded record batch. A single call may therefore trigger the callback several times.
- Parameters:
data –[in] the raw data to be processed. This data isn’t copied. The passed memory must be kept alive through record batch processing.
size –[in] the raw data size.
- Returns:
Status
- Status Consume(std::shared_ptr<Buffer> buffer)
Feed data to the decoder as a Buffer.
Whenever the decoder can read a record batch from the Buffer, it calls listener->OnRecordBatchDecoded() with the decoded record batch. A single call may therefore trigger the callback several times.
- Status Reset()
Reset the internal status.
You can reuse this decoder for a new stream after calling this.
- Returns:
Status
- int64_t next_required_size() const
Return the number of bytes needed to advance the state of the decoder.
This method is provided for users who want to optimize performance. Normal users don’t need to use this method.
Here is an example usage for normal users:
    decoder.Consume(buffer1);
    decoder.Consume(buffer2);
    decoder.Consume(buffer3);
The decoder has an internal buffer. If the consumed data isn’t enough to advance the state of the decoder, it is stored in that internal buffer, which causes performance overhead.
If you pass exactly next_required_size() bytes to each Consume() call, the decoder doesn’t use its internal buffer, which improves performance.
Here is an example usage to avoid using the internal buffer:
    buffer1 = get_data(decoder.next_required_size());
    decoder.Consume(buffer1);
    buffer2 = get_data(decoder.next_required_size());
    decoder.Consume(buffer2);
Users can also use this method to avoid creating small chunks. Record batch data must be contiguous. If users pass small chunks to the decoder, the decoder needs to concatenate them internally, which causes performance overhead.
Here is an example usage to reduce small chunks:
    buffer = AllocateResizableBuffer();
    while ((small_chunk = get_data(&small_chunk_size))) {
      // Append the small chunk to the accumulation buffer.
      auto current_buffer_size = buffer->size();
      buffer->Resize(current_buffer_size + small_chunk_size);
      memcpy(buffer->mutable_data() + current_buffer_size, small_chunk,
             small_chunk_size);
      // Keep accumulating until the decoder can advance its state.
      if (buffer->size() < decoder.next_required_size()) {
        continue;
      }
      std::shared_ptr<arrow::Buffer> chunk(buffer.release());
      decoder.Consume(chunk);
      buffer = AllocateResizableBuffer();
    }
    // Feed whatever is left once the input is exhausted.
    if (buffer->size() > 0) {
      std::shared_ptr<arrow::Buffer> chunk(buffer.release());
      decoder.Consume(chunk);
    }
- Returns:
the number of bytes needed to advance the state of the decoder
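To make the role of next_required_size() more concrete, here is a standalone toy model of a push-style decoder. It is not Arrow's StreamDecoder: ToyDecoder, its 4-byte little-endian length prefix, and its string callback are all assumptions invented for this sketch, and it always copies into its internal buffer, so it does not model the zero-copy path. It only shows how the required size changes as the decoder moves between states, whatever chunking the caller uses.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy decoder (NOT Arrow's StreamDecoder). Each message is a
// 4-byte little-endian length followed by that many payload bytes.
class ToyDecoder {
 public:
  using Callback = std::function<void(const std::string&)>;
  explicit ToyDecoder(Callback on_message)
      : on_message_(std::move(on_message)) {}

  // Bytes still missing before the decoder can leave its current state.
  std::size_t next_required_size() const { return required_ - pending_.size(); }

  // Push bytes in any chunking; complete messages go to the callback.
  void Consume(const std::uint8_t* data, std::size_t size) {
    while (size > 0) {
      const std::size_t take = std::min(size, next_required_size());
      pending_.insert(pending_.end(), data, data + take);
      data += take;
      size -= take;
      if (pending_.size() == required_) Advance();
    }
  }

 private:
  void Advance() {
    if (reading_length_) {
      // Decode the little-endian length prefix.
      std::size_t length = 0;
      for (int i = 3; i >= 0; --i) length = (length << 8) | pending_[i];
      pending_.clear();
      if (length > 0) {
        reading_length_ = false;
        required_ = length;  // now wait for the payload
        return;
      }
      on_message_(std::string());  // empty payload: deliver immediately
    } else {
      on_message_(std::string(pending_.begin(), pending_.end()));
      pending_.clear();
    }
    reading_length_ = true;  // back to waiting for the next length prefix
    required_ = 4;
  }

  Callback on_message_;
  std::vector<std::uint8_t> pending_;
  bool reading_length_ = true;
  std::size_t required_ = 4;
};
```

A caller that always reads exactly next_required_size() bytes from its source hands the decoder one complete unit per call, which is the access pattern the optimized examples above aim for.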
Statistics
- struct ReadStats
Public Members
- int64_t num_messages = 0
Number of IPC messages read.
- int64_t num_record_batches = 0
Number of record batches read.
- int64_t num_dictionary_batches = 0
Number of dictionary batches read.
Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries
- int64_t num_dictionary_deltas = 0
Number of dictionary deltas read.
- int64_t num_replaced_dictionaries = 0
Number of replaced dictionaries (i.e. where a dictionary batch replaces an existing dictionary with an unrelated new dictionary).
Writing IPC streams and files
Blocking API
The IPC stream format is only optionally terminated, whereas the IPC file format must include a terminating footer. Thus a writer of the IPC file format must be explicitly finalized with Close() or the resulting file will be corrupt.
- Result<std::shared_ptr<RecordBatchWriter>> MakeStreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, const IpcWriteOptions& options = IpcWriteOptions::Defaults())
Create a new IPC stream writer from stream sink and schema.
User is responsible for closing the actual OutputStream.
- Parameters:
sink –[in] output stream to write to
schema –[in] the schema of the record batches to be written
options –[in] options for serialization
- Returns:
Result<std::shared_ptr<RecordBatchWriter>>
- Result<std::shared_ptr<RecordBatchWriter>> MakeStreamWriter(std::shared_ptr<io::OutputStream> sink, const std::shared_ptr<Schema>& schema, const IpcWriteOptions& options = IpcWriteOptions::Defaults())
Create a new IPC stream writer from stream sink and schema.
User is responsible for closing the actual OutputStream.
- Parameters:
sink –[in] output stream to write to
schema –[in] the schema of the record batches to be written
options –[in] options for serialization
- Returns:
Result<std::shared_ptr<RecordBatchWriter>>
- Result<std::shared_ptr<RecordBatchWriter>> MakeFileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, const IpcWriteOptions& options = IpcWriteOptions::Defaults(), const std::shared_ptr<const KeyValueMetadata>& metadata = NULLPTR)
Create a new IPC file writer from stream sink and schema.
- Parameters:
sink –[in] output stream to write to
schema –[in] the schema of the record batches to be written
options –[in] options for serialization, optional
metadata –[in] custom metadata for File Footer, optional
- Returns:
Result<std::shared_ptr<RecordBatchWriter>>
- Result<std::shared_ptr<RecordBatchWriter>> MakeFileWriter(std::shared_ptr<io::OutputStream> sink, const std::shared_ptr<Schema>& schema, const IpcWriteOptions& options = IpcWriteOptions::Defaults(), const std::shared_ptr<const KeyValueMetadata>& metadata = NULLPTR)
Create a new IPC file writer from stream sink and schema.
- Parameters:
sink –[in] output stream to write to
schema –[in] the schema of the record batches to be written
options –[in] options for serialization, optional
metadata –[in] custom metadata for File Footer, optional
- Returns:
Result<std::shared_ptr<RecordBatchWriter>>
- class RecordBatchWriter
Abstract interface for writing a stream of record batches.
Subclassed by arrow::flight::MetadataRecordBatchWriter
Public Functions
- virtual Status WriteRecordBatch(const RecordBatch& batch) = 0
Write a record batch to the stream.
- Parameters:
batch –[in] the record batch to write to the stream
- Returns:
Status
- virtual Status WriteRecordBatch(const RecordBatch& batch, const std::shared_ptr<const KeyValueMetadata>& custom_metadata)
Write a record batch with custom metadata to the stream.
- Parameters:
batch –[in] the record batch to write to the stream
custom_metadata –[in] the record batch’s custom metadata to write to the stream
- Returns:
Status
- Status WriteTable(const Table& table)
Write a possibly-chunked table by creating a sequence of record batches.
- Parameters:
table –[in] table to write
- Returns:
Status
- virtual Status WriteTable(const Table& table, int64_t max_chunksize)
WriteTable with a particular chunksize.
- Parameters:
table –[in] table to write
max_chunksize –[in] maximum number of rows for table chunks. To indicate that no maximum should be enforced, pass -1.
- Returns:
Status
- virtual WriteStats stats() const = 0
Return current write statistics.
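The effect of the max_chunksize argument of WriteTable can be approximated with a little arithmetic. The sketch below is an assumption-laden illustration rather than Arrow's algorithm: SplitRows is a hypothetical helper, and it treats the table as one contiguous run of rows, ignoring any chunk boundaries the table's columns may already have.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper (not part of Arrow): row counts of the record batches
// produced for `num_rows` contiguous rows. max_chunksize == -1 means that no
// maximum is enforced.
inline std::vector<std::int64_t> SplitRows(std::int64_t num_rows,
                                           std::int64_t max_chunksize) {
  assert(num_rows >= 0);
  assert(max_chunksize == -1 || max_chunksize > 0);
  std::vector<std::int64_t> batch_lengths;
  const std::int64_t limit = (max_chunksize == -1) ? num_rows : max_chunksize;
  for (std::int64_t offset = 0; offset < num_rows; offset += limit) {
    // The last batch may be shorter than the limit.
    batch_lengths.push_back(std::min(limit, num_rows - offset));
  }
  return batch_lengths;
}
```

Under these assumptions, 10 rows with max_chunksize = 4 give batches of 4, 4 and 2 rows, while max_chunksize = -1 gives a single 10-row batch.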
Statistics
- struct WriteStats
Public Members
- int64_t num_messages = 0
Number of IPC messages written.
- int64_t num_record_batches = 0
Number of record batches written.
- int64_t num_dictionary_batches = 0
Number of dictionary batches written.
Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries
- int64_t num_dictionary_deltas = 0
Number of dictionary deltas written.
- int64_t num_replaced_dictionaries = 0
Number of replaced dictionaries (i.e. where a dictionary batch replaces an existing dictionary with an unrelated new dictionary).
- int64_t total_raw_body_size = 0
Total size in bytes of record batches emitted.
The “raw” size counts the original buffer sizes, while the “serialized” size includes padding and (optionally) compression.

