Reading/Writing IPC formats
Arrow defines two types of binary formats for serializing record batches:
- Streaming format: for sending an arbitrary number of record batches. The format must be processed from start to end, and does not support random access.
- File or Random Access format: for serializing a fixed number of record batches. It supports random access, and thus is very useful when used with memory maps.
Writing and Reading Streaming Format
First, let’s populate a VectorSchemaRoot with a small batch of records:
```java
BitVector bitVector = new BitVector("boolean", allocator);
VarCharVector varCharVector = new VarCharVector("varchar", allocator);
for (int i = 0; i < 10; i++) {
  bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
  varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
}
bitVector.setValueCount(10);
varCharVector.setValueCount(10);

List<Field> fields = Arrays.asList(bitVector.getField(), varCharVector.getField());
List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
```
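The snippets in this section assume an already-created BufferAllocator bound to the name allocator. A minimal sketch of that setup (the unbounded RootAllocator here is our assumption, not part of the original example):

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

// Assumed setup: a RootAllocator with the default (unbounded) memory limit
// backs every vector created in the examples below.
BufferAllocator allocator = new RootAllocator();
```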
Now, we can begin writing a stream containing some number of these batches. For this we use ArrowStreamWriter (the DictionaryProvider, used for any vectors that are dictionary encoded, is optional and can be null):
```java
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
     ArrowStreamWriter writer = new ArrowStreamWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out))) {
  // ... do write into the ArrowStreamWriter
}
```
Here we used an in-memory stream, but this could have been a socket or some other IO stream. Then we can do
```java
writer.start();
// write the first batch
writer.writeBatch();
// write another four batches.
for (int i = 0; i < 4; i++) {
  // populate VectorSchemaRoot data and write the next batch
  BitVector childVector1 = (BitVector) root.getVector(0);
  VarCharVector childVector2 = (VarCharVector) root.getVector(1);
  childVector1.reset();
  childVector2.reset();
  // ... do some populate work here, could be different for each batch
  writer.writeBatch();
}
writer.end();
```
Note that, since the VectorSchemaRoot in the writer is a container that can hold batches, batches flow through the VectorSchemaRoot as part of a pipeline. We therefore need to populate the data before each call to writeBatch, since each later batch overwrites the previous one in the root.
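As noted above, the sink need not be an in-memory buffer: ArrowStreamWriter accepts any WritableByteChannel, so a socket works just as well. A minimal sketch, using a hypothetical host and port:

```java
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;

// Hypothetical endpoint; any WritableByteChannel is a valid sink for the writer.
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
     ArrowStreamWriter writer = new ArrowStreamWriter(root, /*DictionaryProvider=*/null, channel)) {
  writer.start();
  writer.writeBatch(); // populate root before each call, as above
  writer.end();
}
```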
Now the ByteArrayOutputStream contains the complete stream, which holds 5 record batches. We can read such a stream with ArrowStreamReader. Note that the VectorSchemaRoot within the reader will be loaded with new values on every call to loadNextBatch():
```java
try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
  // This will be loaded with new values on every call to loadNextBatch
  VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
  Schema schema = readRoot.getSchema();
  for (int i = 0; i < 5; i++) {
    reader.loadNextBatch();
    // ... do something with readRoot
  }
}
```
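Because the reader reuses a single VectorSchemaRoot, each loadNextBatch() call overwrites the previous batch's data, so each batch must be consumed (or copied out) before loading the next. A minimal sketch of draining the whole stream this way; contentToTSVString() is just a stand-in for real processing:

```java
try (ArrowStreamReader reader =
         new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
  VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
  // loadNextBatch() returns false once the stream is exhausted
  while (reader.loadNextBatch()) {
    // consume or copy the batch now; the next iteration overwrites readRoot
    System.out.println(readRoot.contentToTSVString());
  }
}
```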
Here we also give a simple example with dictionary-encoded vectors:
```java
// create provider
DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();

try (final VarCharVector dictVector = new VarCharVector("dict", allocator);
     final VarCharVector vector = new VarCharVector("vector", allocator)) {
  // create dictionary vector
  dictVector.allocateNewSafe();
  dictVector.setSafe(0, "aa".getBytes());
  dictVector.setSafe(1, "bb".getBytes());
  dictVector.setSafe(2, "cc".getBytes());
  dictVector.setValueCount(3);
  // create dictionary
  Dictionary dictionary =
      new Dictionary(dictVector, new DictionaryEncoding(1L, false, /*indexType=*/null));
  provider.put(dictionary);

  // create original data vector
  vector.allocateNewSafe();
  vector.setSafe(0, "bb".getBytes());
  vector.setSafe(1, "bb".getBytes());
  vector.setSafe(2, "cc".getBytes());
  vector.setSafe(3, "aa".getBytes());
  vector.setValueCount(4);

  // get the encoded vector
  IntVector encodedVector = (IntVector) DictionaryEncoder.encode(vector, dictionary);

  ByteArrayOutputStream out = new ByteArrayOutputStream();

  // create VectorSchemaRoot
  List<Field> fields = Arrays.asList(encodedVector.getField());
  List<FieldVector> vectors = Arrays.asList(encodedVector);
  try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors)) {
    // write data
    ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out));
    writer.start();
    writer.writeBatch();
    writer.end();
  }

  // read data
  try (ArrowStreamReader reader =
           new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
    reader.loadNextBatch();
    VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
    // get the encoded vector
    IntVector intVector = (IntVector) readRoot.getVector(0);

    // get dictionaries and decode the vector
    Map<Long, Dictionary> dictionaryMap = reader.getDictionaryVectors();
    long dictionaryId = intVector.getField().getDictionary().getId();
    try (VarCharVector varCharVector =
             (VarCharVector) DictionaryEncoder.decode(intVector, dictionaryMap.get(dictionaryId))) {
      // ... use decoded vector
    }
  }
}
```
Writing and Reading Random Access Files
The ArrowFileWriter has the same API as ArrowStreamWriter:
```java
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
     ArrowFileWriter writer = new ArrowFileWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out))) {
  writer.start();
  // write the first batch
  writer.writeBatch();
  // write another four batches.
  for (int i = 0; i < 4; i++) {
    // ... do populate work
    writer.writeBatch();
  }
  writer.end();
}
```
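In practice the random access format usually targets a real file rather than an in-memory buffer. A sketch under that assumption (the path data.arrow is hypothetical); FileOutputStream#getChannel supplies the WritableByteChannel the writer needs:

```java
import java.io.FileOutputStream;
import org.apache.arrow.vector.ipc.ArrowFileWriter;

// Hypothetical output path; the channel of a FileOutputStream is a valid sink.
try (FileOutputStream fileStream = new FileOutputStream("data.arrow");
     ArrowFileWriter writer = new ArrowFileWriter(root, /*DictionaryProvider=*/null, fileStream.getChannel())) {
  writer.start();
  writer.writeBatch();
  writer.end();
}
```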
The difference between ArrowFileReader and ArrowStreamReader is that the input source must have a seek method for random access. Because we have access to the entire payload, we know the number of record batches in the file, and can read any of them at random:
```java
try (ArrowFileReader reader = new ArrowFileReader(
    new ByteArrayReadableSeekableByteChannel(out.toByteArray()), allocator)) {
  // read the 4th batch
  ArrowBlock block = reader.getRecordBlocks().get(3);
  reader.loadRecordBatch(block);
  VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
}
```
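The same reader works against a file on disk, and since getRecordBlocks() lists every batch, we can also iterate them all in order. A sketch, again assuming the hypothetical data.arrow written above; FileInputStream#getChannel provides the seekable channel the reader requires:

```java
import java.io.FileInputStream;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.message.ArrowBlock;

try (FileInputStream fileStream = new FileInputStream("data.arrow");
     ArrowFileReader reader = new ArrowFileReader(fileStream.getChannel(), allocator)) {
  // each ArrowBlock locates one record batch within the file
  for (ArrowBlock block : reader.getRecordBlocks()) {
    reader.loadRecordBatch(block);
    VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
    // ... process readRoot before loading the next block
  }
}
```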

