Tabular Datasets#

The pyarrow.dataset module provides functionality to efficiently work with tabular, potentially larger than memory, and multi-file datasets. This includes:

  • A unified interface that supports different sources and file formats and different file systems (local, cloud).

  • Discovery of sources (crawling directories, handling directory-based partitioned datasets, basic schema normalization, ...)

  • Optimized reading with predicate pushdown (filtering rows), projection (selecting and deriving columns), and optionally parallel reading.

The supported file formats currently are Parquet, Feather / Arrow IPC, CSV and ORC (note that ORC datasets can currently only be read and not yet written). The goal is to expand support to other file formats and data sources (e.g. database connections) in the future.

For those familiar with the existing pyarrow.parquet.ParquetDataset for reading Parquet datasets: pyarrow.dataset’s goal is similar but not specific to the Parquet format and not tied to Python: the same datasets API is exposed in the R bindings for Arrow. In addition, pyarrow.dataset boasts improved performance and new features (e.g. filtering within files rather than only on partition keys).

Reading Datasets#

For the examples below, let’s create a small dataset consisting of a directory with two parquet files:

In [1]: import tempfile

In [2]: import pathlib

In [3]: import pyarrow as pa

In [4]: import pyarrow.parquet as pq

In [5]: import numpy as np

In [6]: base = pathlib.Path(tempfile.mkdtemp(prefix="pyarrow-"))

In [7]: (base / "parquet_dataset").mkdir(exist_ok=True)

# creating an Arrow Table
In [8]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})

# writing it into two parquet files
In [9]: pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")

In [10]: pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")

Dataset discovery#

A Dataset object can be created with the dataset() function. We can pass it the path to the directory containing the data files:

In [11]: import pyarrow.dataset as ds

In [12]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [13]: dataset
Out[13]: <pyarrow._dataset.FileSystemDataset at 0x7fe030eacd00>

In addition to searching a base directory, dataset() accepts a path to a single file or a list of file paths.
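For instance, a minimal sketch reusing the two files created above (the variable names are illustrative):

single_file = ds.dataset(base / "parquet_dataset" / "data1.parquet", format="parquet")
file_list = ds.dataset(
    [str(base / "parquet_dataset" / "data1.parquet"),
     str(base / "parquet_dataset" / "data2.parquet")],
    format="parquet")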

Creating a Dataset object does not begin reading the data itself. If needed, it only crawls the directory to find all the files:

In [14]: dataset.files
Out[14]:
['/tmp/pyarrow-9_qw6tb_/parquet_dataset/data1.parquet',
 '/tmp/pyarrow-9_qw6tb_/parquet_dataset/data2.parquet']

… and infers the dataset’s schema (by default from the first file):

In [15]: print(dataset.schema.to_string(show_field_metadata=False))
a: int64
b: double
c: int64

Using the Dataset.to_table() method we can read the dataset (or a portion of it) into a pyarrow Table (note that depending on the size of your dataset this can require a lot of memory, see below on filtering / iterative loading):

In [16]: dataset.to_table()
Out[16]:
pyarrow.Table
a: int64
b: double
c: int64
----
a: [[0,1,2,3,4],[5,6,7,8,9]]
b: [[1.1373880927195317,-0.5829723903558806,1.0058469280348412,-0.7464153609137238,-0.49764206660895927],[-0.9559528016600878,-2.007285802328415,-0.3980382178636434,1.8734612388530516,0.8336648980261276]]
c: [[1,2,1,2,1],[2,1,2,1,2]]

# converting to pandas to see the contents of the scanned table
In [17]: dataset.to_table().to_pandas()
Out[17]:
   a         b  c
0  0  1.137388  1
1  1 -0.582972  2
2  2  1.005847  1
3  3 -0.746415  2
4  4 -0.497642  1
5  5 -0.955953  2
6  6 -2.007286  1
7  7 -0.398038  2
8  8  1.873461  1
9  9  0.833665  2

Reading different file formats#

The above examples use Parquet files as dataset sources but the Dataset API provides a consistent interface across multiple file formats and filesystems. Currently, Parquet, ORC, Feather / Arrow IPC, and CSV file formats are supported; more formats are planned in the future.

If we save the table as Feather files instead of Parquet files:

In [18]: import pyarrow.feather as feather

In [19]: feather.write_feather(table, base / "data.feather")

…then we can read the Feather file using the same functions, but specifying format="feather":

In [20]: dataset = ds.dataset(base / "data.feather", format="feather")

In [21]: dataset.to_table().to_pandas().head()
Out[21]:
   a         b  c
0  0  1.137388  1
1  1 -0.582972  2
2  2  1.005847  1
3  3 -0.746415  2
4  4 -0.497642  1

Customizing file formats#

The format name as a string, like:

ds.dataset(..., format="parquet")

is shorthand for a default-constructed ParquetFileFormat:

ds.dataset(..., format=ds.ParquetFileFormat())

The FileFormat objects can be customized using keywords. For example:

parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
ds.dataset(..., format=parquet_format)

This will configure column "a" to be dictionary encoded on scan.

Filtering data#

To avoid reading all the data when only a subset is needed, the columns and filter keywords can be used.

The columns keyword can be used to only read the specified columns:

In [22]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [23]: dataset.to_table(columns=['a', 'b']).to_pandas()
Out[23]:
   a         b
0  0  1.137388
1  1 -0.582972
2  2  1.005847
3  3 -0.746415
4  4 -0.497642
5  5 -0.955953
6  6 -2.007286
7  7 -0.398038
8  8  1.873461
9  9  0.833665

With the filter keyword, rows which do not match the filter predicate will not be included in the returned table. The keyword expects a boolean Expression referencing at least one of the columns:

In [24]: dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
Out[24]:
   a         b  c
0  7 -0.398038  2
1  8  1.873461  1
2  9  0.833665  2

In [25]: dataset.to_table(filter=ds.field('c') == 2).to_pandas()
Out[25]:
   a         b  c
0  1 -0.582972  2
1  3 -0.746415  2
2  5 -0.955953  2
3  7 -0.398038  2
4  9  0.833665  2

The easiest way to construct those Expression objects is by using the field() helper function. Any column - not just partition columns - can be referenced using the field() function (which creates a FieldExpression). Operator overloads are provided to compose filters including the comparisons (equal, larger/less than, etc), set membership testing, and boolean combinations (&, |, ~):

In [26]: ds.field('a') != 3
Out[26]: <pyarrow.compute.Expression (a != 3)>

In [27]: ds.field('a').isin([1, 2, 3])
Out[27]:
<pyarrow.compute.Expression is_in(a, {value_set=int64:[
  1,
  2,
  3
], null_matching_behavior=MATCH})>

In [28]: (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)
Out[28]: <pyarrow.compute.Expression ((a > b) and (b > 1))>

Note that Expression objects cannot be combined by the Python logical operators and, or and not.
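Instead, compose them with the overloaded operators shown above; a minimal sketch reusing the example dataset:

# use & / | / ~ rather than Python's and / or / not
combined_filter = (ds.field('a') >= 7) & ~(ds.field('c') == 2)
dataset.to_table(filter=combined_filter)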

Projecting columns#

The columns keyword can be used to read a subset of the columns of the dataset by passing it a list of column names. The keyword can also be used for more complex projections in combination with expressions.

In this case, we pass it a dictionary with the keys being the resulting column names and the values the expression that is used to construct the column values:

In [29]: projection = {
   ....:     "a_renamed": ds.field("a"),
   ....:     "b_as_float32": ds.field("b").cast("float32"),
   ....:     "c_1": ds.field("c") == 1,
   ....: }

In [30]: dataset.to_table(columns=projection).to_pandas().head()
Out[30]:
   a_renamed  b_as_float32    c_1
0          0      1.137388   True
1          1     -0.582972  False
2          2      1.005847   True
3          3     -0.746415  False
4          4     -0.497642   True

The dictionary also determines the column selection (only the keys in the dictionary will be present as columns in the resulting table). If you want to include a derived column in addition to the existing columns, you can build up the dictionary from the dataset schema:

In [31]: projection = {col: ds.field(col) for col in dataset.schema.names}

In [32]: projection.update({"b_large": ds.field("b") > 1})

In [33]: dataset.to_table(columns=projection).to_pandas().head()
Out[33]:
   a         b  c  b_large
0  0  1.137388  1     True
1  1 -0.582972  2    False
2  2  1.005847  1     True
3  3 -0.746415  2    False
4  4 -0.497642  1    False

Reading partitioned data#

Above, a dataset consisting of a flat directory with files was shown. However, a dataset can exploit a nested directory structure defining a partitioned dataset, where the sub-directory names hold information about which subset of the data is stored in that directory.

For example, a dataset partitioned by year and month may look like this on disk:

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

The above partitioning scheme is using “/key=value/” directory names, as foundin Apache Hive.

Let’s create a small partitioned dataset. The write_to_dataset() function can write such hive-like partitioned datasets.

In [34]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
   ....:                   'part': ['a'] * 5 + ['b'] * 5})

In [35]: pq.write_to_dataset(table, "parquet_dataset_partitioned",
   ....:                     partition_cols=['part'])

The above created a directory with two subdirectories (“part=a” and “part=b”), and the Parquet files written in those directories no longer include the “part” column.

Reading this dataset with dataset(), we now specify that the dataset should use a hive-like partitioning scheme with the partitioning keyword:

In [36]: dataset = ds.dataset("parquet_dataset_partitioned", format="parquet",
   ....:                      partitioning="hive")

In [37]: dataset.files
Out[37]:
['parquet_dataset_partitioned/part=a/1ee1e9045ad24a2ba5a45b7f1b7e3d05-0.parquet',
 'parquet_dataset_partitioned/part=b/1ee1e9045ad24a2ba5a45b7f1b7e3d05-0.parquet']

Although the partition fields are not included in the actual Parquet files, they will be added back to the resulting table when scanning this dataset:

In [38]: dataset.to_table().to_pandas().head(3)
Out[38]:
   a         b  c part
0  0 -0.459856  1    a
1  1  1.137802  2    a
2  2  0.160218  1    a

We can now filter on the partition keys, which avoids loading files altogether if they do not match the filter:

In [39]: dataset.to_table(filter=ds.field("part") == "b").to_pandas()
Out[39]:
   a         b  c part
0  5  0.129500  2    b
1  6  0.754953  1    b
2  7 -1.534345  2    b
3  8 -1.097663  1    b
4  9 -0.558693  2    b

Different partitioning schemes#

The above example uses a hive-like directory scheme, such as “/year=2009/month=11/day=15”. We specified this by passing the partitioning="hive" keyword. In this case, the types of the partition keys are inferred from the file paths.

It is also possible to explicitly define the schema of the partition keys using the partitioning() function. For example:

part = ds.partitioning(
    pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
    flavor="hive"
)
dataset = ds.dataset(..., partitioning=part)

“Directory partitioning” is also supported, where the segments in the file path represent the values of the partition keys without including the name (the field names are implicit in the segment’s index). For example, given field names “year”, “month”, and “day”, one path might be “/2019/11/15”.

Since the names are not included in the file paths, these must be specified when constructing a directory partitioning:

part = ds.partitioning(field_names=["year", "month", "day"])

Directory partitioning also supports providing a full schema rather than inferring types from file paths.
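For example, a minimal sketch of a directory partitioning with an explicit schema (the field names and types here are illustrative):

part = ds.partitioning(
    pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int8())])
)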

Reading from cloud storage#

In addition to local files, pyarrow also supports reading from cloud storage. Currently, HDFS and Amazon S3-compatible storage are supported.

When passing a file URI, the file system will be inferred. For example, specifying an S3 path:

dataset=ds.dataset("s3://voltrondata-labs-datasets/nyc-taxi/")

Typically, you will want to customize the connection parameters, and then a file system object can be created and passed to the filesystem keyword:

from pyarrow import fs

s3 = fs.S3FileSystem(region="us-east-2")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=s3)

The currently available classes are S3FileSystem and HadoopFileSystem. See the Filesystem Interface docs for more details.

Reading from Minio#

In addition to cloud storage, pyarrow also supports reading from a MinIO object storage instance emulating S3 APIs. Paired with toxiproxy, this is useful for testing or benchmarking.

from pyarrow import fs

# By default, MinIO will listen for unencrypted HTTP traffic.
minio = fs.S3FileSystem(scheme="http", endpoint_override="localhost:9000")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=minio)

Working with Parquet Datasets#

While the Datasets API provides a unified interface to different file formats, some specific methods exist for Parquet Datasets.

Some processing frameworks such as Dask (optionally) use a _metadata file with partitioned datasets which includes information about the schema and the row group metadata of the full dataset. Using such a file can make creating a Parquet Dataset more efficient, since it does not need to infer the schema and crawl the directories for all Parquet files (this is especially the case for filesystems where accessing files is expensive). The parquet_dataset() function allows us to create a Dataset from a partitioned dataset with a _metadata file:

dataset = ds.parquet_dataset("/path/to/dir/_metadata")

By default, the constructed Dataset object for Parquet datasets maps each fragment to a single Parquet file. If you want fragments mapping to each row group of a Parquet file, you can use the split_by_row_group() method of the fragments:

fragments = list(dataset.get_fragments())
fragments[0].split_by_row_group()

This method returns a list of new Fragments mapping to each row group of the original Fragment (Parquet file). Both get_fragments() and split_by_row_group() accept an optional filter expression to get a filtered list of fragments.
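As a minimal sketch (the column name "part" is only an illustration and should match a column or partition field in your dataset):

# only keep fragments (and row groups) that may contain matching rows
fragments = list(dataset.get_fragments(filter=ds.field("part") == "b"))
row_groups = fragments[0].split_by_row_group(filter=ds.field("part") == "b")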

Manual specification of the Dataset#

The dataset() function allows easy creation of a Dataset viewing a directory, crawling all subdirectories for files and partitioning information. However, sometimes discovery is not required and the dataset’s files and partitions are already known (for example, when this information is stored in metadata). In this case it is possible to create a Dataset explicitly without any automatic discovery or inference.

For the example here, we are going to use a dataset where the file names contain additional partitioning information:

# creating a dummy dataset: directory with two files
In [40]: table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})

In [41]: (base / "parquet_dataset_manual").mkdir(exist_ok=True)

In [42]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")

In [43]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")

To create a Dataset from a list of files, we need to specify the paths, schema, format, filesystem, and partition expressions manually:

In [44]: from pyarrow import fs

In [45]: schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

In [46]: dataset = ds.FileSystemDataset.from_paths(
   ....:     ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
   ....:     filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
   ....:     partitions=[ds.field('year') == 2018, ds.field('year') == 2019])

Since we specified the “partition expressions” for our files, this information is materialized as columns when reading the data and can be used for filtering:

In [47]: dataset.to_table().to_pandas()
Out[47]:
   year  col1      col2
0  2018     0 -0.977647
1  2018     1  0.200212
2  2018     2  1.112589
3  2019     0 -0.977647
4  2019     1  0.200212
5  2019     2  1.112589

In [48]: dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
Out[48]:
   year  col1      col2
0  2019     0 -0.977647
1  2019     1  0.200212
2  2019     2  1.112589

Another benefit of manually listing the files is that the order of the files controls the order of the data. When performing an ordered read (or a read to a table), the rows returned will match the order of the files given. This only applies when the dataset is constructed with a list of files. There are no order guarantees given when the files are instead discovered by scanning a directory.
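For example, a minimal sketch reusing the manual dataset above: listing the 2019 file first places its rows before the 2018 rows in the resulting table (same schema, format, filesystem and partitions as before):

dataset_ordered = ds.FileSystemDataset.from_paths(
    ["data_2019.parquet", "data_2018.parquet"], schema=schema, format=ds.ParquetFileFormat(),
    filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
    partitions=[ds.field('year') == 2019, ds.field('year') == 2018])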

Iterative (out of core or streaming) reads#

The previous examples have demonstrated how to read the data into a table using to_table(). This is useful if the dataset is small or there is only a small amount of data that needs to be read. The dataset API contains additional methods to read and process large amounts of data in a streaming fashion.

The easiest way to do this is to use the method Dataset.to_batches(). This method returns an iterator of record batches. For example, we can use this method to calculate the average of a column without loading the entire column into memory:

In [49]: import pyarrow.compute as pc

In [50]: col2_sum = 0

In [51]: count = 0

In [52]: for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
   ....:     col2_sum += pc.sum(batch.column("col2")).as_py()
   ....:     count += batch.num_rows

In [53]: mean_a = col2_sum / count

Customizing the batch size#

An iterative read of a dataset is often called a “scan” of the dataset and pyarrow uses an object called a Scanner to do this. A Scanner is created for you automatically by the to_table() and to_batches() methods of the dataset. Any arguments you pass to these methods will be passed on to the Scanner constructor.

One of those parameters is the batch_size. This controls the maximum size of the batches returned by the scanner. Batches can still be smaller than the batch_size if the dataset consists of small files or those files themselves consist of small row groups. For example, a parquet file with 10,000 rows per row group will yield batches with, at most, 10,000 rows unless the batch_size is set to a smaller value.

The default batch size is one million rows and this is typically a good default but you may want to customize it if you are reading a large number of columns.
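For example, a minimal sketch of a scan with a smaller batch size (the value 64_000 is purely illustrative):

for batch in dataset.to_batches(batch_size=64_000):
    # process each RecordBatch as it is produced
    pass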

A note on transactions & ACID guarantees#

The dataset API offers no transaction support or any ACID guarantees. This affects both reading and writing. Concurrent reads are fine. Concurrent writes or writes concurrent with reads may have unexpected behavior. Various approaches can be used to avoid operating on the same files, such as using a unique basename template for each writer, a temporary directory for new files, or separate storage of the file list instead of relying on directory discovery.

Unexpectedly killing the process while a write is in progress can leave the system in an inconsistent state. Write calls generally return as soon as the bytes to be written have been completely delivered to the OS page cache. Even though a write operation has been completed it is possible for part of the file to be lost if there is a sudden power loss immediately after the write call.

Most file formats have magic numbers which are written at the end. This means a partial file write can safely be detected and discarded. The CSV file format does not have any such concept and a partially written CSV file may be detected as valid.

Writing Datasets#

The dataset API also simplifies writing data to a dataset using write_dataset(). This can be useful when you want to partition your data or you need to write a large amount of data. A basic dataset write is similar to writing a table except that you specify a directory instead of a filename.

In [54]: table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})

In [55]: ds.write_dataset(table, "sample_dataset", format="parquet")

The above example will create a single file named part-0.parquet in our sample_dataset directory.

Warning

If you run the example again it will replace the existing part-0.parquet file. Appending files to an existing dataset requires specifying a new basename_template for each call to ds.write_dataset to avoid overwriting existing files.
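For example, a minimal sketch that uses a unique basename template per write (the uuid-based template is just one possible choice; depending on your pyarrow version you may also need existing_data_behavior="overwrite_or_ignore" to write into a non-empty directory):

import uuid

ds.write_dataset(table, "sample_dataset", format="parquet",
                 basename_template=f"part-{uuid.uuid4()}-{{i}}.parquet",
                 existing_data_behavior="overwrite_or_ignore")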

Writing partitioned data#

A partitioning object can be used to specify how your output data should be partitioned. This uses the same kind of partitioning objects we used for reading datasets. To write our above data out to a partitioned directory we only need to specify how we want the dataset to be partitioned. For example:

In [56]: part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor="hive"
   ....: )

In [57]: ds.write_dataset(table, "partitioned_dataset", format="parquet", partitioning=part)

This will create two files. Half our data will be in the partitioned_dataset/c=1 directory and the other half will be in the partitioned_dataset/c=2 directory.

Partitioning performance considerations#

Partitioning datasets has two aspects that affect performance: it increases the number of files and it creates a directory structure around the files. Both of these have benefits as well as costs. Depending on the configuration and the size of your dataset, the costs can outweigh the benefits.

Because partitions split up the dataset into multiple files, partitioned datasets can be read and written with parallelism. However, each additional file adds a little overhead in processing for filesystem interaction. It also increases the overall dataset size since each file has some shared metadata. For example, each parquet file contains the schema and group-level statistics. The number of partitions is a floor for the number of files. If you partition a dataset by date with a year of data, you will have at least 365 files. If you further partition by another dimension with 1,000 unique values, you will have up to 365,000 files. Partitioning this finely often leads to small files that mostly consist of metadata.

Partitioned datasets create nested folder structures, and those allow us to prune which files are loaded in a scan. However, this adds overhead to discovering files in the dataset, as we’ll need to recursively “list directory” to find the data files. Too fine partitions can cause problems here: partitioning a dataset by date for a year’s worth of data will require 365 list calls to find all the files; adding another column with cardinality 1,000 will make that 365,365 calls.

The optimal partitioning layout will depend on your data, access patterns, and which systems will be reading the data. Most systems, including Arrow, should work across a range of file sizes and partitioning layouts, but there are extremes you should avoid. These guidelines can help avoid some known worst cases:

  • Avoid files smaller than 20MB and larger than 2GB.

  • Avoid partitioning layouts with more than 10,000 distinct partitions.

For file formats that have a notion of groups within a file, such as Parquet, similar guidelines apply. Row groups can provide parallelism when reading and allow data skipping based on statistics, but very small groups can cause metadata to be a significant portion of file size. Arrow’s file writer provides sensible defaults for group sizing in most cases.

Configuring files open during a write#

When writing data to the disk, there are a few parameters that can be important to optimize the writes, such as the number of rows per file and the maximum number of open files allowed during the write.

Set the maximum number of files opened with the max_open_files parameter of write_dataset().

If max_open_files is set greater than 0 then this will limit the maximum number of files that can be left open. This only applies to writing partitioned datasets, where rows are dispatched to the appropriate file depending on their partition values. If an attempt is made to open too many files then the least recently used file will be closed. If this setting is set too low you may end up fragmenting your data into many small files.

If your process is concurrently using other file handlers, either with a dataset scanner or otherwise, you may hit a system file handler limit. For example, if you are scanning a dataset with 300 files and writing out to 900 files, the total of 1200 files may be over a system limit. (On Linux, this might be a “Too Many Open Files” error.) You can either reduce this max_open_files setting or increase the file handler limit on your system. The default value is 900 which allows some number of files to be open by the scanner before hitting the default Linux limit of 1024.
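For example, a minimal sketch lowering the limit for a partitioned write (the output directory name and the value 512 are illustrative, not recommendations):

ds.write_dataset(table, "sample_dataset_max_open", format="parquet",
                 partitioning=part, max_open_files=512)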

Another important configuration used in write_dataset() is max_rows_per_file.

Set the maximum number of rows written in each file with the max_rows_per_file parameter of write_dataset().

If max_rows_per_file is set greater than 0 then this will limit how many rows are placed in any single file. Otherwise there will be no limit and one file will be created in each output directory unless files need to be closed to respect max_open_files. This setting is the primary way to control file size. For workloads writing a lot of data, files can get very large without a row count cap, leading to out-of-memory errors in downstream readers. The relationship between row count and file size depends on the dataset schema and how well compressed (if at all) the data is.
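For example, a minimal sketch capping file size by row count (the output directory name and the 10 million row cap are illustrative):

ds.write_dataset(table, "sample_dataset_capped", format="parquet",
                 max_rows_per_file=10_000_000)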

Configuring rows per group during a write#

The volume of data written to the disk per group can be configured. This configuration includes a lower and an upper bound. The minimum number of rows required to form a row group is defined with the min_rows_per_group parameter of write_dataset().

Note

If min_rows_per_group is set greater than 0 then this will cause the dataset writer to batch incoming data and only write the row groups to the disk when sufficient rows have accumulated. The final row group size may be less than this value if other options such as max_open_files or max_rows_per_file force smaller row group sizes.

The maximum number of rows allowed per group is defined with the max_rows_per_group parameter of write_dataset().

If max_rows_per_group is set greater than 0 then the dataset writer may split up large incoming batches into multiple row groups. If this value is set then min_rows_per_group should also be set or else you may end up with very small row groups (e.g. if the incoming row group size is just barely larger than this value).
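For example, a minimal sketch bounding row group sizes during a write (the output directory name and the specific bounds are illustrative):

ds.write_dataset(table, "sample_dataset_rowgroups", format="parquet",
                 min_rows_per_group=100_000, max_rows_per_group=1_000_000)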

Row groups are built into the Parquet and IPC/Feather formats but don’t affect JSON or CSV. When reading back Parquet and IPC formats in Arrow, the row group boundaries become the record batch boundaries, determining the default batch size of downstream readers. Additionally, row groups in Parquet files have column statistics which can help readers skip irrelevant data but can add size to the file. As an extreme example, if one sets max_rows_per_group=1 in Parquet, they will have large files because most of the file will be row group statistics.

Writing large amounts of data#

The above examples wrote data from a table. If you are writing a large amount of data you may not be able to load everything into a single in-memory table. Fortunately, the write_dataset() function also accepts an iterable of record batches. This makes it really simple, for example, to repartition a large dataset without loading the entire dataset into memory:

In [58]: old_part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor="hive"
   ....: )

In [59]: new_part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor=None
   ....: )

In [60]: input_dataset = ds.dataset("partitioned_dataset", partitioning=old_part)

# A scanner can act as an iterator of record batches but you could also receive
# data from the network (e.g. via flight), from your own scanning, or from any
# other method that yields record batches.  In addition, you can pass a dataset
# into write_dataset directly but this method is useful if you want to customize
# the scanner (e.g. to filter the input dataset or set a maximum batch size)
In [61]: scanner = input_dataset.scanner()

In [62]: ds.write_dataset(scanner, "repartitioned_dataset", format="parquet", partitioning=new_part)

After the above example runs our data will be in the repartitioned_dataset/1 and repartitioned_dataset/2 directories. In this simple example we are not changing the structure of the data (only the directory naming schema) but you could also use this mechanism to change which columns are used to partition the dataset. This is useful when you expect to query your data in specific ways and you can utilize partitioning to reduce the amount of data you need to read.

Customizing & inspecting written files#

By default the dataset API will create files named “part-i.format” where “i” is an integer generated during the write and “format” is the file format specified in the write_dataset call. For simple datasets it may be possible to know which files will be created but for larger or partitioned datasets it is not so easy. The file_visitor keyword can be used to supply a visitor that will be called as each file is created:

In [63]: def file_visitor(written_file):
   ....:     print(f"path={written_file.path}")
   ....:     print(f"size={written_file.size} bytes")
   ....:     print(f"metadata={written_file.metadata}")

In [64]: ds.write_dataset(table, "dataset_visited", format="parquet", partitioning=part,
   ....:                  file_visitor=file_visitor)
path=dataset_visited/c=1/part-0.parquet
size=815 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7fe02cd5c090>
  created_by: parquet-cpp-arrow version 22.0.0
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0
path=dataset_visited/c=2/part-0.parquet
size=817 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7fe02cd07830>
  created_by: parquet-cpp-arrow version 22.0.0
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0

This will allow you to collect the filenames that belong to the dataset and store them elsewhere, which can be useful when you want to avoid scanning directories the next time you need to read the data. It can also be used to generate the _metadata index file used by other tools such as Dask or Spark to create an index of the dataset.
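For example, a minimal sketch that collects the written paths into a list (the output directory name is illustrative):

written_paths = []

ds.write_dataset(table, "dataset_collected", format="parquet", partitioning=part,
                 file_visitor=lambda written_file: written_paths.append(written_file.path))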

Configuring format-specific parameters during a write#

In addition to the common options shared by all formats there are also format-specific options that are unique to a particular format. For example, to allow truncated timestamps while writing Parquet files:

In [65]: parquet_format = ds.ParquetFileFormat()

In [66]: write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)

In [67]: ds.write_dataset(table, "sample_dataset2", format="parquet", partitioning=part,
   ....:                  file_options=write_options)