
Petastorm is an open source data access library developed at Uber ATG. This library enables single machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. Petastorm supports popular Python-based machine learning (ML) frameworks such as Tensorflow, PyTorch, and PySpark. It can also be used from pure Python code.

Documentation web site: https://petastorm.readthedocs.io

pip install petastorm

The petastorm package defines several extra dependencies that are not installed automatically. The extras are: tf, tf_gpu, torch, opencv, docs, test.

For example, to trigger installation of the GPU version of Tensorflow and opencv, use the following pip command:

pip install petastorm[opencv,tf_gpu]

A dataset created using Petastorm is stored in Apache Parquet format. On top of the Parquet schema, Petastorm also stores higher-level schema information that makes multidimensional arrays a native part of a Petastorm dataset.

Petastorm supports extensible data codecs. These enable a user to use one of the standard data compressions (jpeg, png) or implement their own.
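As a rough illustration of what a user-defined codec looks like, here is a minimal sketch. The base-class name and the encode/decode/spark_dtype method signatures are assumptions based on how the built-in codecs (ScalarCodec, NdarrayCodec, CompressedImageCodec) are structured; consult petastorm/codecs.py for the exact interface before relying on it.

import io

import numpy as np
from pyspark.sql.types import BinaryType

# Assumption: the built-in codecs derive from this base class; verify in petastorm/codecs.py
from petastorm.codecs import DataframeColumnCodec


class ZlibNdarrayCodec(DataframeColumnCodec):
    """Hypothetical user-defined codec storing numpy arrays as compressed .npz blobs."""

    def encode(self, unischema_field, value):
        memfile = io.BytesIO()
        np.savez_compressed(memfile, arr=value)
        return bytearray(memfile.getvalue())

    def decode(self, unischema_field, value):
        return np.load(io.BytesIO(value))['arr']

    def spark_dtype(self):
        # The encoded value is stored as a binary column in the underlying Parquet file
        return BinaryType()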

Generating a dataset is done using PySpark. PySpark natively supports Parquet format, making it easy to run on a single machine or on a Spark compute cluster. Here is a minimalistic example writing out a table with some random data.

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])


def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}


def generate_petastorm_dataset(output_url='file:///tmp/hello_world_dataset'):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap dataset materialization portion. Will take care of setting up spark environment variables as
    # well as save petastorm specific metadata
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)
  • HelloWorldSchema is an instance of a Unischema object. Unischema is capable of rendering the types of its fields into different framework-specific formats, such as: Spark StructType, Tensorflow tf.DType and numpy numpy.dtype (see the short sketch after this list).
  • To define a dataset field, you need to specify a type, a shape, a codec instance and whether the field is nullable for each field of the Unischema.
  • We use PySpark for writing output Parquet files. In this example, we launch PySpark on a local box (.master('local[2]')). Of course, for larger-scale dataset generation we would need a real compute cluster.
  • We wrap the Spark dataset generation code with the materialize_dataset context manager. The context manager is responsible for configuring the row group size at the beginning and writing out petastorm specific metadata at the end.
  • The row generating code is expected to return a Python dictionary indexed by field name. We use the row_generator function for that.
  • dict_to_spark_row converts the dictionary into a pyspark.Row object while ensuring schema HelloWorldSchema compliance (shape, type and is-nullable condition are tested).
  • Once we have a pyspark.DataFrame we write it out to Parquet storage. The Parquet schema is automatically derived from HelloWorldSchema.
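For illustration, here is a short sketch of that rendering, reusing the HelloWorldSchema defined above. The fields attribute and the UnischemaField attribute names are assumptions based on the field definition shown earlier; check them against the Unischema API documentation.

# Spark StructType used when writing the dataset (same call as in generate_petastorm_dataset)
print(HelloWorldSchema.as_spark_schema())

# Per-field numpy dtype, shape and nullability as declared in the UnischemaField
id_field = HelloWorldSchema.fields['id']
print(id_field.numpy_dtype, id_field.shape, id_field.nullable)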

The petastorm.reader.Reader class is the main entry point for user code that accesses the data from an ML framework such as Tensorflow or Pytorch. The reader has multiple features such as:

  • Selective column readout
  • Multiple parallelism strategies: thread, process, single-threaded (for debug)
  • N-grams readout support
  • Row filtering (row predicates)
  • Shuffling
  • Partitioning for multi-GPU training
  • Local caching

Reading a dataset is simple using the petastorm.reader.Reader class which can be created using the petastorm.make_reader factory method:

from petastorm import make_reader

with make_reader('hdfs://myhadoop/some_dataset') as reader:
    for row in reader:
        print(row)

hdfs://... and file://... are supported URL protocols.

Once a Reader is instantiated, you can use it as an iterator.
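Most of the features listed above are controlled through keyword arguments of make_reader. The sketch below is only an illustration: the argument names (schema_fields, shuffle_row_groups, cur_shard/shard_count, num_epochs, reader_pool_type) and the in_lambda predicate helper are taken from recent Petastorm releases and should be verified against the version you have installed.

from petastorm import make_reader
from petastorm.predicates import in_lambda

# Read only two columns, keep rows with even ids, shuffle row groups,
# and read shard 0 of 4 using a thread pool
with make_reader('hdfs://myhadoop/some_dataset',
                 schema_fields=['id', 'image1'],
                 predicate=in_lambda(['id'], lambda id: id % 2 == 0),
                 shuffle_row_groups=True,
                 cur_shard=0, shard_count=4,
                 reader_pool_type='thread',
                 num_epochs=1) as reader:
    for row in reader:
        print(row.id)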

To hook up the reader into a Tensorflow graph, you can use the tf_tensors function:

import tensorflow as tf  # the Session-based example below assumes the TF1.x API

from petastorm import make_reader
from petastorm.tf_utils import tf_tensors

with make_reader('file:///some/localpath/a_dataset') as reader:
    row_tensors = tf_tensors(reader)
    with tf.Session() as session:
        for _ in range(3):
            print(session.run(row_tensors))

Alternatively, you can use the new tf.data.Dataset API:

import tensorflow as tf  # the one-shot iterator and Session below assume the TF1.x API

from petastorm import make_reader
from petastorm.tf_utils import make_petastorm_dataset

with make_reader('file:///some/localpath/a_dataset') as reader:
    dataset = make_petastorm_dataset(reader)
    iterator = dataset.make_one_shot_iterator()
    tensor = iterator.get_next()
    with tf.Session() as sess:
        sample = sess.run(tensor)
        print(sample.id)
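The Session-based snippets above assume the TF1.x API. Under TensorFlow 2.x eager execution (an assumption beyond what the examples above show), the object returned by make_petastorm_dataset is a regular tf.data.Dataset and can be iterated directly:

from petastorm import make_reader
from petastorm.tf_utils import make_petastorm_dataset

with make_reader('file:///some/localpath/a_dataset') as reader:
    dataset = make_petastorm_dataset(reader)
    for sample in dataset:          # eager iteration, no Session required
        print(sample.id.numpy())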

As illustrated in pytorch_example.py, reading a petastorm dataset from pytorch can be done via the adapter class petastorm.pytorch.DataLoader, which allows custom pytorch collating functions and transforms to be supplied.

Be sure you have torch and torchvision installed:

pip install torchvision

The minimalist example below assumes the definition of a Net class and train and test functions, included in pytorch_example:

import torch
from torchvision import transforms  # used by _transform_row below

from petastorm import make_reader
from petastorm.pytorch import DataLoader
from petastorm.transform import TransformSpec

torch.manual_seed(1)
device = torch.device('cpu')
model = Net().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)


def _transform_row(mnist_row):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    return (transform(mnist_row['image']), mnist_row['digit'])


transform = TransformSpec(_transform_row, removed_fields=['idx'])

with DataLoader(make_reader('file:///localpath/mnist/train', num_epochs=10,
                            transform_spec=transform, seed=1, shuffle_rows=True),
                batch_size=64) as train_loader:
    train(model, device, train_loader, 10, optimizer, 1)

with DataLoader(make_reader('file:///localpath/mnist/test', num_epochs=10,
                            transform_spec=transform), batch_size=1000) as test_loader:
    test(model, device, test_loader)

If you are working with very large batch sizes and do not need support for Decimal/strings, we provide a petastorm.pytorch.BatchedDataLoader that can buffer using Torch tensors (cpu or cuda) with a significantly higher throughput.

If your dataset fits into system memory, you can use an in-memory dataloader, petastorm.pytorch.InMemBatchedDataLoader. This dataloader only reads the dataset once, and caches data in memory to avoid additional I/O for multiple epochs.
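A minimal sketch of the batched loader is shown below. It assumes BatchedDataLoader accepts a reader and a batch_size the way petastorm.pytorch.DataLoader does, and that batches come back as dictionaries of torch tensors keyed by field name; verify both against the class docstring. The dataset path and the image/digit field names refer to the hypothetical MNIST dataset from the DataLoader example above.

from petastorm import make_reader
from petastorm.pytorch import BatchedDataLoader

with BatchedDataLoader(make_reader('file:///localpath/mnist/train', num_epochs=1),
                       batch_size=4096) as train_loader:
    for batch in train_loader:
        # each batch holds 4096 rows already converted to torch tensors
        images, digits = batch['image'], batch['digit']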

The Spark converter API simplifies the data conversion from Spark to TensorFlow or PyTorch. The input Spark DataFrame is first materialized in Parquet format and then loaded as a tf.data.Dataset or torch.utils.data.DataLoader.

The minimalist example below assumes the definition of a compiled tf.keras model and a Spark DataFrame containing a feature column followed by a label column.

from petastorm.spark import SparkDatasetConverter, make_spark_converter
import tensorflow.compat.v1 as tf  # pylint: disable=import-error

# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'hdfs:/...')

df = ...  # `df` is a spark dataframe

# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter(df)

# make a tensorflow dataset from `converter`
with converter.make_tf_dataset() as dataset:
    # the `dataset` is `tf.data.Dataset` object
    # dataset transformation can be done if needed
    dataset = dataset.map(...)
    # we can train/evaluate model on the `dataset`
    model.fit(dataset)
    # when exiting the context, the reader of the dataset will be closed

# delete the cached files of the dataframe.
converter.delete()

The minimalist example below assumes the definition of a Net class and train and test functions, included in pytorch_example.py, and a Spark DataFrame containing a feature column followed by a label column.

from petastorm.spark import SparkDatasetConverter, make_spark_converter

# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'hdfs:/...')

df_train, df_test = ...  # `df_train` and `df_test` are spark dataframes
model = Net()

# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter(df_train)
converter_test = make_spark_converter(df_test)

# make a pytorch dataloader from `converter_train`
with converter_train.make_torch_dataloader() as dataloader_train:
    # the `dataloader_train` is `torch.utils.data.DataLoader` object
    # we can train model using the `dataloader_train`
    train(model, dataloader_train, ...)
    # when exiting the context, the reader of the dataset will be closed

# the same for `converter_test`
with converter_test.make_torch_dataloader() as dataloader_test:
    test(model, dataloader_test, ...)

# delete the cached files of the dataframes.
converter_train.delete()
converter_test.delete()
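Batch size and the number of epochs can usually be set when the dataloader is created. The parameter names in the sketch below (batch_size, num_epochs) are assumptions about make_torch_dataloader and should be checked against the petastorm.spark API documentation.

# hypothetical tuning of the dataloader produced by the converter above
with converter_train.make_torch_dataloader(batch_size=64, num_epochs=1) as dataloader_train:
    train(model, dataloader_train, ...)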

A Petastorm dataset can be read into a Spark DataFrame using PySpark, where you can use a wide range of Spark tools to analyze and manipulate the dataset.

# Create a dataframe object from a parquet file
dataframe = spark.read.parquet(dataset_url)

# Show a schema
dataframe.printSchema()

# Count all
dataframe.count()

# Show a single column
dataframe.select('id').show()

SQL can be used to query a Petastorm dataset:

spark.sql('SELECT count(id) '
          'from parquet.`file:///tmp/hello_world_dataset`').collect()

You can find a full code sample here: pyspark_hello_world.py.

Petastorm can also be used to read data directly from Apache Parquet stores. To achieve that, use make_batch_reader (and not make_reader). The following summarizes the differences between the make_reader and make_batch_reader functions (a short make_batch_reader example follows the list):

  • make_reader reads only Petastorm datasets (created using materialize_dataset); make_batch_reader reads any Parquet store (some native Parquet column types are not supported yet).
  • make_reader returns one record at a time; make_batch_reader returns batches of records. The size of the batch is not fixed and is defined by the Parquet row-group size.
  • Predicates passed to make_reader are evaluated per single row; predicates passed to make_batch_reader are evaluated per batch.
  • Both can filter the parquet file based on the filters argument.
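For example, reading an existing (non-Petastorm) Parquet store looks much like the make_reader example above, except that each item yielded by the reader is a batch of records. The store URL and the id column below are hypothetical:

from petastorm import make_batch_reader

with make_batch_reader('hdfs://myhadoop/some_parquet_store') as reader:
    for batch in reader:
        # each field of the batch holds a numpy array with one value per row
        # of the underlying Parquet row group
        print(batch.id.shape)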

See the Troubleshooting page and please submit a ticket if you can't find an answer.

  1. Gruener, R., Cheng, O., and Litvin, Y. (2018) Introducing Petastorm: Uber ATG's Data Access Library for Deep Learning. URL: https://eng.uber.com/petastorm/
  2. QCon.ai 2019: "Petastorm: A Light-Weight Approach to Building ML Pipelines".

We prefer to receive contributions in the form of GitHub pull requests. Please send pull requests against the github.com/uber/petastorm repository.

  • If you are looking for some ideas on what to contribute, check out the GitHub issues and comment on an issue.
  • If you have an idea for an improvement, or you'd like to report a bug but don't have time to fix it, please create a GitHub issue.

To contribute a patch:

  • Break your work into small, single-purpose patches if possible. It's much harder to merge in a large change with a lot of disjoint features.
  • Submit the patch as a GitHub pull request against the master branch. For a tutorial, see the GitHub guides on forking a repo and sending a pull request.
  • Include a detailed description of the proposed change in the pull request.
  • Make sure that your code passes the unit tests. You can find instructions on how to run the unit tests here.
  • Add new unit tests for your code.

Thank you in advance for your contributions!

See the Development page for development-related information.
