Run Spark on Ray cluster on Vertex AI
To see an example of getting started with Spark on Ray on Vertex AI, run the "Spark on Ray on Vertex AI" notebook in one of the following environments:
Open in Colab | Open in Colab Enterprise | Open in Vertex AI Workbench | View on GitHub
The RayDP Python library makes it possible to run Spark on a Ray cluster. This document covers installing, configuring, and running RayDP on Ray on Vertex AI (Ray cluster on Vertex AI).
Installation
Ray on Vertex AI enables users to run their applications using the open source Ray framework. RayDP provides APIs for running Spark on Ray. The prebuilt container images available for creating a Ray cluster on Vertex AI don't come with RayDP pre-installed. This means that you must create a custom Ray cluster on Vertex AI image to run RayDP applications on a Ray cluster on Vertex AI. The following section explains how to build a RayDP custom image.
Build a Ray on Vertex AI custom container image
Use the following Dockerfile to create a custom container image for Ray on Vertex AI that has RayDP installed.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0
You can use the latest Ray cluster on Vertex AI prebuilt image for creating the RayDP custom image. You can also install other Python packages that you anticipate you'll use in your applications. The pyarrow==14.0 pin is due to a dependency constraint of Ray 2.9.3.
Build and push the custom container image
Create a Docker repository in Artifact Registry before you build your custom image (see Work with container images for how to create and configure your Docker repository). After you create the Docker repository, build and push the custom container image using the Dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Where:
LOCATION: The location (for example, us-central1) of the repository that you created in Artifact Registry.
PROJECT_ID: Your Google Cloud project ID.
DOCKER_REPOSITORY: The name of the Docker repository that you created.
IMAGE_NAME: The name of your custom container image.
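The same image path appears in both commands and again later as the custom image URI, so it can help to assemble it in one place. The following is a minimal sketch of that pattern. The helper name artifact_registry_image_uri, the optional tag parameter, and the sample values are hypothetical and aren't part of any Google Cloud library.

```python
def artifact_registry_image_uri(location, project_id, repository, image_name, tag=None):
    """Build [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME].

    Hypothetical helper for illustration; `tag` is an optional extra
    that the original commands don't use.
    """
    parts = {
        "location": location,
        "project_id": project_id,
        "repository": repository,
        "image_name": image_name,
    }
    # Fail early on an empty placeholder instead of producing a malformed path.
    for name, value in parts.items():
        if not value:
            raise ValueError(f"{name} must not be empty")

    uri = f"{location}-docker.pkg.dev/{project_id}/{repository}/{image_name}"
    return f"{uri}:{tag}" if tag else uri


if __name__ == "__main__":
    # Sample values only; substitute your own project and repository.
    print(artifact_registry_image_uri("us-central1", "my-project", "my-repo", "raydp-image"))
```

The returned string can be passed to docker build and docker push, and reused as the custom image URI when you create the cluster.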
Create a Ray cluster on Vertex AI
Use the custom container image built in the previous step to create a Ray cluster on Vertex AI. You can use the Vertex AI SDK for Python for creating a Ray cluster on Vertex AI.
If you haven't done so yet, install the required Python libraries.
pip install --quiet google-cloud-aiplatform \
    ray[all]==2.9.3 \
    google-cloud-aiplatform[ray]
Configure the head and worker nodes and create the cluster using the Vertex AI SDK for Python. For example:
import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [
    Resources(
        machine_type="n1-standard-8",
        node_count=2,
        custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
    )
]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)
Where:
CUSTOM_CONTAINER_IMAGE_URI: The URI of the custom container image pushed to Artifact Registry.
CLUSTER_NAME: The name of your Ray cluster on Vertex AI.
Spark on Ray cluster on Vertex AI
Before you run your Spark application, create a Spark session using the RayDP API. You can do this interactively with the Ray client, or you can use the Ray job API. The Ray job API is recommended, especially for production and long-running applications. The RayDP API provides parameters to configure the Spark session and also supports Spark Configuration. To learn more about the RayDP API for creating a Spark session, see Spark master actors node affinity.
RayDP with Ray client
You can use a Ray Task or Actor to create a Spark cluster and session on the Ray cluster on Vertex AI. A Ray Task or Actor is required when you use a Ray Client to create a Spark session on the Ray cluster on Vertex AI. The following code shows how a Ray Actor can create a Spark session, run a Spark application, and stop a Spark cluster on a Ray cluster on Vertex AI using RayDP.
To interactively connect to the Ray cluster on Vertex AI, see Connect to a Ray cluster through Ray Client.
@ray.remote
class SparkExecutor:
    import pyspark

    spark: pyspark.sql.SparkSession = None

    def __init__(self):
        import ray
        import raydp

        self.spark = raydp.init_spark(
            app_name="RAYDP ACTOR EXAMPLE",
            num_executors=1,
            executor_cores=1,
            executor_memory="500M",
        )

    def get_data(self):
        df = self.spark.createDataFrame(
            [
                ("sue", 32),
                ("li", 3),
                ("bob", 75),
                ("heo", 13),
            ],
            ["first_name", "age"],
        )
        return df.toJSON().collect()

    def stop_spark(self):
        import raydp

        raydp.stop_spark()


s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())
RayDP with Ray Job API
Ray client is useful for small experiments that require an interactive connection with the Ray cluster. The Ray Job API is the recommended way to run long-running and production jobs on a Ray cluster. This also applies to running Spark applications on the Ray cluster on Vertex AI.
Create a Python script that contains your Spark application code. For example:
import pyspark
import raydp


def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()


def stop_spark():
    raydp.stop_spark()


if __name__ == '__main__':
    spark = raydp.init_spark(
        app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()
Submit the job to run the Python script using the Ray Job API. For example:
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python [SCRIPT_NAME].py",
    # Path to the local directory that contains the python script file.
    runtime_env={
        "working_dir": ".",
    },
)
Where:
SCRIPT_NAME: The filename of the script that you created.
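Submitting a job returns immediately, so a script usually needs to wait for the job to finish before it reads the results. The following is a minimal sketch of a polling loop. The function name wait_for_job, the default intervals, and the TERMINAL_STATES set are assumptions for illustration. The only client call it relies on is get_job_status(job_id), which JobSubmissionClient provides.

```python
import time

# Job states after which Ray no longer changes the job status.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(client, job_id, poll_seconds=5.0, timeout_seconds=600.0):
    """Poll client.get_job_status(job_id) until the job reaches a terminal state.

    Hypothetical helper: `client` is any object with a get_job_status method,
    such as a ray.job_submission.JobSubmissionClient.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.get_job_status(job_id)
        # Ray returns an enum member; fall back to the raw value for plain strings.
        state = str(getattr(status, "value", status))
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still in state {state} after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

With the client and job_id from the preceding example, wait_for_job(client, job_id) returns the final state, after which client.get_job_logs(job_id) retrieves the job output.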
Reading Cloud Storage files from Spark application
It's common practice to store data files in a Google Cloud Storage bucket. You can read these files in multiple ways from a Spark application that's running on the Ray cluster on Vertex AI. This section explains two techniques for reading Cloud Storage files from Spark applications running on a Ray cluster on Vertex AI.
Use the Google Cloud Storage Connector
You can use the Google Cloud Connector for Hadoop to read files from a Cloud Storage bucket from your Spark application. After you create a Spark session using RayDP, you can read files using a few configuration parameters. The following code shows how to read a CSV file stored in a Cloud Storage bucket from a Spark application on the Ray cluster on Vertex AI.
import raydp

spark = raydp.init_spark(
    app_name="RayDP Cloud Storage Example 1",
    configs={
        "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
        "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    },
    num_executors=2,
    executor_cores=4,
    executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header=True, inferSchema=True)
Where:
GCS_FILE_URI: The URI of a file stored in a Cloud Storage bucket. For example: gs://my-bucket/my-file.csv.
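If several applications need the same connector settings, one option is to build the configuration dictionary in a small function and merge application-specific settings into it. The following is a sketch only. The function name gcs_connector_configs and the constant GCS_CONNECTOR_JAR are hypothetical, and the keys and values are copied from the preceding example.

```python
GCS_CONNECTOR_JAR = (
    "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar"
)


def gcs_connector_configs(extra_configs=None, connector_jar=GCS_CONNECTOR_JAR):
    """Return Spark configs for the Cloud Storage connector (hypothetical helper)."""
    configs = {
        "spark.jars": connector_jar,
        "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    }
    for key, value in (extra_configs or {}).items():
        if key == "spark.jars":
            # spark.jars is a comma-separated list, so append instead of overwriting.
            configs[key] = f"{configs[key]},{value}"
        else:
            configs[key] = value
    return configs


if __name__ == "__main__":
    for key, value in gcs_connector_configs({"spark.sql.shuffle.partitions": "8"}).items():
        print(f"{key}={value}")
```

The returned dictionary can be passed as the configs argument of raydp.init_spark.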
Use Ray Data
The Google Cloud connector provides a way to read files from a Cloud Storage bucket, and it may be sufficient for most use cases. You might want to use Ray Data to read files from the bucket when you need to use Ray's distributed processing for reading data, or when you have issues reading Cloud Storage files with the Google Cloud connector. This can happen because of Java dependency conflicts when other application dependencies are added to the Spark Java classpath using either spark.jars.packages or spark.jars.
import raydp
import ray

spark = raydp.init_spark(
    app_name="RayDP Cloud Storage Example 2",
    configs={
        "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
        "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
        "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
        "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    },
    num_executors=2,
    executor_cores=4,
    executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other
# parameters have been added to the Spark configuration.
# spark.read.csv([GCS_FILE_URI], header=True, inferSchema=True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)
Pyspark Pandas UDF on Ray cluster on Vertex AI
Pyspark Pandas UDFs sometimes require additional code when you use them in your Spark application running on a Ray cluster on Vertex AI. This is usually required when the Pandas UDF uses a Python library that isn't available on the Ray cluster on Vertex AI. You can package the Python dependencies of an application using the runtime environment with the Ray Job API. After you submit the Ray job to the cluster, Ray installs those dependencies in the Python virtual environment that it creates for running the job. The Pandas UDFs, however, don't use the same virtual environment. Instead, they use the default Python system environment. If a dependency isn't available in the system environment, you might need to install it within your Pandas UDF. The following example installs the statsmodels library within the UDF.
import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType


def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd

    df = spark.createDataFrame(
        pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")
    )
    return df.select(func('Lottery', 'Literacy', 'Pop1831')).collect()


@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys

    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1,
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()


if __name__ == '__main__':
    spark = raydp.init_spark(
        app_name="RayDP UDF Example",
        num_executors=2,
        executor_cores=4,
        executor_memory="1500M",
    )
    print(test_udf(spark))
    raydp.stop_spark()
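The UDF in the preceding example calls pip each time it runs. One possible refinement is to check whether the module can already be imported and install it only when it can't. The following is a sketch under that assumption. The helper name ensure_package and its installer parameter are hypothetical, and the code uses only the Python standard library.

```python
import importlib
import importlib.util
import subprocess
import sys


def _pip_install(package):
    # Install into the interpreter that is running the UDF.
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])


def ensure_package(module_name, package=None, installer=_pip_install):
    """Install `package` only if `module_name` can't be imported.

    Returns True if an installation was triggered, False otherwise.
    Hypothetical helper; `installer` is injectable so that the check
    can be exercised without calling pip.
    """
    if importlib.util.find_spec(module_name) is not None:
        return False
    installer(package or module_name)
    # Let the import system see the newly installed package.
    importlib.invalidate_caches()
    return True
```

Inside the UDF, a call such as ensure_package("statsmodels") would take the place of the unconditional subprocess.check_call line, followed by the same import statements.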
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2025-12-15 UTC.