At scale
AWS SDK for pandas supports Ray and Modin, enabling you to scale your pandas workflows from a single machine to a multi-node environment with no code changes. Head to our tutorial to set up a self-managed Ray cluster on Amazon Elastic Compute Cloud (Amazon EC2).
Getting Started
Install the library with these two optional dependencies to enable distributed mode:
pip install "awswrangler[ray,modin]"
Once installed, you can use the library in your code as usual:
>>> import awswrangler as wr
At import, SDK for pandas checks whether ray and modin are in the installation path and, if so, enables distributed mode. To confirm that you are in distributed mode, run:
>>> print(f"Execution Engine: {wr.engine.get()}")
>>> print(f"Memory Format: {wr.memory_format.get()}")
The output shows that Ray and Modin are enabled as the execution engine and memory format, respectively. You can switch back to non-distributed mode at any point (see Switching modes below).
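The import-time check described above can be approximated, for your own scripts, with the standard library's `importlib.util.find_spec`, which reports whether a top-level package is installed without importing it. This is a minimal sketch under that assumption: the helper `distributed_deps_available` is hypothetical and is not how SDK for pandas implements its own detection.

```python
import importlib.util


def distributed_deps_available() -> bool:
    """Hypothetical helper: True if both ``ray`` and ``modin`` are installed.

    Approximates the check described in the text; it is not the
    library's own implementation.
    """
    # find_spec returns None when a top-level package cannot be found
    return all(
        importlib.util.find_spec(name) is not None for name in ("ray", "modin")
    )


if __name__ == "__main__":
    mode = "distributed" if distributed_deps_available() else "non-distributed"
    print(f"Mode expected after importing awswrangler: {mode}")
```

Because the helper only looks for the packages, it can be called before `import awswrangler` to predict which mode you are likely to get.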
Initialization of the Ray cluster is lazy and is only triggered when the first distributed API is executed. At that point, SDK for pandas looks for an environment variable called WR_ADDRESS. If found, it is used to send commands to a remote cluster. If not found, a local Ray runtime is initialized on your machine instead. Alternatively, you can trigger Ray initialization with:
>>> wr.engine.initialize()
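The WR_ADDRESS rule (use the remote address if the variable is set, otherwise start Ray locally) can be expressed as a small lookup. The sketch below is hypothetical: `resolve_ray_address` is an illustrative name, and treating an empty value as "not set" is an assumption of this example, not documented library behavior.

```python
import os
from typing import Mapping, Optional


def resolve_ray_address(env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Hypothetical helper mirroring the WR_ADDRESS lookup described above.

    Returns the remote cluster address when WR_ADDRESS is set, or None to
    signal that a local Ray runtime would be started instead.
    """
    source = os.environ if env is None else env
    # Assumption of this sketch: an empty string counts as "not set"
    return source.get("WR_ADDRESS") or None


if __name__ == "__main__":
    address = resolve_ray_address()
    target = address if address else "a local Ray runtime"
    print(f"The first distributed call would use: {target}")
```

Passing an explicit mapping instead of reading `os.environ` keeps the lookup easy to check without touching your real environment.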
In distributed mode, the same awswrangler APIs can now handle much larger datasets:
import awswrangler as wr

# Read 1.6 GB of Parquet data
df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/")

# Drop the vendor_id column
df.drop("vendor_id", axis=1, inplace=True)

# Filter trips over 1 mile
df1 = df[df["trip_distance"] > 1]
In the example above, New York City taxi data is read from Amazon S3 into a distributed Modin data frame. Modin is a drop-in replacement for pandas: it exposes the same APIs but lets you use all of the cores on your machine, or all of the workers in an entire cluster, for improved performance and scale. To use it, replace your pandas import statement with Modin's:
>>> import modin.pandas as pd  # instead of import pandas as pd
Failing to do so means that all operations run on a single thread instead of using the resources of the entire cluster.
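To try the drop-in pattern without an S3 bucket or a cluster, the sketch below repeats the column drop and filter on a tiny, made-up DataFrame. It imports Modin when it is available and falls back to plain pandas otherwise; the fallback and the sample values are assumptions for illustration, not part of the original example. Note that Modin itself still needs an execution engine such as Ray installed to run.

```python
# Prefer Modin's drop-in API when installed; otherwise fall back to pandas.
try:
    import modin.pandas as pd  # distributed data frames (needs an engine such as Ray)
except ImportError:
    import pandas as pd  # single-process fallback exposing the same API

# Hypothetical miniature stand-in for the taxi dataset read from S3
df = pd.DataFrame(
    {
        "vendor_id": ["1", "2", "1", "2"],
        "trip_distance": [0.5, 1.2, 3.4, 0.9],
    }
)

# Same operations as the S3 example: drop a column, then filter rows
df.drop("vendor_id", axis=1, inplace=True)
df1 = df[df["trip_distance"] > 1]

print(len(df1))  # 2
```

The data-frame code is identical in both branches; only the import line decides whether it runs on one thread or across the cluster.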
Note that in distributed mode, all awswrangler APIs return and accept Modin data frames, not pandas data frames.
Supported APIs
This table lists the awswrangler APIs available in distributed mode (i.e., those that can run at scale):
| Service | API | Implementation |
|---|---|---|
|
| ✅ |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
|
| ✅ |
| ✅ | |
|
| ✅ |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
|
| ✅ |
| ✅ | |
| ✅ | |
| ✅ | |
| ✅ | |
|
| ✅ |
|
| ✅ |
| ✅ | |
| ✅ |
Switching modes
The following commands showcase how to switch between distributed and non-distributed modes:
# Switch to non-distributed
wr.engine.set("python")
wr.memory_format.set("pandas")

# Switch to distributed
wr.engine.set("ray")
wr.memory_format.set("modin")
Similarly, you can set the WR_ENGINE and WR_MEMORY_FORMAT environment variables to the desired engine and memory format, respectively.
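Because the engine and memory format are switched together (python with pandas, ray with modin), it can help to set both environment variables in one place. The helper `set_wrangler_mode` below is hypothetical, and setting the variables before importing awswrangler is an assumption made here as the conservative ordering, not a documented requirement.

```python
import os
from typing import MutableMapping, Optional

# Engine / memory-format pairs used in the mode-switching example above
_MODES = {
    True: ("ray", "modin"),  # distributed
    False: ("python", "pandas"),  # non-distributed
}


def set_wrangler_mode(
    distributed: bool, env: Optional[MutableMapping[str, str]] = None
) -> None:
    """Hypothetical helper: write WR_ENGINE and WR_MEMORY_FORMAT as a matching pair."""
    target = os.environ if env is None else env
    engine, memory_format = _MODES[distributed]
    target["WR_ENGINE"] = engine
    target["WR_MEMORY_FORMAT"] = memory_format


# Assumption: set the variables before `import awswrangler as wr`
# so they are already present whenever the library reads them.
set_wrangler_mode(distributed=False)
```

Keeping both values in one table makes it harder to end up with a mismatched pair such as a Ray engine with the pandas memory format by accident.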
Caveats
S3FS Filesystem
When Ray is chosen as the engine, S3Fs is used instead of boto3 for certain API calls, such as listing a large number of S3 objects. This choice was made for performance reasons, as a boto3 implementation can be much slower in some cases. As a side effect, users can't use the s3_additional_kwargs input parameter, as it's currently not supported by S3Fs.
Unsupported kwargs
Most AWS SDK for pandas calls support passing the boto3_session argument. While this is acceptable for an application running in a single process, distributed applications require the session, including the credentials it holds, to be serialized and passed to the worker nodes in the cluster. This constitutes a security risk. As a result, passing boto3_session when using the Ray runtime is not supported.
To learn more
Head to our latest tutorials to discover even more features.
A runbook with common errors when running the library with Ray is available here.