Tabular Datasets#

The Arrow Datasets library provides functionality to efficiently work with tabular, potentially larger than memory, and multi-file datasets. This includes:

  • A unified interface that supports different sources and file formats and different file systems (local, cloud).

  • Discovery of sources (crawling directories, handling partitioned datasets with various partitioning schemes, basic schema normalization, …)

  • Optimized reading with predicate pushdown (filtering rows), projection (selecting and deriving columns), and optionally parallel reading.

The supported file formats currently are Parquet, Feather / Arrow IPC, CSV and ORC (note that ORC datasets can currently only be read and not yet written). The goal is to expand support to other file formats and data sources (e.g. database connections) in the future.

Reading Datasets#

For the examples below, let’s create a small dataset consisting of a directory with two Parquet files:

// Generate some data for the rest of this example.
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Parquet files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}

(See the full example at bottom: Full Example.)

Dataset discovery#

An arrow::dataset::Dataset object can be created using the various arrow::dataset::DatasetFactory objects. Here, we’ll use the arrow::dataset::FileSystemDatasetFactory, which can create a dataset given a base directory path:

// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

We’re also passing the filesystem to use and the file format to use for reading. This lets us choose between (for example) reading local files or files in Amazon S3, or between Parquet and CSV.

In addition to searching a base directory, we can list file paths manually.
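
As a minimal sketch of that manual-listing variant (assuming the same filesystem, format, and base_path as in the snippets above; this snippet is not part of the example program), the factory overload that takes an explicit vector of paths can be used instead of a selector:

// Build a dataset from an explicit list of files rather than a directory crawl.
std::vector<std::string> paths = {base_path + "/data1.parquet",
                                  base_path + "/data2.parquet"};
ARROW_ASSIGN_OR_RAISE(
    auto factory, ds::FileSystemDatasetFactory::Make(filesystem, paths, format,
                                                     ds::FileSystemFactoryOptions()));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());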

Creating an arrow::dataset::Dataset does not begin reading the data itself. It only crawls the directory to find all the files (if needed), which can be retrieved with arrow::dataset::FileSystemDataset::files():

// Print out the files crawled (only for FileSystemDataset)
for (const auto& filename : dataset->files()) {
  std::cout << filename << std::endl;
}

…and infers the dataset’s schema (by default from the first file):

std::cout << dataset->schema()->ToString() << std::endl;

Using the arrow::dataset::Dataset::NewScan() method, we can build an arrow::dataset::Scanner and read the dataset (or a portion of it) into an arrow::Table with the arrow::dataset::Scanner::ToTable() method:

// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Note

Depending on the size of your dataset, this can require a lot of memory; see Filtering data below on filtering/projecting.

Reading different file formats#

The above examples use Parquet files on local disk, but the Dataset API provides a consistent interface across multiple file formats and filesystems. (See Reading from cloud storage for more information on the latter.) Currently, Parquet, ORC, Feather / Arrow IPC, and CSV file formats are supported; more formats are planned in the future.

If we save the table as Feather files instead of Parquet files:

// Set up a dataset by writing two Feather files.
arrow::Result<std::string> CreateExampleFeatherDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Feather files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.feather"));
  ARROW_ASSIGN_OR_RAISE(auto writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.feather"));
  ARROW_ASSIGN_OR_RAISE(writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  return base_path;
}

…then we can read the Feather files by passing an arrow::dataset::IpcFileFormat:

auto format = std::make_shared<ds::IpcFileFormat>();
// ...
auto factory =
    ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options).ValueOrDie();

Customizing file formats#

arrow::dataset::FileFormat objects have properties that control how files are read. For example:

auto format = std::make_shared<ds::ParquetFileFormat>();
format->reader_options.dict_columns.insert("a");

This will configure column "a" to be dictionary-encoded when read. Similarly, setting arrow::dataset::CsvFileFormat::parse_options lets us change things like reading comma-separated or tab-separated data.
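
For instance, a minimal sketch of configuring tab-separated reads (assuming arrow/dataset/file_csv.h is included; this snippet is not part of the example program):

// Read CSV fragments as tab-separated rather than comma-separated data.
auto csv_format = std::make_shared<ds::CsvFileFormat>();
csv_format->parse_options.delimiter = '\t';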

Additionally, passing an arrow::dataset::FragmentScanOptions to arrow::dataset::ScannerBuilder::FragmentScanOptions() offers fine-grained control over data scanning. For example, for CSV files, we can change what values are converted into Boolean true and false at scan time.
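
A sketch of that CSV case (again assuming arrow/dataset/file_csv.h and an existing scan_builder; this snippet is not part of the example program):

// Treat "yes"/"no" as the Boolean true/false values when converting CSV data at scan time.
auto csv_scan_options = std::make_shared<ds::CsvFragmentScanOptions>();
csv_scan_options->convert_options.true_values = {"yes"};
csv_scan_options->convert_options.false_values = {"no"};
ARROW_RETURN_NOT_OK(scan_builder->FragmentScanOptions(csv_scan_options));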

Filtering data#

So far, we’ve been reading the entire dataset, but if we need only a subset of the data, this can waste time or memory reading data we don’t need. The arrow::dataset::Scanner offers control over what data to read.

In this snippet, we use arrow::dataset::ScannerBuilder::Project() to select which columns to read:

// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Some formats, such as Parquet, can reduce I/O costs here by reading only the specified columns from the filesystem.

A filter can be provided with arrow::dataset::ScannerBuilder::Filter(), so that rows which do not match the filter predicate will not be included in the returned table. Again, some formats, such as Parquet, can use this filter to reduce the amount of I/O needed.

// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Projecting columns#

In addition to selecting columns, arrow::dataset::ScannerBuilder::Project() can also be used for more complex projections, such as renaming columns, casting them to other types, and even deriving new columns based on evaluating expressions.

In this case, we pass a vector of expressions used to construct column values and a vector of names for the columns:

// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

This also determines the column selection; only the given columns will be present in the resulting table. If you want to include a derived column in addition to the existing columns, you can build up the expressions from the dataset schema:

// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Note

When combining filters and projections, Arrow will determine all necessary columns to read. For instance, if you filter on a column that isn’t ultimately selected, Arrow will still read the column to evaluate the filter.

Reading and writing partitioned data#

So far, we’ve been working with datasets consisting of flat directories with files. Oftentimes, a dataset will have one or more columns that are frequently filtered on. Instead of having to read and then filter the data, we can organize the files into a nested directory structure and define a partitioned dataset, where sub-directory names hold information about which subset of the data is stored in that directory. We can then filter data more efficiently by using that information to avoid loading files that don’t match the filter.

For example, a dataset partitioned by year and month may have the following layout:

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

The above partitioning scheme uses “/key=value/” directory names, as found in Apache Hive. Under this convention, the file at dataset_name/year=2007/month=01/data0.parquet contains only data for which year == 2007 and month == 01.

Let’s create a small partitioned dataset. For this, we’ll use Arrow’s dataset writing functionality.

// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}

The above created a directory with two subdirectories (“part=a” and “part=b”), and the Parquet files written in those directories no longer include the “part” column.

Reading this dataset, we now specify that the dataset should use a Hive-like partitioning scheme:

// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Although the partition fields are not included in the actual Parquet files, they will be added back to the resulting table when scanning this dataset:

$ ./debug/dataset_documentation_example file:///tmp parquet_hive partitioned
Found fragment: /tmp/parquet_dataset/part=a/part0.parquet
Partition expression: (part == "a")
Found fragment: /tmp/parquet_dataset/part=b/part1.parquet
Partition expression: (part == "b")
Read 20 rows
a: int64
  -- field metadata --
  PARQUET:field_id: '1'
b: double
  -- field metadata --
  PARQUET:field_id: '2'
c: int64
  -- field metadata --
  PARQUET:field_id: '3'
part: string
----
# snip...

We can now filter on the partition keys, which avoids loading files altogether if they do not match the filter:

// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Different partitioning schemes#

The above example uses a Hive-like directory scheme, such as “/year=2009/month=11/day=15”. We specified this by passing the Hive partitioning factory. In this case, the types of the partition keys are inferred from the file paths.

It is also possible to directly construct the partitioning and explicitly define the schema of the partition keys. For example:

auto part = std::make_shared<ds::HivePartitioning>(arrow::schema(
    {arrow::field("year", arrow::int16()), arrow::field("month", arrow::int8()),
     arrow::field("day", arrow::int32())}));

Arrow supports another partitioning scheme, “directory partitioning”, where the segments in the file path represent the values of the partition keys without including the name (the field names are implicit in the segment’s index). For example, given field names “year”, “month”, and “day”, one path might be “/2019/11/15”.

Since the names are not included in the file paths, these must be specified when constructing a directory partitioning:

auto part = ds::DirectoryPartitioning::MakeFactory({"year", "month", "day"});

Directory partitioning also supports providing a full schema rather than inferringtypes from file paths.
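
For instance, a minimal sketch of constructing a directory partitioning with an explicit schema (this snippet is not part of the example program):

// The field names and types are fixed up front instead of being inferred from paths.
auto part = std::make_shared<ds::DirectoryPartitioning>(arrow::schema(
    {arrow::field("year", arrow::int16()), arrow::field("month", arrow::int8()),
     arrow::field("day", arrow::int32())}));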

Partitioning performance considerations#

Partitioning datasets has two aspects that affect performance: it increases the number of files and it creates a directory structure around the files. Both of these have benefits as well as costs. Depending on the configuration and the size of your dataset, the costs can outweigh the benefits.

Because partitions split up the dataset into multiple files, partitioned datasets can be read and written with parallelism. However, each additional file adds a little overhead in processing for filesystem interaction. It also increases the overall dataset size since each file has some shared metadata. For example, each Parquet file contains the schema and group-level statistics. The number of partitions is a floor for the number of files. If you partition a dataset by date with a year of data, you will have at least 365 files. If you further partition by another dimension with 1,000 unique values, you will have up to 365,000 files. Partitioning this finely often leads to small files that mostly consist of metadata.

Partitioned datasets create nested folder structures, and those allow us to prune which files are loaded in a scan. However, this adds overhead to discovering files in the dataset, as we’ll need to recursively “list directory” to find the data files. Overly fine partitions can cause problems here: partitioning a dataset by date for a year’s worth of data will require 365 list calls to find all the files; adding another column with cardinality 1,000 will make that 365,365 calls.

The optimal partitioning layout will depend on your data, access patterns, and which systems will be reading the data. Most systems, including Arrow, should work across a range of file sizes and partitioning layouts, but there are extremes you should avoid. These guidelines can help avoid some known worst cases:

  • Avoid files smaller than 20MB and larger than 2GB.

  • Avoid partitioning layouts with more than 10,000 distinct partitions.

For file formats that have a notion of groups within a file, such as Parquet, similar guidelines apply. Row groups can provide parallelism when reading and allow data skipping based on statistics, but very small groups can cause metadata to be a significant portion of file size. Arrow’s file writer provides sensible defaults for group sizing in most cases.
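
When writing with the dataset API, recent Arrow versions also expose sizing knobs on arrow::dataset::FileSystemDatasetWriteOptions. A minimal sketch (the values here are purely illustrative, and the field names assume a recent Arrow release):

ds::FileSystemDatasetWriteOptions write_options;
// Cap the number of rows written to any single file.
write_options.max_rows_per_file = 1024 * 1024;
// Buffer rows so groups are neither tiny nor enormous.
write_options.min_rows_per_group = 64 * 1024;
write_options.max_rows_per_group = 128 * 1024;
// Refuse to create an excessive number of partition directories.
write_options.max_partitions = 1024;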

Reading from other data sources#

Reading in-memory data#

If you already have data in memory that you’d like to use with the Datasets API (e.g. to filter/project data, or to write it out to a filesystem), you can wrap it in an arrow::dataset::InMemoryDataset:

auto table = arrow::Table::FromRecordBatches(...);
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
// Scan the dataset, filter it, etc.
auto scanner_builder = dataset->NewScan();

In the example, we used the InMemoryDataset to write our example data to local disk, which was then used in the rest of the example:

// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}

Reading from cloud storage#

In addition to local files, Arrow Datasets also support reading from cloud storage systems, such as Amazon S3, by passing a different filesystem.
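
For example, a filesystem can be constructed from an S3 URI. A minimal sketch (the bucket name is hypothetical, and Arrow must have been built with S3 support enabled):

// Construct an S3-backed filesystem from a URI; root_path receives the path within it.
std::string root_path;
ARROW_ASSIGN_OR_RAISE(auto s3fs,
                      fs::FileSystemFromUri("s3://my-bucket/datasets", &root_path));
// s3fs can now be passed wherever the examples above expect a filesystem.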

See the filesystem docs for more details on the available filesystems.

A note on transactions & ACID guarantees#

The dataset API offers no transaction support or any ACID guarantees. This affects both reading and writing. Concurrent reads are fine. Concurrent writes, or writes concurrent with reads, may have unexpected behavior. Various approaches can be used to avoid operating on the same files, such as using a unique basename template for each writer, a temporary directory for new files, or separate storage of the file list instead of relying on directory discovery.
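
As an illustration of the first suggestion, a writer-specific basename template could look like the sketch below (writer_id is a hypothetical per-process identifier, not something Arrow provides):

// Each writer uses a distinct prefix so concurrent writers never produce the same file name.
std::string writer_id = "worker-42";  // hypothetical identifier supplied by your application
write_options.basename_template = "part-" + writer_id + "-{i}.parquet";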

Unexpectedly killing the process while a write is in progress can leave the system in an inconsistent state. Write calls generally return as soon as the bytes to be written have been completely delivered to the OS page cache. Even though a write operation has been completed, it is possible for part of the file to be lost if there is a sudden power loss immediately after the write call.

Most file formats have magic numbers which are written at the end. This means a partial file write can safely be detected and discarded. The CSV file format does not have any such concept, and a partially written CSV file may be detected as valid.

Full Example#

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This example showcases various ways to work with Datasets. It's
// intended to be paired with the documentation.

#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/compute/cast.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/discovery.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_ipc.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/ipc/writer.h>
#include <arrow/util/iterator.h>
#include <parquet/arrow/writer.h>
#include "arrow/compute/expression.h"

#include <iostream>
#include <vector>

namespace ds = arrow::dataset;
namespace fs = arrow::fs;
namespace cp = arrow::compute;

/**
 * \brief Run Example
 *
 * ./debug/dataset-documentation-example file:///<some_path>/<some_directory> parquet
 */

// (Doc section: Reading Datasets)
// Generate some data for the rest of this example.
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Parquet files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}
// (Doc section: Reading Datasets)

// (Doc section: Reading different file formats)
// Set up a dataset by writing two Feather files.
arrow::Result<std::string> CreateExampleFeatherDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Feather files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.feather"));
  ARROW_ASSIGN_OR_RAISE(auto writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.feather"));
  ARROW_ASSIGN_OR_RAISE(writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  return base_path;
}
// (Doc section: Reading different file formats)

// (Doc section: Reading and writing partitioned data)
// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
// (Doc section: Reading and writing partitioned data)

// (Doc section: Dataset discovery)
// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Dataset discovery)

// (Doc section: Filtering data)
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Filtering data)

// (Doc section: Projecting columns)
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Projecting columns)

// (Doc section: Projecting columns #2)
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Projecting columns #2)

// (Doc section: Reading and writing partitioned data #2)
// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Reading and writing partitioned data #2)

// (Doc section: Reading and writing partitioned data #3)
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Reading and writing partitioned data #3)

arrow::Status RunDatasetDocumentation(const std::string& format_name,
                                      const std::string& uri, const std::string& mode) {
  ARROW_RETURN_NOT_OK(arrow::compute::Initialize());

  std::string base_path;
  std::shared_ptr<ds::FileFormat> format;
  std::string root_path;
  ARROW_ASSIGN_OR_RAISE(auto fs, fs::FileSystemFromUri(uri, &root_path));

  if (format_name == "feather") {
    format = std::make_shared<ds::IpcFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleFeatherDataset(fs, root_path));
  } else if (format_name == "parquet") {
    format = std::make_shared<ds::ParquetFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleParquetDataset(fs, root_path));
  } else if (format_name == "parquet_hive") {
    format = std::make_shared<ds::ParquetFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path,
                          CreateExampleParquetHivePartitionedDataset(fs, root_path));
  } else {
    std::cerr << "Unknown format: " << format_name << std::endl;
    std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl;
    return arrow::Status::ExecutionError("Dataset creating failed.");
  }

  std::shared_ptr<arrow::Table> table;
  if (mode == "no_filter") {
    ARROW_ASSIGN_OR_RAISE(table, ScanWholeDataset(fs, format, base_path));
  } else if (mode == "filter") {
    ARROW_ASSIGN_OR_RAISE(table, FilterAndSelectDataset(fs, format, base_path));
  } else if (mode == "project") {
    ARROW_ASSIGN_OR_RAISE(table, ProjectDataset(fs, format, base_path));
  } else if (mode == "select_project") {
    ARROW_ASSIGN_OR_RAISE(table, SelectAndProjectDataset(fs, format, base_path));
  } else if (mode == "partitioned") {
    ARROW_ASSIGN_OR_RAISE(table, ScanPartitionedDataset(fs, format, base_path));
  } else if (mode == "filter_partitioned") {
    ARROW_ASSIGN_OR_RAISE(table, FilterPartitionedDataset(fs, format, base_path));
  } else {
    std::cerr << "Unknown mode: " << mode << std::endl;
    std::cerr
        << "Supported modes: no_filter, filter, project, select_project, partitioned"
        << std::endl;
    return arrow::Status::ExecutionError("Dataset reading failed.");
  }
  std::cout << "Read " << table->num_rows() << " rows" << std::endl;
  std::cout << table->ToString() << std::endl;
  return arrow::Status::OK();
}

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string uri = argv[1];
  std::string format_name = argv[2];
  std::string mode = argc > 3 ? argv[3] : "no_filter";

  auto status = RunDatasetDocumentation(format_name, uri, mode);
  if (!status.ok()) {
    std::cerr << status.ToString() << std::endl;
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}