Source code for pyarrow.ipc

# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements.  See the NOTICE file# distributed with this work for additional information# regarding copyright ownership.  The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License.  You may obtain a copy of the License at##   http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied.  See the License for the# specific language governing permissions and limitations# under the License.# Arrow file and stream reader/writer classes, and other messaging toolsimportosimportpyarrowaspafrompyarrow.libimport(IpcReadOptions,IpcWriteOptions,ReadStats,WriteStats,# noqaMessage,MessageReader,RecordBatchReader,_ReadPandasMixin,MetadataVersion,Alignment,read_message,read_record_batch,read_schema,read_tensor,write_tensor,get_record_batch_size,get_tensor_size)importpyarrow.libaslib
class RecordBatchStreamReader(lib._RecordBatchStreamReader):
    """
    Reader for the Arrow streaming binary format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
        To read through a memory map, pass a MemoryMappedFile as source.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        When None, default values are used.
    memory_pool : MemoryPool, default None
        When None, the default memory pool is used.
    """

    def __init__(self, source, *, options=None, memory_pool=None):
        # Replace a missing ``options`` with defaults (and reject wrong
        # types) before handing everything to the base-class ``_open``.
        read_options = _ensure_default_ipc_read_options(options)
        self._open(
            source,
            options=read_options,
            memory_pool=memory_pool,
        )
# Shared numpydoc "Parameters" section for the IPC writer entry points.
# It is interpolated into several docstrings in this module, so it must
# start directly at the section header (the trailing backslash after the
# opening quotes suppresses the leading newline) and must not end with a
# newline.
_ipc_writer_class_doc = """\
Parameters
----------
sink : str, pyarrow.NativeFile, or file-like Python object
    Either a file path, or a writable file object.
schema : pyarrow.Schema
    The Arrow schema for data to be written to the file.
options : pyarrow.ipc.IpcWriteOptions
    Options for IPC serialization.
    If None, default values will be used: the legacy format will not
    be used unless overridden by setting the environment variable
    ARROW_PRE_0_15_IPC_FORMAT=1, and the V5 metadata version will be
    used unless overridden by setting the environment variable
    ARROW_PRE_1_0_METADATA_VERSION=1."""

# The file-format writer accepts everything above plus ``metadata``; the
# extra entry is appended to the same section, separated by one newline.
_ipc_file_writer_class_doc = (
    _ipc_writer_class_doc
    + "\n"
    + """\
metadata : dict | pyarrow.KeyValueMetadata, optional
    Key/value pairs (both must be bytes-like) that will be stored
    in the file footer and are retrievable via
    pyarrow.ipc.open_file(...).metadata."""
)
class RecordBatchStreamWriter(lib._RecordBatchStreamWriter):
    # The docstring is built at class-creation time from the module-level
    # ``_ipc_writer_class_doc`` text. A blank line separates the one-line
    # summary from the interpolated parameter section; without it the two
    # run together into a single line.
    __doc__ = f"""Writer for the Arrow streaming binary format

{_ipc_writer_class_doc}"""

    def __init__(self, sink, schema, *, options=None):
        # ``_get_legacy_format_default`` returns the caller's options after
        # a type check, or builds default write options when none are given.
        options = _get_legacy_format_default(options)
        # ``_open`` is provided by the ``lib`` base class.
        self._open(sink, schema, options=options)
class RecordBatchFileReader(lib._RecordBatchFileReader):
    """
    Class for reading Arrow record batch data from the Arrow binary file
    format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
        To read through a memory map, pass a MemoryMappedFile as source.
    footer_offset : int, default None
        If the file is embedded in some larger file, this is the byte
        offset to the very end of the file data.
    options : pyarrow.ipc.IpcReadOptions
        Options controlling how the IPC data is read.
        When None, default values are used.
    memory_pool : MemoryPool, default None
        When None, the default memory pool is used.
    """

    def __init__(self, source, footer_offset=None, *, options=None,
                 memory_pool=None):
        # Replace a missing ``options`` with defaults (and reject wrong
        # types) before handing everything to the base-class ``_open``.
        read_options = _ensure_default_ipc_read_options(options)
        self._open(
            source,
            footer_offset=footer_offset,
            options=read_options,
            memory_pool=memory_pool,
        )
class RecordBatchFileWriter(lib._RecordBatchFileWriter):
    # The docstring is built at class-creation time from the module-level
    # ``_ipc_file_writer_class_doc`` text. A blank line separates the
    # one-line summary from the interpolated parameter section; without it
    # the two run together into a single line.
    __doc__ = f"""Writer to create the Arrow binary file format

{_ipc_file_writer_class_doc}"""

    def __init__(self, sink, schema, *, options=None, metadata=None):
        # ``_get_legacy_format_default`` returns the caller's options after
        # a type check, or builds default write options when none are given.
        options = _get_legacy_format_default(options)
        # ``_open`` is provided by the ``lib`` base class; ``metadata`` is
        # forwarded to it unchanged.
        self._open(sink, schema, options=options, metadata=metadata)
def _get_legacy_format_default(options):
    """
    Return the IPC write options to use for a writer.

    Parameters
    ----------
    options : pyarrow.ipc.IpcWriteOptions or None
        Caller-supplied options. Any falsy value (including None) is
        treated as "not supplied".

    Returns
    -------
    IpcWriteOptions
        ``options`` itself when supplied; otherwise a new instance whose
        ``use_legacy_format`` and ``metadata_version`` are derived from the
        ``ARROW_PRE_0_15_IPC_FORMAT`` and ``ARROW_PRE_1_0_METADATA_VERSION``
        environment variables.

    Raises
    ------
    TypeError
        If ``options`` is truthy but not an ``IpcWriteOptions``.
    ValueError
        If one of the environment variables is set to a value that
        ``int()`` cannot parse.
    """
    # Truthiness (not ``is not None``) is kept deliberately so that inputs
    # accepted before this change keep falling through to the defaults.
    if options:
        if not isinstance(options, IpcWriteOptions):
            raise TypeError(
                f"expected IpcWriteOptions, got {type(options)}")
        return options

    # Defaults: V5 metadata and the non-legacy format, each overridable by
    # setting the matching environment variable to a non-zero integer.
    metadata_version = MetadataVersion.V5
    use_legacy_format = bool(
        int(os.environ.get('ARROW_PRE_0_15_IPC_FORMAT', '0')))
    if bool(int(os.environ.get('ARROW_PRE_1_0_METADATA_VERSION', '0'))):
        metadata_version = MetadataVersion.V4
    return IpcWriteOptions(use_legacy_format=use_legacy_format,
                           metadata_version=metadata_version)


def _ensure_default_ipc_read_options(options):
    """
    Return the IPC read options to use for a reader.

    Parameters
    ----------
    options : pyarrow.ipc.IpcReadOptions or None
        Caller-supplied options. Any falsy value (including None) is
        replaced by a default-constructed ``IpcReadOptions``.

    Returns
    -------
    IpcReadOptions
        ``options`` itself when supplied, else ``IpcReadOptions()``.

    Raises
    ------
    TypeError
        If ``options`` is truthy but not an ``IpcReadOptions``.
    """
    if options and not isinstance(options, IpcReadOptions):
        raise TypeError(
            f"expected IpcReadOptions, got {type(options)}")
    return options or IpcReadOptions()
def new_stream(sink, schema, *, options=None):
    # Thin factory: forwards every argument to RecordBatchStreamWriter.
    # The docstring is attached below because it interpolates the shared
    # parameter text.
    return RecordBatchStreamWriter(sink, schema, options=options)


# Summary, shared parameter section and "Returns" section are separated by
# blank lines, and the section header sits on its own line above its
# underline, so the result is a well-formed numpydoc docstring. The
# backslash after the opening quotes suppresses the leading newline.
new_stream.__doc__ = f"""\
Create an Arrow columnar IPC stream writer instance

{_ipc_writer_class_doc}

Returns
-------
writer : RecordBatchStreamWriter
    A writer for the given sink"""
def open_stream(source, *, options=None, memory_pool=None):
    """
    Create reader for Arrow streaming format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
    options : pyarrow.ipc.IpcReadOptions
        Options controlling how the IPC data is read.
        When None, default values are used.
    memory_pool : MemoryPool, default None
        When None, the default memory pool is used.

    Returns
    -------
    reader : RecordBatchStreamReader
        A reader for the given source
    """
    stream_reader = RecordBatchStreamReader(
        source,
        options=options,
        memory_pool=memory_pool,
    )
    return stream_reader
def new_file(sink, schema, *, options=None, metadata=None):
    # Thin factory: forwards every argument to RecordBatchFileWriter.
    # The docstring is attached below because it interpolates the shared
    # parameter text.
    return RecordBatchFileWriter(sink, schema, options=options,
                                 metadata=metadata)


# Summary, shared parameter section and "Returns" section are separated by
# blank lines, and the section header sits on its own line above its
# underline, so the result is a well-formed numpydoc docstring. The
# backslash after the opening quotes suppresses the leading newline.
new_file.__doc__ = f"""\
Create an Arrow columnar IPC file writer instance

{_ipc_file_writer_class_doc}

Returns
-------
writer : RecordBatchFileWriter
    A writer for the given sink"""
def open_file(source, footer_offset=None, *, options=None, memory_pool=None):
    """
    Create reader for Arrow file format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
    footer_offset : int, default None
        If the file is embedded in some larger file, this is the byte
        offset to the very end of the file data.
    options : pyarrow.ipc.IpcReadOptions
        Options controlling how the IPC data is read.
        When None, default values are used.
    memory_pool : MemoryPool, default None
        When None, the default memory pool is used.

    Returns
    -------
    reader : RecordBatchFileReader
        A reader for the given source
    """
    file_reader = RecordBatchFileReader(
        source,
        footer_offset=footer_offset,
        options=options,
        memory_pool=memory_pool,
    )
    return file_reader
def serialize_pandas(df, *, nthreads=None, preserve_index=None):
    """
    Serialize a pandas DataFrame into a buffer protocol compatible object.

    Parameters
    ----------
    df : pandas.DataFrame
    nthreads : int, default None
        Number of threads to use for conversion to Arrow, default all CPUs.
    preserve_index : bool, default None
        The default of None will store the index as a column, except for
        RangeIndex which is stored as metadata only. If True, always
        preserve the pandas index data as a column. If False, no index
        information is saved and the result will have a default RangeIndex.

    Returns
    -------
    buf : buffer
        An object compatible with the buffer protocol.
    """
    record_batch = pa.RecordBatch.from_pandas(
        df,
        nthreads=nthreads,
        preserve_index=preserve_index,
    )
    # Write the single batch as an IPC stream into an in-memory sink; the
    # sink's contents are fetched only after the writer has been closed.
    out_stream = pa.BufferOutputStream()
    with pa.RecordBatchStreamWriter(
            out_stream, record_batch.schema) as stream_writer:
        stream_writer.write_batch(record_batch)
    return out_stream.getvalue()


def deserialize_pandas(buf, *, use_threads=True):
    """
    Deserialize a buffer protocol compatible object into a pandas DataFrame.

    Parameters
    ----------
    buf : buffer
        An object compatible with the buffer protocol.
    use_threads : bool, default True
        Whether to parallelize the conversion using multiple threads.

    Returns
    -------
    df : pandas.DataFrame
        The buffer deserialized as pandas DataFrame
    """
    input_stream = pa.BufferReader(buf)
    with pa.RecordBatchStreamReader(input_stream) as stream_reader:
        arrow_table = stream_reader.read_all()
    # Conversion to pandas happens after the reader has been closed.
    return arrow_table.to_pandas(use_threads=use_threads)