Substrait#
The arrow-dataset module can execute Substrait plans via the Acero query engine.
Executing Queries Using Substrait Plans#
Plans can reference data in files via URIs, or “named tables” that must be provided along with the plan.
Here is an example of a Java program that queries a Parquet file using Java Substrait (this example uses the Substrait Java project to compile a SQL query to a Substrait plan):
importcom.google.common.collect.ImmutableList;importio.substrait.isthmus.SqlToSubstrait;importio.substrait.proto.Plan;importorg.apache.arrow.dataset.file.FileFormat;importorg.apache.arrow.dataset.file.FileSystemDatasetFactory;importorg.apache.arrow.dataset.jni.NativeMemoryPool;importorg.apache.arrow.dataset.scanner.ScanOptions;importorg.apache.arrow.dataset.scanner.Scanner;importorg.apache.arrow.dataset.source.Dataset;importorg.apache.arrow.dataset.source.DatasetFactory;importorg.apache.arrow.dataset.substrait.AceroSubstraitConsumer;importorg.apache.arrow.memory.BufferAllocator;importorg.apache.arrow.memory.RootAllocator;importorg.apache.arrow.vector.ipc.ArrowReader;importorg.apache.calcite.sql.parser.SqlParseException;importjava.nio.ByteBuffer;importjava.util.HashMap;importjava.util.Map;publicclassClientSubstrait{publicstaticvoidmain(String[]args){Stringuri="file:///data/tpch_parquet/nation.parquet";ScanOptionsoptions=newScanOptions(/*batchSize*/32768);try(BufferAllocatorallocator=newRootAllocator();DatasetFactorydatasetFactory=newFileSystemDatasetFactory(allocator,NativeMemoryPool.getDefault(),FileFormat.PARQUET,uri);Datasetdataset=datasetFactory.finish();Scannerscanner=dataset.newScan(options);ArrowReaderreader=scanner.scanBatches()){// map table to readerMap<String,ArrowReader>mapTableToArrowReader=newHashMap<>();mapTableToArrowReader.put("NATION",reader);// get binary planPlanplan=getPlan();ByteBuffersubstraitPlan=ByteBuffer.allocateDirect(plan.toByteArray().length);substraitPlan.put(plan.toByteArray());// run querytry(ArrowReaderarrowReader=newAceroSubstraitConsumer(allocator).runQuery(substraitPlan,mapTableToArrowReader)){while(arrowReader.loadNextBatch()){System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());}}}catch(Exceptione){e.printStackTrace();}}staticPlangetPlan()throwsSqlParseException{Stringsql="SELECT * from nation";Stringnation="CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), "+"N_REGIONKEY BIGINT NOT NULL, 
N_COMMENT VARCHAR(152))";SqlToSubstraitsqlToSubstrait=newSqlToSubstrait();Planplan=sqlToSubstrait.execute(sql,ImmutableList.of(nation));returnplan;}}
// Results example:
FieldPath(0)    FieldPath(1)    FieldPath(2)    FieldPath(3)
0    ALGERIA    0     haggle. carefully final deposits detect slyly agai
1    ARGENTINA    1    al foxes promise slyly according to the regular accounts. bold requests alon
Executing Projections and Filters Using Extended Expressions#
Dataset also supports projections and filters with Substrait’s Extended Expression. This requires the substrait-java library.
This Java program:
- Loads a Parquet file containing the “nation” table from the TPC-H benchmark.
- Applies a filter:
  N_NATIONKEY > 18
- Projects two new columns:
  N_REGIONKEY + 10
  N_NAME || ' - ' || N_COMMENT
importcom.google.common.collect.ImmutableList;importio.substrait.isthmus.SqlExpressionToSubstrait;importio.substrait.proto.ExtendedExpression;importorg.apache.arrow.dataset.file.FileFormat;importorg.apache.arrow.dataset.file.FileSystemDatasetFactory;importorg.apache.arrow.dataset.jni.NativeMemoryPool;importorg.apache.arrow.dataset.scanner.ScanOptions;importorg.apache.arrow.dataset.scanner.Scanner;importorg.apache.arrow.dataset.source.Dataset;importorg.apache.arrow.dataset.source.DatasetFactory;importorg.apache.arrow.memory.BufferAllocator;importorg.apache.arrow.memory.RootAllocator;importorg.apache.arrow.vector.ipc.ArrowReader;importorg.apache.calcite.sql.parser.SqlParseException;importjava.nio.ByteBuffer;importjava.util.Base64;importjava.util.Optional;publicclassClientSubstraitExtendedExpressionsCookbook{publicstaticvoidmain(String[]args)throwsSqlParseException{projectAndFilterDataset();}privatestaticvoidprojectAndFilterDataset()throwsSqlParseException{Stringuri="file:///Users/data/tpch_parquet/nation.parquet";ScanOptionsoptions=newScanOptions.Builder(/*batchSize*/32768).columns(Optional.empty()).substraitFilter(getByteBuffer(newString[]{"N_NATIONKEY > 18"})).substraitProjection(getByteBuffer(newString[]{"N_REGIONKEY + 10","N_NAME || CAST(' - ' as VARCHAR) || N_COMMENT"})).build();try(BufferAllocatorallocator=newRootAllocator();DatasetFactorydatasetFactory=newFileSystemDatasetFactory(allocator,NativeMemoryPool.getDefault(),FileFormat.PARQUET,uri);Datasetdataset=datasetFactory.finish();Scannerscanner=dataset.newScan(options);ArrowReaderreader=scanner.scanBatches()){while(reader.loadNextBatch()){System.out.println(reader.getVectorSchemaRoot().contentToTSVString());}}catch(Exceptione){thrownewRuntimeException(e);}}privatestaticByteBuffergetByteBuffer(String[]sqlExpression)throwsSqlParseException{Stringschema="CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME VARCHAR, "+"N_REGIONKEY INT NOT NULL, N_COMMENT 
VARCHAR)";SqlExpressionToSubstraitexpressionToSubstrait=newSqlExpressionToSubstrait();ExtendedExpressionexpression=expressionToSubstrait.convert(sqlExpression,ImmutableList.of(schema));byte[]expressionToByte=Base64.getDecoder().decode(Base64.getEncoder().encodeToString(expression.toByteArray()));ByteBufferbyteBuffer=ByteBuffer.allocateDirect(expressionToByte.length);byteBuffer.put(expressionToByte);returnbyteBuffer;}}
column-1    column-2
13    ROMANIA - ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account
14    SAUDI ARABIA - ts. silent requests haggle. closely express packages sleep across the blithely
12    VIETNAM - hely enticingly express accounts. even, final
13    RUSSIA - requests against the platelets use never according to the quickly regular pint
13    UNITED KINGDOM - eans boost carefully special requests. accounts are. carefull
11    UNITED STATES - y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be
On this page

