Streaming, Serialization, and IPC

Writing and Reading Streams

Arrow defines two types of binary formats for serializing record batches:

  • Streaming format: for sending an arbitrary-length sequence of record batches. The format must be processed from start to end, and does not support random access

  • File or Random Access format: for serializing a fixed number of record batches. Supports random access, and thus is very useful when used with memory maps

To follow this section, make sure to first read the section on Memory and IO.

Using streams

First, let’s create a small record batch:

In [1]: import pyarrow as pa

In [2]: data = [
   ...:     pa.array([1, 2, 3, 4]),
   ...:     pa.array(['foo', 'bar', 'baz', None]),
   ...:     pa.array([True, None, False, True])
   ...: ]
   ...:

In [3]: batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])

In [4]: batch.num_rows
Out[4]: 4

In [5]: batch.num_columns
Out[5]: 3

Now, we can begin writing a stream containing some number of these batches. For this we use RecordBatchStreamWriter, which can write to a writeable NativeFile object or a writeable Python object. For convenience, this one can be created with new_stream():

In [6]: sink = pa.BufferOutputStream()

In [7]: with pa.ipc.new_stream(sink, batch.schema) as writer:
   ...:     for i in range(5):
   ...:         writer.write_batch(batch)
   ...:

Here we used an in-memory Arrow buffer stream (sink), but this could have been a socket or some other IO sink.
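For instance, streaming the same batches over a TCP connection might look like the following sketch. The address, port, and the server assumed to be listening on the other end are illustrative and not part of the original example:

import socket

import pyarrow as pa

# Assumption: a server is listening on 127.0.0.1:8888 and consumes the
# stream on its side (e.g. with pa.ipc.open_stream).
sock = socket.create_connection(("127.0.0.1", 8888))
with sock.makefile(mode="wb") as sink:
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        for i in range(5):
            writer.write_batch(batch)
sock.close()

Because new_stream() accepts any writeable Python file-like object, the file object returned by socket.makefile() works just like the in-memory buffer above.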

When creating the StreamWriter, we pass the schema, since the schema (column names and types) must be the same for all of the batches sent in this particular stream. Now we can do:

In [8]: buf = sink.getvalue()

In [9]: buf.size
Out[9]: 1984

Now buf contains the complete stream as an in-memory byte buffer. We can read such a stream with RecordBatchStreamReader or the convenience function pyarrow.ipc.open_stream:

In [10]: with pa.ipc.open_stream(buf) as reader:
   ....:     schema = reader.schema
   ....:     batches = [b for b in reader]
   ....:

In [11]: schema
Out[11]:
f0: int64
f1: string
f2: bool

In [12]: len(batches)
Out[12]: 5

We can check the returned batches are the same as the original input:

In [13]: batches[0].equals(batch)
Out[13]: True

An important point is that if the input source supports zero-copy reads (e.g. a memory map or pyarrow.BufferReader), then the returned batches are also zero-copy and do not allocate any new memory on read.
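As a quick sanity check (a sketch, not part of the original example), we can compare Arrow's allocation counter before and after reading from a pa.BufferReader wrapping the buffer; the difference should stay at, or very close to, zero:

# Reading from a zero-copy source: the batches reference the existing buffer,
# so Arrow's allocation counter should not grow noticeably.
allocated_before = pa.total_allocated_bytes()
with pa.ipc.open_stream(pa.BufferReader(buf)) as reader:
    zero_copy_batches = [b for b in reader]
print(pa.total_allocated_bytes() - allocated_before)  # expected: 0, or close to it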

Writing and Reading Random Access Files

The RecordBatchFileWriter has the same API as RecordBatchStreamWriter. You can create one with new_file():

In [14]: sink = pa.BufferOutputStream()

In [15]: with pa.ipc.new_file(sink, batch.schema) as writer:
   ....:     for i in range(10):
   ....:         writer.write_batch(batch)
   ....:

In [16]: buf = sink.getvalue()

In [17]: buf.size
Out[17]: 4226

The difference between RecordBatchFileReader and RecordBatchStreamReader is that the input source must have a seek method for random access. The stream reader only requires read operations. We can also use the open_file() method to open a file:

In [18]: with pa.ipc.open_file(buf) as reader:
   ....:     num_record_batches = reader.num_record_batches
   ....:

In [19]: b = reader.get_batch(3)

Because we have access to the entire payload, we know the number of record batches in the file, and can read any at random.

In [20]: num_record_batches
Out[20]: 10

In [21]: b.equals(batch)
Out[21]: True

Reading from Stream and File Format for pandas

The stream and file reader classes have a special read_pandas method to simplify reading multiple record batches and converting them to a single DataFrame output:

In [22]: with pa.ipc.open_file(buf) as reader:
   ....:     df = reader.read_pandas()
   ....:

In [23]: df[:5]
Out[23]:
   f0    f1     f2
0   1   foo   True
1   2   bar   None
2   3   baz  False
3   4  None   True
4   1   foo   True

Efficiently Writing and Reading Arrow Data

Being optimized for zero-copy and memory-mapped data, Arrow makes it easy to read and write arrays while consuming the minimum amount of resident memory.

When writing and reading raw Arrow data, we can use the Arrow File Format or the Arrow Streaming Format.

To dump an array to a file, you can use new_file(), which will provide a new RecordBatchFileWriter instance that can be used to write batches of data to that file.

For example, to write an array of 10M integers, we could write it in 1000 chunks of 10000 entries each:

In [24]: BATCH_SIZE = 10000

In [25]: NUM_BATCHES = 1000

In [26]: schema = pa.schema([pa.field('nums', pa.int32())])

In [27]: with pa.OSFile('bigfile.arrow', 'wb') as sink:
   ....:     with pa.ipc.new_file(sink, schema) as writer:
   ....:         for row in range(NUM_BATCHES):
   ....:             batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
   ....:             writer.write(batch)
   ....:

Record batches support multiple columns, so in practice we always write the equivalent of a Table; a sketch of the multi-column case follows below.
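For example, a multi-column write might look like this sketch, where the column names, sizes, and file name are purely illustrative: build a Table and write it out as fixed-size record batches.

import pyarrow as pa

# Illustrative multi-column data; names and sizes are not meaningful.
table = pa.table({
    "nums": pa.array(range(10000), type=pa.int32()),
    "labels": pa.array(["row{}".format(i) for i in range(10000)]),
})

with pa.OSFile("multicol.arrow", "wb") as sink:
    with pa.ipc.new_file(sink, table.schema) as writer:
        # Table.to_batches splits the table into record batches of at most
        # max_chunksize rows, which we write out one by one.
        for chunk in table.to_batches(max_chunksize=1000):
            writer.write(chunk)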

Writing in batches is effective because, in theory, we only need to keep the current batch in memory while writing. But when reading back, we can be even more effective by directly mapping the data from disk and avoiding any new memory allocation on read.

Under normal conditions, reading back our file will consume a few tens of megabytes of memory, roughly the size of the data itself:

In [28]: with pa.OSFile('bigfile.arrow', 'rb') as source:
   ....:     loaded_array = pa.ipc.open_file(source).read_all()
   ....:

In [29]: print("LEN:", len(loaded_array))
LEN: 10000000

In [30]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 38MB

To read big data from disk more efficiently, we can memory-map the file, so that Arrow can directly reference the data mapped from disk and avoid having to allocate its own memory. In such a case the operating system will be able to page in the mapped memory lazily and page it out without any write-back cost when under memory pressure, which makes it easier to read arrays bigger than the total available memory.

In [31]: with pa.memory_map('bigfile.arrow', 'rb') as source:
   ....:     loaded_array = pa.ipc.open_file(source).read_all()
   ....:

In [32]: print("LEN:", len(loaded_array))
LEN: 10000000

In [33]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 0MB

Note

Other high-level APIs like read_table() also provide a memory_map option. But in those cases, the memory mapping can't help with reducing resident memory consumption. See Reading Parquet and Memory Mapping for details.
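To illustrate the note, here is a hedged sketch using pyarrow.parquet (the file name is made up): because Parquet data is encoded and possibly compressed on disk, it must be decoded into new Arrow buffers even when the file is memory-mapped, so the allocation counter still grows.

import pyarrow as pa
import pyarrow.parquet as pq

# Write a small Parquet file so we have something to read back.
pq.write_table(pa.table({"nums": pa.array(range(10000), type=pa.int32())}),
               "example.parquet")

allocated_before = pa.total_allocated_bytes()
table = pq.read_table("example.parquet", memory_map=True)
# Unlike the Arrow IPC file above, this difference will not be zero:
# decoding Parquet produces new Arrow buffers even when the file is mapped.
print(pa.total_allocated_bytes() - allocated_before)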