Movatterモバイル変換


[0]ホーム

URL:


Skip to contents

Working with multi-file data sets

Source:vignettes/dataset.Rmd
dataset.Rmd

Apache Arrow lets you work efficiently with single and multi-filedata sets even when that data set is too large to be loaded into memory.With the help of Arrow Dataset objects you can analyze this kind of datausing familiardplyr syntax.This article introduces Datasets and shows you how to analyze them withdplyr and arrow: we’ll start by ensuring both packages are loaded

library(arrow, warn.conflicts=FALSE)library(dplyr, warn.conflicts=FALSE)

Example: NYC taxi data

The primary motivation for Arrow’s Datasets object is to allow usersto analyze extremely large datasets. As an example, consider theNewYork City taxi trip record data that is widely used in big dataexercises and competitions. To demonstrate the capabilities of ApacheArrow we host a Parquet-formatted version this data in a public AmazonS3 bucket: in its full form, our version of the data set is one verylarge table with about 1.7 billion rows and 24 columns, where each rowcorresponds to a single taxi ride sometime between 2009 and 2022. Adatadictionary for this version of the NYC taxi data is alsoavailable.

This multi-file data set is comprised of 158 distinct Parquet files,each corresponding to a month of data. A single file is typically around400-500MB in size, and the full data set is about 70GB in size. It isnot a small data set – it is slow to download and does not fit in memoryon a typical machine 🙂 – so we also host a “tiny” version of the NYCtaxi data that is formatted in exactly the same way but includes onlyone out of every thousand entries in the original data set (i.e.,individual files are <1MB in size, and the “tiny” data set is only70MB)

If you have Amazon S3 support enabled in arrow (true for most users;see links at the end of this article if you need to troubleshoot this),you can connect to a copy of the “tiny taxi data” stored on S3 with thiscommand:

bucket<-s3_bucket("voltrondata-labs-datasets/nyc-taxi-tiny")

Alternatively you could connect to a copy of the data on Google CloudStorage (GCS) using the following command:

bucket<-gs_bucket("voltrondata-labs-datasets/nyc-taxi-tiny", anonymous=TRUE)

If you want to use the full data set, replacenyc-taxi-tiny withnyc-taxi in the code above.Apart from size – and with it the cost in time, bandwidth usage, and CPUcycles – there is no difference in the two versions of the data: you cantest your code using the tiny taxi data and then check how it scalesusing the full data set.

To make a local copy of the data set stored in thebucket to a folder called"nyc-taxi", use thecopy_files() function:

copy_files(from=bucket, to="nyc-taxi")

For the purposes of this article, we assume that the NYC taxi dataset(either the full data or the tiny version) has been downloaded locallyand exists in an"nyc-taxi" directory.

Opening Datasets

The first step in the process is to create a Dataset object thatpoints at the data directory:

ds<-open_dataset("nyc-taxi")

It is important to note that when we do this, the data values are notloaded into memory. Instead, Arrow scans the data directory to findrelevant files, parses the file paths looking for a “Hive-stylepartitioning” (see below), and reads headers of the data files toconstruct a Schema that contains metadata describing the structure ofthe data. For more information about Schemas see themetadata article.

Two questions naturally follow from this: what kind of files doesopen_dataset() look for, and what structure does it expectto find in the file paths? Let’s start by looking at the file types.

By defaultopen_dataset() looks for Parquet files butyou can override this using theformat argument. Forexample if the data were encoded as CSV files we could setformat = "csv" to connect to the data. The Arrow Datasetinterface supports several file formats including:

  • "parquet" (the default)
  • "feather" or"ipc" (aliases for"arrow"; as Feather version 2 is the Arrow fileformat)
  • "csv" (comma-delimited files) and"tsv"(tab-delimited files)
  • "text" (generic text-delimited files - use thedelimiter argument to specify which to use)

In the case of text files, you can pass the following parsing optionstoopen_dataset() to ensure that files are readcorrectly:

  • delim
  • quote
  • escape_double
  • escape_backslash
  • skip_empty_rows

An alternative when working with text files is to useopen_delim_dataset(),open_csv_dataset(), oropen_tsv_dataset(). These functions are wrappers aroundopen_dataset() but with parameters that mirrorread_csv_arrow(),read_delim_arrow(), andread_tsv_arrow() to allow for easy switching betweenfunctions for opening single files and functions for openingdatasets.

For example:

ds<-open_csv_dataset("nyc-taxi/csv/")

For more information on these arguments and on parsing delimited textfiles generally, see the help documentation forread_delim_arrow() andopen_delim_dataset().

Next, what information doesopen_dataset() expect tofind in the file paths? By default, the Dataset interface looks forHive-style partitioning structure inwhich folders are named using a “key=value” convention, and data filesin a folder contain the subset of the data for which the key has therelevant value. For example, in the NYC taxi data file paths look likethis:

year=2009/month=1/part-0.parquetyear=2009/month=2/part-0.parquet...

From this,open_dataset() infers that the first listedParquet file contains the data for January 2009. In that sense, ahive-style partitioning is self-describing: the folder names stateexplicitly how the Dataset has been split across files.

Sometimes the directory partitioning isn’t self describing; that is,it doesn’t contain field names. For example, suppose the NYC taxi dataused file paths like these:

2009/01/part-0.parquet2009/02/part-0.parquet...

In that case,open_dataset() would need some hints as tohow to use the file paths. In this case, you could providec("year", "month") to thepartitioningargument, saying that the first path segment gives the value foryear, and the second segment ismonth. Everyrow in2009/01/part-0.parquet has a value of 2009 foryear and 1 formonth, even though thosecolumns may not be present in the file. In other words, we would openthe data like this:

ds<-open_dataset("nyc-taxi", partitioning=c("year","month"))

Either way, when you look at the Dataset, you can see that inaddition to the columns present in every file, there are also columnsyear andmonth. These columns are not presentin the files themselves: they are inferred from the partitioningstructure.

ds
#### FileSystemDataset with 158 Parquet files## vendor_name: string## pickup_datetime: timestamp[ms]## dropoff_datetime: timestamp[ms]## passenger_count: int64## trip_distance: double## pickup_longitude: double## pickup_latitude: double## rate_code: string## store_and_fwd: string## dropoff_longitude: double## dropoff_latitude: double## payment_type: string## fare_amount: double## extra: double## mta_tax: double## tip_amount: double## tolls_amount: double## total_amount: double## improvement_surcharge: double## congestion_surcharge: double## pickup_location_id: int64## dropoff_location_id: int64## year: int32## month: int32

Querying Datasets

Now that we have a Dataset object that refers to our data, we canconstruct dplyr-style queries. This is possible because arrow supplies aback end that allows users to manipulate tabular Arrow data using dplyrverbs. Here’s an example: suppose you are curious about tipping behaviorin the longest taxi rides. Let’s find the median tip percentage forrides with fares greater than $100 in 2015, broken down by the number ofpassengers:

system.time(ds|>filter(total_amount>100,year==2015)|>select(tip_amount,total_amount,passenger_count)|>mutate(tip_pct=100*tip_amount/total_amount)|>group_by(passenger_count)|>summarise(    median_tip_pct=median(tip_pct),    n=n())|>collect()|>print())
#### # A tibble: 10 x 3##    passenger_count median_tip_pct      n##              <int>          <dbl>  <int>##  1               1           16.6 143087##  2               2           16.2  34418##  3               5           16.7   5806##  4               4           11.4   4771##  5               6           16.7   3338##  6               3           14.6   8922##  7               0           10.1    380##  8               8           16.7     32##  9               9           16.7     42## 10               7           16.7     11####    user  system elapsed##   4.436   1.012   1.402

You’ve just selected a subset from a Dataset that contains around 2billion rows, computed a new column, and aggregated it. All within a fewseconds on a modern laptop. How does this work?

There are three reasons arrow can accomplish this task soquickly:

First, arrow adopts a lazy evaluation approach to queries: when dplyrverbs are called on the Dataset, they record their actions but do notevaluate those actions on the data until you runcollect().We can see this by taking the same code as before and leaving off thefinal step:

ds|>filter(total_amount>100,year==2015)|>select(tip_amount,total_amount,passenger_count)|>mutate(tip_pct=100*tip_amount/total_amount)|>group_by(passenger_count)|>summarise(    median_tip_pct=median(tip_pct),    n=n())
#### FileSystemDataset (query)## passenger_count: int64## median_tip_pct: double## n: int32#### See $.data for the source Arrow object

This version of the code returns an output instantly and shows themanipulations you’ve made, without loading data from the files. Becausethe evaluation of these queries is deferred, you can build up a querythat selects down to a small subset without generating intermediate datasets that could potentially be large.

Second, all work is pushed down to the individual data files, anddepending on the file format, chunks of data within files. As a result,you can select a subset of data from a much larger data set bycollecting the smaller slices from each file: you don’t have to load thewhole data set in memory to slice from it.

Third, because of partitioning, you can ignore some files entirely.In this example, by filteringyear == 2015, all filescorresponding to other years are immediately excluded: you don’t have toload them in order to find that no rows match the filter. For Parquetfiles – which contain row groups with statistics on the data containedwithin groups – there may be entire chunks of data you can avoidscanning because they have no rows wheretotal_amount > 100.

One final thing to note about querying Datasets. Suppose you attemptto call unsupported dplyr verbs or unimplemented functions in your queryon an Arrow Dataset. In that case, the arrow package raises an error.However, for dplyr queries on Arrow Table objects (which are alreadyin-memory), the package automatically callscollect()before processing that dplyr verb. To learn more about the dplyr backend, see thedata wranglingarticle.

Batch processing (experimental)

Sometimes you want to run R code on the entire Dataset, but thatDataset is much larger than memory. You can usemap_batcheson a Dataset query to process it batch-by-batch.

Note:map_batches is experimental andnot recommended for production use.

As an example, to randomly sample a Dataset, usemap_batches to sample a percentage of rows from eachbatch:

sampled_data<-ds|>filter(year==2015)|>select(tip_amount,total_amount,passenger_count)|>map_batches(~as_record_batch(sample_frac(as.data.frame(.),1e-4)))|>mutate(tip_pct=tip_amount/total_amount)|>collect()str(sampled_data)
#### tibble [10,918 <U+00D7> 4] (S3: tbl_df/tbl/data.frame)##  $ tip_amount     : num [1:10918] 3 0 4 1 1 6 0 1.35 0 5.9 ...##  $ total_amount   : num [1:10918] 18.8 13.3 20.3 15.8 13.3 ...##  $ passenger_count: int [1:10918] 3 2 1 1 1 1 1 1 1 3 ...##  $ tip_pct        : num [1:10918] 0.1596 0 0.197 0.0633 0.0752 ...

This function can also be used to aggregate summary statistics over aDataset by computing partial results for each batch and then aggregatingthose partial results. Extending the example above, you could fit amodel to the sample data and then usemap_batches tocompute the MSE on the full Dataset.

model<-lm(tip_pct~total_amount+passenger_count, data=sampled_data)ds|>filter(year==2015)|>select(tip_amount,total_amount,passenger_count)|>mutate(tip_pct=tip_amount/total_amount)|>map_batches(function(batch){batch|>as.data.frame()|>mutate(pred_tip_pct=predict(model, newdata=.))|>filter(!is.nan(tip_pct))|>summarize(sse_partial=sum((pred_tip_pct-tip_pct)^2), n_partial=n())|>as_record_batch()})|>summarize(mse=sum(sse_partial)/sum(n_partial))|>pull(mse)
#### [1] 0.1304284

Dataset options

There are a few ways you can control the Dataset creation to adapt tospecial use cases.

Work with files in a directory

If you are working with a single file or a set of files that are notall in the same directory, you can provide a file path or a vector ofmultiple file paths toopen_dataset(). This is useful if,for example, you have a single CSV file that is too big to read intomemory. You could pass the file path toopen_dataset(), usegroup_by() to partition the Dataset into manageable chunks,then usewrite_dataset() to write each chunk to a separateParquet file—all without needing to read the full CSV file into R.

Explicitly declare column names and data types

You can specify theschema argument toopen_dataset() to declare the columns and their data types.This is useful if you have data files that have different storage schema(for example, a column could beint32 in one andint8 in another) and you want to ensure that the resultingDataset has a specific type.

To be clear, it’s not necessary to specify a schema, even in thisexample of mixed integer types, because the Dataset constructor willreconcile differences like these. The schema specification just lets youdeclare what you want the result to be.

Explicitly declare partition format

Similarly, you can provide a Schema in thepartitioningargument ofopen_dataset() in order to declare the types ofthe virtual columns that define the partitions. This would be useful, inthe NYC taxi data example, if you wanted to keepmonth as astring instead of an integer.

Work with multiple data sources

Another feature of Datasets is that they can be composed of multipledata sources. That is, you may have a directory of partitioned Parquetfiles in one location, and in another directory, files that haven’t beenpartitioned. Or, you could point to an S3 bucket of Parquet data and adirectory of CSVs on the local file system and query them together as asingle Dataset. To create a multi-source Dataset, provide a list ofDatasets toopen_dataset() instead of a file path, orconcatenate them with a command likebig_dataset <- c(ds1, ds2).

Writing Datasets

As you can see, querying a large Dataset can be made quite fast bystorage in an efficient binary columnar format like Parquet or Featherand partitioning based on columns commonly used for filtering. However,data isn’t always stored that way. Sometimes you might start with onegiant CSV. The first step in analyzing data is cleaning is up andreshaping it into a more usable form.

Thewrite_dataset() function allows you to take aDataset or another tabular data object—an Arrow Table or RecordBatch, oran R data frame—and write it to a different file format, partitionedinto multiple files.

Assume that you have a version of the NYC Taxi data as CSV:

ds<-open_dataset("nyc-taxi/csv/", format="csv")

You can write it to a new location and translate the files to theFeather format by callingwrite_dataset() on it:

write_dataset(ds,"nyc-taxi/feather", format="feather")

Next, let’s imagine that thepayment_type column issomething you often filter on, so you want to partition the data by thatvariable. By doing so you ensure that a filter likepayment_type == "Cash" will touch only a subset of fileswherepayment_type is always"Cash".

One natural way to express the columns you want to partition on is touse thegroup_by() method:

ds|>group_by(payment_type)|>write_dataset("nyc-taxi/feather", format="feather")

This will write files to a directory tree that looks like this:

system("tree nyc-taxi/feather")
## feather## ├── payment_type=1## │   └── part-18.arrow## ├── payment_type=2## │   └── part-19.arrow## ...## └── payment_type=UNK##     └── part-17.arrow#### 18 directories, 23 files

Note that the directory names arepayment_type=Cash andsimilar: this is the Hive-style partitioning described above. This meansthat when you callopen_dataset() on this directory, youdon’t have to declare what the partitions are because they can be readfrom the file paths. (To instead write bare values for partitionsegments, i.e. Cash rather thanpayment_type=Cash, callwrite_dataset() withhive_style = FALSE.)

Perhaps, though,payment_type == "Cash" is the only datayou ever care about, and you just want to drop the rest and have asmaller working set. For this, you canfilter() them outwhen writing:

ds|>filter(payment_type=="Cash")|>write_dataset("nyc-taxi/feather", format="feather")

The other thing you can do when writing Datasets is select a subsetof columns or reorder them. Suppose you never care aboutvendor_id, and being a string column, it can take up a lotof space when you read it in, so let’s drop it:

ds|>group_by(payment_type)|>select(-vendor_id)|>write_dataset("nyc-taxi/feather", format="feather")

Note that while you can select a subset of columns, you cannotcurrently rename columns when writing a Dataset.

Partitioning performance considerations

Partitioning Datasets has two aspects that affect performance: itincreases the number of files and it creates a directory structurearound the files. Both of these have benefits as well as costs.Depending on the configuration and the size of your Dataset, the costscan outweigh the benefits.

Because partitions split up the Dataset into multiple files,partitioned Datasets can be read and written with parallelism. However,each additional file adds a little overhead in processing for filesysteminteraction. It also increases the overall Dataset size since each filehas some shared metadata. For example, each parquet file contains theschema and group-level statistics. The number of partitions is a floorfor the number of files. If you partition a Dataset by date with a yearof data, you will have at least 365 files. If you further partition byanother dimension with 1,000 unique values, you will have up to 365,000files. This fine of partitioning often leads to small files that mostlyconsist of metadata.

Partitioned Datasets create nested folder structures, and those allowus to prune which files are loaded in a scan. However, this addsoverhead to discovering files in the Dataset, as we’ll need torecursively “list directory” to find the data files. Too fine partitionscan cause problems here: Partitioning a dataset by date for a yearsworth of data will require 365 list calls to find all the files; addinganother column with cardinality 1,000 will make that 365,365 calls.

The most optimal partitioning layout will depend on your data, accesspatterns, and which systems will be reading the data. Most systems,including Arrow, should work across a range of file sizes andpartitioning layouts, but there are extremes you should avoid. Theseguidelines can help avoid some known worst cases:

  • Avoid files smaller than 20MB and larger than 2GB.
  • Avoid partitioning layouts with more than 10,000 distinctpartitions.

For file formats that have a notion of groups within a file, such asParquet, similar guidelines apply. Row groups can provide parallelismwhen reading and allow data skipping based on statistics, but very smallgroups can cause metadata to be a significant portion of file size.Arrow’s file writer provides sensible defaults for group sizing in mostcases.

Transactions / ACID guarantees

The Dataset API offers no transaction support or any ACID guarantees.This affects both reading and writing. Concurrent reads are fine.Concurrent writes or writes concurring with reads may have unexpectedbehavior. Various approaches can be used to avoid operating on the samefiles such as using a unique basename template for each writer, atemporary directory for new files, or separate storage of the file listinstead of relying on directory discovery.

Unexpectedly killing the process while a write is in progress canleave the system in an inconsistent state. Write calls generally returnas soon as the bytes to be written have been completely delivered to theOS page cache. Even though a write operation has been completed it ispossible for part of the file to be lost if there is a sudden power lossimmediately after the write call.

Most file formats have magic numbers which are written at the end.This means a partial file write can safely be detected and discarded.The CSV file format does not have any such concept and a partiallywritten CSV file may be detected as valid.

Further reading


[8]ページ先頭

©2009-2025 Movatter.jp