Dataset#
Warning
Experimental: The Java module dataset is currently under early development. The API might change in each release of Apache Arrow until it matures.
Dataset is a universal layer in Apache Arrow for querying data in different formats or with different partitioning strategies. Usually the data to be queried is located on a traditional file system, but Arrow Dataset is not designed only for querying files; it can be extended to serve other data sources as well, such as data arriving via inter-process communication or from other network locations.
Getting Started#
Currently supported file formats are:
Apache Arrow (.arrow)
Apache ORC (.orc)
Apache Parquet (.parquet)
Comma-Separated Values (.csv)
Line-delimited JSON Values (.json)
Below is a minimal example of using Dataset to query a Parquet file in Java:
// read data from file /opt/example.parquet
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(
        allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    List<ArrowRecordBatch> batches = new ArrayList<>();
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            final VectorUnloader unloader = new VectorUnloader(root);
            batches.add(unloader.getRecordBatch());
        }
    }

    // do something with read record batches, for example:
    analyzeArrowData(batches);

    // finished the analysis of the data, close all resources:
    AutoCloseables.close(batches);
} catch (Exception e) {
    e.printStackTrace();
}
Note
ArrowRecordBatch is a low-level composite Arrow data exchange format that doesn't provide an API to read typed data from it directly. It's recommended to use the utility VectorLoader to load it into a schema-aware container, VectorSchemaRoot, from which the user can conveniently access decoded data in Java.
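A minimal sketch of that approach (illustrative, not part of the original example; it assumes batches and allocator from the example above, and a schema obtained for instance via DatasetFactory#inspect() or from the reader):

// load each previously unloaded record batch back into a schema-aware container
try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
    VectorLoader loader = new VectorLoader(root);
    for (ArrowRecordBatch batch : batches) {
        loader.load(batch);                             // populate the root's vectors
        System.out.println(root.contentToTSVString());  // access decoded, typed data
    }
}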
The ScanOptions batchSize argument takes effect only if it is set to a value smaller than the number of rows in the record batch.
See also
Load record batches with VectorSchemaRoot.
Schema#
The schema of the data to be queried can be inspected via the method DatasetFactory#inspect() before actually reading it. For example:
// read data from local file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(
    allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);

// inspect schema
Schema schema = factory.inspect();
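The returned Schema can be examined before deciding what to scan. A minimal sketch (printing field names and types; not part of the original example):

// print the inspected schema and its fields
System.out.println(schema);
for (Field field : schema.getFields()) {
    System.out.println(field.getName() + ": " + field.getType());
}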
For data formats that are compatible with a user-defined schema, the user can pass the schema to the method DatasetFactory#finish(Schema schema) to create the dataset:
Schema schema = createUserSchema();
Dataset dataset = factory.finish(schema);
Otherwise, when the no-argument method DatasetFactory#finish() is called, the schema will be inferred automatically from the data source, the same as the result of DatasetFactory#inspect().
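For example, creating a dataset whose schema is inferred from the data source (a trivial sketch; factory is the DatasetFactory from above):

// no explicit schema: the inferred schema equals factory.inspect()
Dataset dataset = factory.finish();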
Also, if a projection is specified during scanning (see the next section Projection (Subset of Columns)), the actual schema of the output data can be obtained via the method Scanner::schema():
Scanner scanner = dataset.newScan(
    new ScanOptions(32768, Optional.of(new String[] {"id", "name"})));
Schema projectedSchema = scanner.schema();
Projection (Subset of Columns)#
Users can specify projections in ScanOptions. For example:
String[] projection = new String[] {"id", "name"};
ScanOptions options = new ScanOptions(32768, Optional.of(projection));
If no projection is needed, leave the optional projection argument absent in ScanOptions:
ScanOptions options = new ScanOptions(32768, Optional.empty());
Or use the shortcut constructor:
ScanOptions options = new ScanOptions(32768);
Then all columns will be emitted during scanning.
Projection (Produce New Columns) and Filters#
Users can specify projections (new columns) or filters in ScanOptions using Substrait. For example:
ByteBuffer substraitExpressionFilter = getSubstraitExpressionFilter();
ByteBuffer substraitExpressionProject = getSubstraitExpressionProjection();

// Use Substrait APIs to create an Expression and serialize to a ByteBuffer
ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
    .columns(Optional.empty())
    .substraitExpressionFilter(substraitExpressionFilter)
    .substraitExpressionProjection(substraitExpressionProject)
    .build();
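The helpers getSubstraitExpressionFilter() and getSubstraitExpressionProjection() above stand in for user code that produces a serialized Substrait extended expression. As one hedged sketch (the helper name and file path are hypothetical, and a direct ByteBuffer is assumed because the buffer is handed to native code), such a buffer could be loaded from an expression serialized elsewhere:

// hypothetical helper: load an extended expression serialized by a Substrait producer
// (requires java.nio.ByteBuffer, java.nio.file.Files, java.nio.file.Paths)
static ByteBuffer readSubstraitExpression(String path) throws IOException {
    byte[] bytes = Files.readAllBytes(Paths.get(path));
    ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length);
    buffer.put(bytes);
    buffer.flip();
    return buffer;
}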
See also
Executing Projections and Filters Using Extended Expressions
Projections and Filters using Substrait.
Read Data from HDFS#
FileSystemDataset supports reading data from non-local file systems. HDFS support is included in the official Apache Arrow Java package releases and can be used directly without re-building the source code.
To access HDFS data using the Dataset API, pass a general HDFS URI to FileSystemDatasetFactory:
Stringuri="hdfs://{hdfs_host}:{port}/data/example.parquet";BufferAllocatorallocator=newRootAllocator(Long.MAX_VALUE);DatasetFactoryfactory=newFileSystemDatasetFactory(allocator,NativeMemoryPool.getDefault(),FileFormat.PARQUET,uri);
Native Memory Management#
To gain better performance and reduce code complexity, the Java FileSystemDataset internally relies on the C++ arrow::dataset::FileSystemDataset via JNI. As a result, all Arrow data read from FileSystemDataset is allocated off the JVM heap. To manage this part of memory, a utility class NativeMemoryPool is provided to users.
As a basic example, by using a listenable NativeMemoryPool, the user can pass a listener hooking into C++ buffer allocation/deallocation:
AtomicLong reserved = new AtomicLong(0L);
ReservationListener listener = new ReservationListener() {
    @Override
    public void reserve(long size) {
        reserved.getAndAdd(size);
    }

    @Override
    public void unreserve(long size) {
        reserved.getAndAdd(-size);
    }
};
NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(
    allocator, pool, FileFormat.PARQUET, uri);
Also, it's a very common case to reserve the same amount of JVM direct memory for the data read from datasets. For this use case, a built-in utility class DirectReservationListener is provided:
NativeMemoryPool pool = NativeMemoryPool.createListenable(DirectReservationListener.instance());
This way, once the allocated byte count of Arrow buffers reaches the limit of JVM direct memory, an OutOfMemoryError: Direct buffer memory will be thrown during scanning.
Note
The default instance NativeMemoryPool.getDefault() does nothing on buffer allocation/deallocation. It's OK to use it for a proof of concept or for testing, but for production use in complex environments, it's recommended to manage memory by using a listenable memory pool.
Note
The BufferAllocator instance passed to FileSystemDatasetFactory's constructor is also aware of the overall memory usage of the produced dataset instances. Once the Java buffers are created, the passed allocator will become their parent allocator.
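As an illustrative sketch (not part of the original example), the allocator's accounting can be checked while the dataset resources are still open:

// reports the bytes currently held by Java-side Arrow buffers created under this allocator
System.out.println("allocated by dataset buffers: " + allocator.getAllocatedMemory());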
Usage Notes#
Native Object Resource Management#
As another consequence of relying on JNI, all components related to FileSystemDataset should be closed manually or used with try-with-resources to release the corresponding native objects after use. For example:
Stringuri="file:/opt/example.parquet";ScanOptionsoptions=newScanOptions(/*batchSize*/32768);try(BufferAllocatorallocator=newRootAllocator();DatasetFactoryfactory=newFileSystemDatasetFactory(allocator,NativeMemoryPool.getDefault(),FileFormat.PARQUET,uri);Datasetdataset=factory.finish();Scannerscanner=dataset.newScan(options)){// do something}catch(Exceptione){e.printStackTrace();}
If the user forgets to close them, the corresponding native objects might be leaked.
BatchSize#
The batchSize argument of ScanOptions is a limit on the size of an individual batch.
For example, let’s try to read a Parquet file with gzip compression and 3 row groups:
# Let's configure ScanOptions as:
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);

$ parquet-tools meta data4_3rg_gzip.parquet
file schema:  schema
age:          OPTIONAL INT64 R:0 D:1
name:         OPTIONAL BINARY L:STRING R:0 D:1
row group 1:  RC:4 TS:182 OFFSET:4
row group 2:  RC:4 TS:190 OFFSET:420
row group 3:  RC:3 TS:179 OFFSET:838
Here, we set the batchSize in ScanOptions to 32768. Because that is greater than the number of rows in the next batch (4 rows, since the first row group has only 4 rows), the program gets only 4 rows at a time. The scanner will not combine smaller batches to reach the limit, but it will split large batches to stay under the limit. So if a row group had more than 32768 rows, it would be split into blocks of 32768 rows or fewer.
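A sketch of what this looks like in practice (assuming the scanner setup from Getting Started and the file above; each loaded batch then mirrors one row group, as described):

// with batchSize 32768 and row groups of 4, 4 and 3 rows,
// each loaded batch corresponds to one row group
try (ArrowReader reader = scanner.scanBatches()) {
    while (reader.loadNextBatch()) {
        VectorSchemaRoot root = reader.getVectorSchemaRoot();
        System.out.println("rows in this batch: " + root.getRowCount());  // prints 4, 4, 3
    }
} catch (IOException e) {
    e.printStackTrace();
}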

