Distributed training
This page describes how to run distributed training jobs on Vertex AI.
Code requirements
Use an ML framework that supports distributed training. In your training code, you can use the CLUSTER_SPEC or TF_CONFIG environment variables to reference specific parts of your training cluster.
Structure of the training cluster
If you run a distributed training job with Vertex AI, you specify multiple machines (nodes) in a training cluster. The training service allocates the resources for the machine types you specify. Your running job on a given node is called a replica. A group of replicas with the same configuration is called a worker pool.
Each replica in the training cluster is given a single role or task in distributed training. For example:
Primary replica: Exactly one replica is designated the primary replica. This task manages the others and reports status for the job as a whole.
Worker(s): One or more replicas may be designated as workers. These replicas do their portion of the work as you designate in your job configuration.
Parameter server(s): If supported by your ML framework, one or more replicas may be designated as parameter servers. These replicas store model parameters and coordinate shared model state between the workers.
Evaluator(s): If supported by your ML framework, one or more replicas may be designated as evaluators. These replicas can be used to evaluate your model. If you are using TensorFlow, note that TensorFlow generally expects that you use no more than one evaluator.
Configure a distributed training job
You can configure any Vertex AI serverless training job as a distributed training job by defining multiple worker pools. You can also run distributed training within a training pipeline or a hyperparameter tuning job.
To configure a distributed training job, define your list of worker pools (workerPoolSpecs[]), designating one WorkerPoolSpec for each type of task:
| Position in workerPoolSpecs[] | Task performed in cluster |
|---|---|
| First (workerPoolSpecs[0]) | Primary, chief, scheduler, or "master" |
| Second (workerPoolSpecs[1]) | Secondary, replicas, workers |
| Third (workerPoolSpecs[2]) | Parameter servers, Reduction Server |
| Fourth (workerPoolSpecs[3]) | Evaluators |
You must specify a primary replica, which coordinates the work done by all the other replicas. Use the first worker pool specification only for your primary replica, and set its replicaCount to 1:
{ "workerPoolSpecs": [ // `WorkerPoolSpec` for worker pool 0, primary replica, required { "machineSpec": {...}, "replicaCount": 1, "diskSpec": {...}, ... }, // `WorkerPoolSpec` for worker pool 1, optional {}, // `WorkerPoolSpec` for worker pool 2, optional {}, // `WorkerPoolSpec` for worker pool 3, optional {} ] ...}Specify additional worker pools
Specify additional worker pools

Depending on your ML framework, you may specify additional worker pools for other purposes. For example, if you are using TensorFlow, you could specify worker pools to configure worker replicas, parameter server replicas, and evaluator replicas.
The order of the worker pools you specify in the workerPoolSpecs[] list determines the type of worker pool. Set empty values for worker pools that you don't want to use, so that you can skip them in the workerPoolSpecs[] list in order to specify worker pools that you do want to use. For example:
If you want to specify a job that has only a primary replica and a parameter server worker pool, you must set an empty value for worker pool 1:
{ "workerPoolSpecs": [ // `WorkerPoolSpec` for worker pool 0, required { "machineSpec": {...}, "replicaCount": 1, "diskSpec": {...}, ... }, // `WorkerPoolSpec` for worker pool 1, optional {}, // `WorkerPoolSpec` for worker pool 2, optional { "machineSpec": {...}, "replicaCount": 1, "diskSpec": {...}, ... }, // `WorkerPoolSpec` for worker pool 3, optional {} ] ...}Reduce training time with Reduction Server
When you train a large ML model using multiple nodes, communicating gradients between nodes can contribute significant latency. Reduction Server is an all-reduce algorithm that can increase throughput and reduce latency for distributed training. Vertex AI makes Reduction Server available in a Docker container image that you can use for one of your worker pools during distributed training.
To learn about how Reduction Server works, see Faster distributed GPU training with Reduction Server on Vertex AI.
Prerequisites
You can use Reduction Server if you meet the following requirements:
You are performing distributed training with GPU workers.
Your training code uses TensorFlow or PyTorch and is configured for multi-host data-parallel training with GPUs using NCCL all-reduce. (You might also be able to use other ML frameworks that use NCCL.)
The containers running on your primary node (workerPoolSpecs[0]) and workers (workerPoolSpecs[1]) support Reduction Server. Specifically, each container is one of the following:

A prebuilt TensorFlow training container, version 2.3 or later.

A prebuilt PyTorch training container, version 1.4 or later.

A custom container with NCCL 2.7 or later and the google-reduction-server package installed. You can install this package on a custom container image by adding the following line to your Dockerfile:

```
RUN echo "deb https://packages.cloud.google.com/apt google-fast-socket main" | tee /etc/apt/sources.list.d/google-fast-socket.list && \
    curl -s -L https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \
    apt update && apt install -y google-reduction-server
```
Train using Reduction Server
To use Reduction Server, do the following when you create a serverless training resource:
Specify one of the following URIs in the containerSpec.imageUri field of the third worker pool (workerPoolSpecs[2]):

us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest
europe-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest
asia-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest

Choosing the multi-region closest to where you are performing serverless training might reduce latency.
When selecting the machine type and number of nodes for the third worker pool, make sure that the total network bandwidth of the third worker pool matches or exceeds the total network bandwidth of the first and second worker pools.
To learn about the maximum available bandwidth of each node in the second worker pool, see Network bandwidth and GPUs.
The Reduction Server nodes do not use GPUs. To learn about the maximum available bandwidth of each node in the third worker pool, see the "Maximum egress bandwidth (Gbps)" columns in General-purpose machine family.
For example, if you configure the first and second worker pools to use 5 n1-highmem-96 nodes, each with 8 NVIDIA_TESLA_V100 GPUs, then each node has a maximum available bandwidth of 100 Gbps, for a total bandwidth of 500 Gbps. In order to match this bandwidth in the third worker pool, you might use 16 n1-highcpu-16 nodes, each with a maximum bandwidth of 32 Gbps, for a total bandwidth of 512 Gbps.

We recommend that you use the n1-highcpu-16 machine type for Reduction Server nodes, because this machine type offers relatively high bandwidth for its resources.
The following command provides an example of how to create a CustomJob resource that uses Reduction Server:
```
gcloud ai custom-jobs create \
  --region=LOCATION \
  --display-name=JOB_NAME \
  --worker-pool-spec=machine-type=n1-highmem-96,replica-count=1,accelerator-type=NVIDIA_TESLA_V100,accelerator-count=8,container-image-uri=CUSTOM_CONTAINER_IMAGE_URI \
  --worker-pool-spec=machine-type=n1-highmem-96,replica-count=4,accelerator-type=NVIDIA_TESLA_V100,accelerator-count=8,container-image-uri=CUSTOM_CONTAINER_IMAGE_URI \
  --worker-pool-spec=machine-type=n1-highcpu-16,replica-count=16,container-image-uri=us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest
```

For more context, see the guide to creating a CustomJob.
To see an example of how to perform distributed training in PyTorch by using Reduction Server, run the "PyTorch distributed training with Vertex AI Reduction Server" notebook in one of the following environments:
Open in Colab | Open in Colab Enterprise | Open in Vertex AI Workbench | View on GitHub
To see an example of how to perform parallel training in XGBoost by using Dask, run the "Distributed XGBoost training with Dask" notebook in one of the following environments:
Open in Colab | Open in Colab Enterprise | Open in Vertex AI Workbench | View on GitHub
Best practices for training using Reduction Server
To optimize the performance and efficiency of your Reduction Server training jobs, consider the following best practices for configuring your cluster.
Machine type and count
In Reduction Server training, each worker needs to connect to all of the reducer hosts. To minimize the number of connections on the worker host, use a machine type with the highest network bandwidth for your reducer host.
A good choice for reducer hosts is a general-purpose N1/N2 VM with at least 16 vCPUs that provides 32 Gbps egress bandwidth, such as n1-highcpu-16 and n2-highcpu-16. Tier 1 VM bandwidth for N1/N2 VMs increases the maximum egress bandwidth to a range of 50 Gbps to 100 Gbps, making these a good choice for reducer VM nodes.
The total egress bandwidth of workers and reducers should be the same. For example, if you use 8 a2-megagpu-16g VMs as workers, you should use at least 25 n1-highcpu-16 VMs as reducers:

`(8 worker VMs * 100 Gbps) / 32 Gbps egress = 25 reducer VMs`

Batch small messages together
Reduction Server works best if the messages to be aggregated are sufficiently large. Most ML frameworks already provide techniques, under different terminology, for batching small gradient tensors before performing all-reduce.
Horovod
Horovod supports Tensor Fusion to batch small tensors for all-reduce. Tensors are filled in a fusion buffer until the buffer is fully filled and the all-reduce operation on the buffer executes. You can adjust the size of the fusion buffer by setting the HOROVOD_FUSION_THRESHOLD environment variable.
The recommended value for the HOROVOD_FUSION_THRESHOLD environment variable is at least 128 MB. To use a 128 MB buffer, set the HOROVOD_FUSION_THRESHOLD environment variable to 134217728 (128 * 1024 * 1024).
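As a minimal sketch (assuming Horovod's TensorFlow Keras integration), you could set the variable from your training code before Horovod initializes; setting it in your container image or job environment works as well.

```python
# Hypothetical sketch: set a 128 MB fusion buffer before Horovod initializes.
# You can also set this variable in your container image or job configuration.
import os

os.environ["HOROVOD_FUSION_THRESHOLD"] = str(128 * 1024 * 1024)  # 134217728 bytes

import horovod.tensorflow.keras as hvd

hvd.init()
```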
PyTorch
PyTorch DistributedDataParallel supports message batching through "gradient bucketing". Set the bucket_cap_mb parameter in the DistributedDataParallel constructor to control the size of your gradient buckets. The default size is 25 MB.
Best practice: The recommended value for bucket_cap_mb is 64 (64 MB).
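For example, a minimal sketch, assuming the process group is already initialized and that build_model() and local_rank come from your own training code:

```python
# Minimal sketch, assuming the process group is already initialized
# (for example, torch.distributed.init_process_group(backend="nccl"))
# and that build_model() and local_rank come from your own training code.
import torch
from torch.nn.parallel import DistributedDataParallel

model = build_model().to(local_rank)
ddp_model = DistributedDataParallel(
    model,
    device_ids=[local_rank],
    bucket_cap_mb=64,  # batch gradients into 64 MB buckets before all-reduce
)
```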
Environment variables for your cluster
Vertex AI populates an environment variable, CLUSTER_SPEC, on every replica to describe how the overall cluster is set up. Like TensorFlow's TF_CONFIG, CLUSTER_SPEC describes every replica in the cluster, including its index and role (primary replica, worker, parameter server, or evaluator).
When you run distributed training with TensorFlow, TF_CONFIG is parsed to build tf.train.ClusterSpec. Similarly, when you run distributed training with other ML frameworks, you must parse CLUSTER_SPEC to populate any environment variables or settings required by the framework.
The format of CLUSTER_SPEC
The CLUSTER_SPEC environment variable is a JSON string with the following format:
| Key | Description |
|---|---|
| "cluster" | The cluster description for your custom container. As with TF_CONFIG, the cluster description contains a list of replica names for each worker pool that you specify, keyed as follows: |
| "workerpool0" | All distributed training jobs have one primary replica in the first worker pool. |
| "workerpool1" | This worker pool contains worker replicas, if you specified them when creating your job. |
| "workerpool2" | This worker pool contains parameter servers, if you specified them when creating your job. |
| "workerpool3" | This worker pool contains evaluators, if you specified them when creating your job. |
| "environment" | The string cloud. |
| "task" | Describes the task of the particular node on which your code is running. You can use this information to write code for specific workers in a distributed job. This entry is a dictionary with the following keys: |
| "type" | The type of worker pool this task is running in. For example, "workerpool0" refers to the primary replica. |
| "index" | The zero-based index of the task. For example, if your training job includes two workers, this value is set to 0 on one of them and 1 on the other. |
| "trial" | The identifier of the hyperparameter tuning trial currently running. When you configure hyperparameter tuning for your job, you set a number of trials to train. This value gives you a way to differentiate in your code between trials that are running. The identifier is a string value containing the trial number, starting at 1. |
| "job" | The job configuration that you provided when you created the training job, represented as a dictionary. |
CLUSTER_SPEC example
Here is an example value:
{ "cluster":{ "workerpool0":[ "cmle-training-workerpool0-ab-0:2222" ], "workerpool1":[ "cmle-training-workerpool1-ab-0:2222", "cmle-training-workerpool1-ab-1:2222" ], "workerpool2":[ "cmle-training-workerpool2-ab-0:2222", "cmle-training-workerpool2-ab-1:2222" ], "workerpool3":[ "cmle-training-workerpool3-ab-0:2222", "cmle-training-workerpool3-ab-1:2222", "cmle-training-workerpool3-ab-2:2222" ] }, "environment":"cloud", "task":{ "type":"workerpool0", "index":0, "trial":"TRIAL_ID" }, "job": { ... }}The format ofTF_CONFIG
The format of TF_CONFIG

In addition to CLUSTER_SPEC, Vertex AI sets the TF_CONFIG environment variable on each replica of all distributed training jobs. Vertex AI does not set TF_CONFIG for single-replica training jobs.
CLUSTER_SPEC and TF_CONFIG share some values, but they have different formats. Both environment variables include additional fields beyond what TensorFlow requires.
Distributed training with TensorFlow works the same way when you use custom containers as when you use a prebuilt container.
The TF_CONFIG environment variable is a JSON string with the following format:
| Key | Description |
|---|---|
| cluster | The TensorFlow cluster description. A dictionary mapping one or more task names (for example, chief, worker, or ps) to lists of network addresses where these tasks are running. This is a valid first argument for the tf.train.ClusterSpec constructor. |
| task | The task description of the VM where this environment variable is set. For a given training job, this dictionary is different on every VM. You can use this information to customize what code runs on each VM in a distributed training job. You can also use it to change the behavior of your training code for different trials of a hyperparameter tuning job. This dictionary includes key-value pairs such as type (the type of task this VM performs, for example chief, worker, or ps), index (the zero-based index of the task), trial (the identifier of the hyperparameter tuning trial currently running on this VM, if any), and cloud (an ID used internally by Vertex AI). |
| job | The job configuration that you provided when you created the training job. |
| environment | The string cloud. |
TF_CONFIG example
The following example code prints the TF_CONFIG environment variable to your training logs:
```python
import json
import os

tf_config_str = os.environ.get('TF_CONFIG')
tf_config_dict = json.loads(tf_config_str)

# Convert back to string just for pretty printing
print(json.dumps(tf_config_dict, indent=2))
```

In a hyperparameter tuning job that runs in runtime version 2.1 or later and uses a master worker, two workers, and a parameter server, this code produces the following log for one of the workers during the first hyperparameter tuning trial. The example output hides the job field for conciseness and replaces some IDs with generic values.
{ "cluster": { "chief": [ "training-workerpool0-[ID_STRING_1]-0:2222" ], "ps": [ "training-workerpool2-[ID_STRING_1]-0:2222" ], "worker": [ "training-workerpool1-[ID_STRING_1]-0:2222", "training-workerpool1-[ID_STRING_1]-1:2222" ] }, "environment": "cloud", "job": { ... }, "task": { "cloud": "[ID_STRING_2]", "index": 0, "trial": "1", "type": "worker" }}When to useTF_CONFIG
TF_CONFIG is set only for distributed training jobs.
You likely don't need to interact with the TF_CONFIG environment variable directly in your training code. Only access the TF_CONFIG environment variable if TensorFlow's distribution strategies and Vertex AI's standard hyperparameter tuning workflow, both described in the next sections, don't work for your job.
Distributed training
Vertex AI sets the TF_CONFIG environment variable to extend the specifications that TensorFlow requires for distributed training.
To perform distributed training with TensorFlow, use the tf.distribute.Strategy API. In particular, we recommend that you use the Keras API together with the MultiWorkerMirroredStrategy or, if you specify parameter servers for your job, the ParameterServerStrategy. However, note that TensorFlow only provides experimental support for these strategies.
These distribution strategies use the TF_CONFIG environment variable to assign roles to each VM in your training job and to facilitate communication between the VMs. You don't need to access the TF_CONFIG environment variable directly in your training code, because TensorFlow handles it for you.
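As a minimal sketch, with build_model() and make_dataset() standing in for your own model and input pipeline, multi-worker Keras training might look like the following; the strategy reads TF_CONFIG for you:

```python
# Minimal sketch of multi-worker Keras training with MultiWorkerMirroredStrategy.
# build_model() and make_dataset() are hypothetical helpers from your own code.
import tensorflow as tf

# The strategy reads TF_CONFIG to discover the other workers automatically.
strategy = tf.distribute.MultiWorkerMirroredStrategy()

with strategy.scope():
    model = build_model()
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")

model.fit(make_dataset(), epochs=10)
```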
Only parse the TF_CONFIG environment variable directly if you want to customize how the different VMs running your training job behave.
Hyperparameter tuning
When you run a hyperparameter tuning job, Vertex AI provides different arguments to your training code for each trial. Your training code does not necessarily need to be aware of what trial is currently running. In addition, you can monitor the progress of hyperparameter tuning jobs in the Google Cloud console.
If needed, your code can read the current trial number from the trial field of the task field of the TF_CONFIG environment variable.
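For example, a minimal sketch that falls back to an empty string when the job is not a hyperparameter tuning job:

```python
# Sketch: read the current trial number from TF_CONFIG, defaulting to an
# empty string when the job is not a hyperparameter tuning job.
import json
import os

tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
trial_id = tf_config.get("task", {}).get("trial", "")
```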
What's next
- Create a training pipeline.