Distributed training

This page describes how to run distributed training jobs on Vertex AI.

Code requirements

Use an ML framework that supports distributed training. In your training code, you can use the CLUSTER_SPEC or TF_CONFIG environment variables to reference specific parts of your training cluster.

Structure of the training cluster

If you run a distributed training job with Vertex AI, you specify multiple machines (nodes) in a training cluster. The training service allocates the resources for the machine types you specify. Your running job on a given node is called a replica. A group of replicas with the same configuration is called a worker pool.

Each replica in the training cluster is given a single role or task in distributed training: for example, primary replica, worker, parameter server, or evaluator.

Configure a distributed training job

You can configure any Vertex AI serverless training job as a distributed training job by defining multiple worker pools. You can also run distributed training within a training pipeline or a hyperparameter tuning job.

To configure a distributed training job, define your list of worker pools (workerPoolSpecs[]), designating one WorkerPoolSpec for each type of task:

Position in workerPoolSpecs[]    Task performed in cluster
First (workerPoolSpecs[0])       Primary, chief, scheduler, or "master"
Second (workerPoolSpecs[1])      Secondary, replicas, workers
Third (workerPoolSpecs[2])       Parameter servers, Reduction Server
Fourth (workerPoolSpecs[3])      Evaluators

You must specify a primary replica, which coordinates the work done by all the other replicas. Use the first worker pool specification only for your primary replica, and set its replicaCount to 1:

{
  "workerPoolSpecs": [
     // `WorkerPoolSpec` for worker pool 0, primary replica, required
     {
       "machineSpec": {...},
       "replicaCount": 1,
       "diskSpec": {...},
       ...
     },
     // `WorkerPoolSpec` for worker pool 1, optional
     {},
     // `WorkerPoolSpec` for worker pool 2, optional
     {},
     // `WorkerPoolSpec` for worker pool 3, optional
     {}
   ]
   ...
}

Specify additional worker pools

Depending on your ML framework, you may specify additional worker pools for other purposes. For example, if you are using TensorFlow, you could specify worker pools to configure worker replicas, parameter server replicas, and evaluator replicas.

The order of the worker pools that you specify in the workerPoolSpecs[] list determines the type of each worker pool. To specify a later worker pool while skipping an earlier one that you don't want to use, set the earlier pool to an empty value. For example:

If you want to specify a job that has only a primary replica and a parameter server worker pool, you must set an empty value for worker pool 1:

{
  "workerPoolSpecs": [
     // `WorkerPoolSpec` for worker pool 0, required
     {
       "machineSpec": {...},
       "replicaCount": 1,
       "diskSpec": {...},
       ...
     },
     // `WorkerPoolSpec` for worker pool 1, optional
     {},
     // `WorkerPoolSpec` for worker pool 2, optional
     {
       "machineSpec": {...},
       "replicaCount": 1,
       "diskSpec": {...},
       ...
     },
     // `WorkerPoolSpec` for worker pool 3, optional
     {}
   ]
   ...
}
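The positional convention above can be sketched as a small helper that assembles the workerPoolSpecs[] list, inserting empty placeholders for skipped pools. The helper and its argument names are illustrative, not part of the Vertex AI SDK:

```python
# Illustrative helper (not part of the Vertex AI SDK): builds a
# workerPoolSpecs[] list, using empty dicts as placeholders so that
# later pools keep their positional meaning.
def build_worker_pool_specs(primary, workers=None,
                            parameter_servers=None, evaluators=None):
    pools = [primary, workers or {}, parameter_servers or {}, evaluators or {}]
    # Trailing empty pools can be dropped; interior ones must stay as {}.
    while len(pools) > 1 and pools[-1] == {}:
        pools.pop()
    return pools

# Primary replica plus parameter servers only: worker pool 1 stays empty.
specs = build_worker_pool_specs(
    primary={"machineSpec": {"machineType": "n1-standard-4"}, "replicaCount": 1},
    parameter_servers={"machineSpec": {"machineType": "n1-standard-4"}, "replicaCount": 2},
)
```

Here specs[1] is {}, so the parameter-server pool lands at index 2, matching the positional rule for worker pool types.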

Reduce training time with Reduction Server

When you train a large ML model using multiple nodes, communicating gradients between nodes can add significant latency. Reduction Server is an all-reduce algorithm that can increase throughput and reduce latency for distributed training. Vertex AI makes Reduction Server available in a Docker container image that you can use for one of your worker pools during distributed training.

To learn about how Reduction Server works, see Faster distributed GPU training with Reduction Server on Vertex AI.

Prerequisites

You can use Reduction Server if you meet the following requirements:

  • You are performing distributed training with GPU workers.

  • Your training code uses TensorFlow or PyTorch and is configured for multi-host data-parallel training with GPUs using NCCL all-reduce. (You might also be able to use other ML frameworks that use NCCL.)

  • The containers running on your primary node (workerPoolSpecs[0]) and workers (workerPoolSpecs[1]) support Reduction Server. Specifically, each container is one of the following:

    • A prebuilt TensorFlow training container, version 2.3 or later.

    • A prebuilt PyTorch training container, version 1.4 or later.

    • A custom container with NCCL 2.7 or later and the google-reduction-server package installed. You can install this package on a custom container image by adding the following to your Dockerfile:

      RUN echo "deb https://packages.cloud.google.com/apt google-fast-socket main" | tee /etc/apt/sources.list.d/google-fast-socket.list && \
          curl -s -L https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \
          apt update && apt install -y google-reduction-server

Train using Reduction Server

To use Reduction Server, do the following when you create a serverless training resource:

  1. Specify one of the following URIs in the containerSpec.imageUri field of the third worker pool (workerPoolSpecs[2]):

    • us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest
    • europe-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest
    • asia-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest

    Choosing the multi-region closest to where you are performing serverless training might reduce latency.

  2. When selecting the machine type and number of nodes for the third worker pool, make sure that the total network bandwidth of the third worker pool matches or exceeds the total network bandwidth of the first and second worker pools.

    To learn about the maximum available bandwidth of each node in the second worker pool, see Network bandwidth and GPUs.

    You do not use GPUs for the Reduction Server nodes. To learn about the maximum available bandwidth of each node in the third worker pool, see the "Maximum egress bandwidth (Gbps)" columns in General-purpose machine family.

    For example, if you configure the first and second worker pools to use 5 n1-highmem-96 nodes, each with 8 NVIDIA_TESLA_V100 GPUs, then each node has a maximum available bandwidth of 100 Gbps, for a total bandwidth of 500 Gbps. To match this bandwidth in the third worker pool, you might use 16 n1-highcpu-16 nodes, each with a maximum bandwidth of 32 Gbps, for a total bandwidth of 512 Gbps.

    We recommend that you use the n1-highcpu-16 machine type for Reduction Server nodes, because this machine type offers relatively high bandwidth for its resources.

The following command provides an example of how to create a CustomJob resource that uses Reduction Server:

gcloud ai custom-jobs create \
  --region=LOCATION \
  --display-name=JOB_NAME \
  --worker-pool-spec=machine-type=n1-highmem-96,replica-count=1,accelerator-type=NVIDIA_TESLA_V100,accelerator-count=8,container-image-uri=CUSTOM_CONTAINER_IMAGE_URI \
  --worker-pool-spec=machine-type=n1-highmem-96,replica-count=4,accelerator-type=NVIDIA_TESLA_V100,accelerator-count=8,container-image-uri=CUSTOM_CONTAINER_IMAGE_URI \
  --worker-pool-spec=machine-type=n1-highcpu-16,replica-count=16,container-image-uri=us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest

For more context, see the guide to creating a CustomJob.

Best practices for training using Reduction Server

To optimize the performance and efficiency of your Reduction Server trainingjobs, consider the following best practices for configuring your cluster.

Machine type and count

In Reduction Server training, each worker needs to connect to all of the reducer hosts. To minimize the number of connections on the worker host, use a machine type with the highest network bandwidth for your reducer host.

A good choice for reducer hosts is a general-purpose N1/N2 VM with at least 16 vCPUs that provides 32 Gbps of egress bandwidth, such as n1-highcpu-16 and n2-highcpu-16. Tier 1 VM bandwidth for N1/N2 VMs increases the maximum egress bandwidth to between 50 Gbps and 100 Gbps, making these a good choice for reducer VM nodes.

The total egress bandwidth of workers and reducers should be the same. For example, if you use 8 a2-megagpu-16g VMs as workers, you should use at least 25 n1-highcpu-16 VMs as reducers:

`(8 worker VMs * 100 Gbps) / 32 Gbps egress = 25 reducer VMs`.
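The sizing arithmetic above can be expressed as a small helper (the function name is illustrative, not an official API):

```python
import math

def reducer_count(num_workers, worker_egress_gbps, reducer_egress_gbps):
    """Minimum number of reducer VMs whose total egress bandwidth
    matches or exceeds the workers' total egress bandwidth."""
    total_worker_gbps = num_workers * worker_egress_gbps
    return math.ceil(total_worker_gbps / reducer_egress_gbps)

# 8 a2-megagpu-16g workers at 100 Gbps each, n1-highcpu-16 reducers at 32 Gbps:
print(reducer_count(8, 100, 32))   # → 25

# 5 n1-highmem-96 workers at 100 Gbps each (the earlier example):
print(reducer_count(5, 100, 32))   # → 16
```

Rounding up ensures the reducer pool never has less aggregate bandwidth than the workers.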

Batch small messages together

Reduction Server works best if the messages to be aggregated are sufficiently large. Most ML frameworks already provide techniques, under different terminology, for batching small gradient tensors before performing all-reduce.

Horovod

Horovod supports Tensor Fusion to batch small tensors for all-reduce. Tensors fill a fusion buffer until the buffer is full, and then the all-reduce operation executes on the buffer. You can adjust the size of the fusion buffer by setting the HOROVOD_FUSION_THRESHOLD environment variable.

The recommended value for the HOROVOD_FUSION_THRESHOLD environment variable is at least 128 MB. In this case, set the HOROVOD_FUSION_THRESHOLD environment variable to 134217728 (128 * 1024 * 1024).
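For example, to apply the recommended 128 MB threshold, you can set the environment variable before Horovod initializes (shown here from Python; you could equally set it in your container spec or Dockerfile):

```python
import os

# 128 MB fusion buffer, expressed in bytes, as recommended above.
fusion_threshold_bytes = 128 * 1024 * 1024
os.environ['HOROVOD_FUSION_THRESHOLD'] = str(fusion_threshold_bytes)

print(os.environ['HOROVOD_FUSION_THRESHOLD'])  # → 134217728
```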

PyTorch

PyTorch DistributedDataParallel supports batching messages through "gradient bucketing". Set the bucket_cap_mb parameter in the DistributedDataParallel constructor to control the size of your batch buckets. The default size is 25 MB.

BEST PRACTICE: The recommended value of bucket_cap_mb is 64 (64 MB).

Environment variables for your cluster

Vertex AI populates an environment variable, CLUSTER_SPEC, on every replica to describe how the overall cluster is set up. Like TensorFlow's TF_CONFIG, CLUSTER_SPEC describes every replica in the cluster, including its index and role (primary replica, worker, parameter server, or evaluator).

When you run distributed training with TensorFlow, TF_CONFIG is parsed to build tf.train.ClusterSpec. Similarly, when you run distributed training with other ML frameworks, you must parse CLUSTER_SPEC to populate any environment variables or settings required by the framework.

The format of CLUSTER_SPEC

The CLUSTER_SPEC environment variable is a JSON string with the following format:

"cluster"

The cluster description for your custom container. As with TF_CONFIG, this object is formatted as a TensorFlow cluster specification, and can be passed to the constructor of tf.train.ClusterSpec.

The cluster description contains a list of replica names for each worker pool you specify:

  • "workerpool0": All distributed training jobs have one primary replica in the first worker pool.
  • "workerpool1": This worker pool contains worker replicas, if you specified them when creating your job.
  • "workerpool2": This worker pool contains parameter servers, if you specified them when creating your job.
  • "workerpool3": This worker pool contains evaluators, if you specified them when creating your job.

"environment"

The string cloud.

"task"

Describes the task of the particular node on which your code is running. You can use this information to write code for specific workers in a distributed job. This entry is a dictionary with the following keys:

  • "type": The type of worker pool this task is running in. For example, "workerpool0" refers to the primary replica.
  • "index": The zero-based index of the task. For example, if your training job includes two workers, this value is set to 0 on one of them and 1 on the other.
  • "trial": The identifier of the hyperparameter tuning trial currently running. When you configure hyperparameter tuning for your job, you set a number of trials to train. This value gives you a way to differentiate in your code between trials that are running. The identifier is a string value containing the trial number, starting at 1.

"job"

The CustomJobSpec that you provided to create the current training job, represented as a dictionary.

CLUSTER_SPEC example

Here is an example value:

{
   "cluster":{
      "workerpool0":[
         "cmle-training-workerpool0-ab-0:2222"
      ],
      "workerpool1":[
         "cmle-training-workerpool1-ab-0:2222",
         "cmle-training-workerpool1-ab-1:2222"
      ],
      "workerpool2":[
         "cmle-training-workerpool2-ab-0:2222",
         "cmle-training-workerpool2-ab-1:2222"
      ],
      "workerpool3":[
         "cmle-training-workerpool3-ab-0:2222",
         "cmle-training-workerpool3-ab-1:2222",
         "cmle-training-workerpool3-ab-2:2222"
      ]
   },
   "environment":"cloud",
   "task":{
      "type":"workerpool0",
      "index":0,
      "trial":"TRIAL_ID"
   },
   "job": {
      ...
   }
}
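For frameworks other than TensorFlow, you typically parse CLUSTER_SPEC yourself. The following sketch derives a flat global rank and world size; the rank convention (pools flattened in order, workerpool0 first) is an illustrative assumption, not something the format mandates:

```python
import json

def rank_and_world_size(cluster_spec):
    """Derive a flat rank/world-size from a parsed CLUSTER_SPEC.
    Pools are flattened in name order: workerpool0, workerpool1, ...
    (an illustrative convention, not one mandated by Vertex AI)."""
    cluster = cluster_spec['cluster']
    pools = sorted(cluster)  # 'workerpool0', 'workerpool1', ...
    world_size = sum(len(cluster[p]) for p in pools)
    task = cluster_spec['task']
    rank = sum(len(cluster[p]) for p in pools if p < task['type']) + task['index']
    return rank, world_size

# In training code you would read the real value:
#   cluster_spec = json.loads(os.environ['CLUSTER_SPEC'])
# Here we reuse the example value from above, with the task moved to
# the second replica of workerpool1 for illustration:
example = {
    "cluster": {
        "workerpool0": ["cmle-training-workerpool0-ab-0:2222"],
        "workerpool1": ["cmle-training-workerpool1-ab-0:2222",
                        "cmle-training-workerpool1-ab-1:2222"],
        "workerpool2": ["cmle-training-workerpool2-ab-0:2222",
                        "cmle-training-workerpool2-ab-1:2222"],
        "workerpool3": ["cmle-training-workerpool3-ab-0:2222",
                        "cmle-training-workerpool3-ab-1:2222",
                        "cmle-training-workerpool3-ab-2:2222"],
    },
    "environment": "cloud",
    "task": {"type": "workerpool1", "index": 1, "trial": "TRIAL_ID"},
}
rank, world_size = rank_and_world_size(example)
print(rank, world_size)  # → 2 8
```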

The format of TF_CONFIG

In addition to CLUSTER_SPEC, Vertex AI sets the TF_CONFIG environment variable on each replica of all distributed training jobs. Vertex AI does not set TF_CONFIG for single-replica training jobs.

CLUSTER_SPEC and TF_CONFIG share some values, but they have different formats. Both environment variables include additional fields beyond what TensorFlow requires.

Distributed training with TensorFlow works the same way when you use customcontainers as when you use a prebuilt container.

TheTF_CONFIG environment variable is a JSON string with the following format:

cluster

The TensorFlow cluster description. A dictionary mapping one or more task names (chief, worker, ps, or master) to lists of network addresses where these tasks are running. For a given training job, this dictionary is the same on every VM.

This is a valid first argument for the tf.train.ClusterSpec constructor. Note that this dictionary never contains evaluator as a key, since evaluators are not considered part of the training cluster even if you use them for your job.

task

The task description of the VM where this environment variable is set. For a given training job, this dictionary is different on every VM. You can use this information to customize what code runs on each VM in a distributed training job. You can also use it to change the behavior of your training code for different trials of a hyperparameter tuning job.

This dictionary includes the following key-value pairs:

  • type: The type of task that this VM is performing. This value is set to worker on workers, ps on parameter servers, and evaluator on evaluators. On your job's master worker, the value is set to either chief or master.
  • index: The zero-based index of the task. For example, if your training job includes two workers, this value is set to 0 on one of them and 1 on the other.
  • trial: The ID of the hyperparameter tuning trial currently running on this VM. This field is only set if the current training job is a hyperparameter tuning job. For hyperparameter tuning jobs, Vertex AI runs your training code repeatedly in many trials with different hyperparameters each time. This field contains the current trial number, starting at 1 for the first trial.
  • cloud: An ID used internally by Vertex AI. You can ignore this field.

job

The CustomJobSpec that you provided to create the current training job, represented as a dictionary.

environment

The string cloud.

TF_CONFIG example

The following example code prints the TF_CONFIG environment variable to your training logs:

import json
import os

tf_config_str = os.environ.get('TF_CONFIG')
tf_config_dict = json.loads(tf_config_str)

# Convert back to string just for pretty printing
print(json.dumps(tf_config_dict, indent=2))

In a hyperparameter tuning job that runs in runtime version 2.1 or later and uses a master worker, two workers, and a parameter server, this code produces the following log for one of the workers during the first hyperparameter tuning trial. The example output hides the job field for conciseness and replaces some IDs with generic values.

{
  "cluster": {
    "chief": [
      "training-workerpool0-[ID_STRING_1]-0:2222"
    ],
    "ps": [
      "training-workerpool2-[ID_STRING_1]-0:2222"
    ],
    "worker": [
      "training-workerpool1-[ID_STRING_1]-0:2222",
      "training-workerpool1-[ID_STRING_1]-1:2222"
    ]
  },
  "environment": "cloud",
  "job": {
    ...
  },
  "task": {
    "cloud": "[ID_STRING_2]",
    "index": 0,
    "trial": "1",
    "type": "worker"
  }
}
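A common use of TF_CONFIG outside of tf.distribute is deciding whether the current replica is the coordinating worker, for example to let only one replica write checkpoints. A minimal sketch, using a trimmed copy of the example value above:

```python
import json
import os

def is_chief(tf_config):
    """True if this replica is the coordinating worker.
    Per the field description above, the primary replica's task
    type is either 'chief' or 'master'."""
    task_type = tf_config.get('task', {}).get('type', '')
    return task_type in ('chief', 'master')

# In training code: tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))
# Here, a trimmed copy of the example output above:
example = {
    "cluster": {"chief": ["host0:2222"], "worker": ["host1:2222", "host2:2222"]},
    "task": {"type": "worker", "index": 0, "trial": "1"},
}
print(is_chief(example))  # → False
```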

When to use TF_CONFIG

TF_CONFIG is set only for distributed training jobs.

You likely don't need to interact with the TF_CONFIG environment variable directly in your training code. Only access the TF_CONFIG environment variable if TensorFlow's distribution strategies and Vertex AI's standard hyperparameter tuning workflow, both described in the next sections, don't work for your job.

Distributed training

Vertex AI sets the TF_CONFIG environment variable to extend the specifications that TensorFlow requires for distributed training.

To perform distributed training with TensorFlow, use the tf.distribute.Strategy API. In particular, we recommend that you use the Keras API together with the MultiWorkerMirroredStrategy or, if you specify parameter servers for your job, the ParameterServerStrategy. However, note that TensorFlow only provides experimental support for these strategies.

These distribution strategies use the TF_CONFIG environment variable to assign roles to each VM in your training job and to facilitate communication between the VMs. You don't need to access the TF_CONFIG environment variable directly in your training code, because TensorFlow handles it for you.

Only parse the TF_CONFIG environment variable directly if you want to customize how the different VMs running your training job behave.

Hyperparameter tuning

When you run a hyperparameter tuning job, Vertex AI provides different arguments to your training code for each trial. Your training code does not necessarily need to be aware of which trial is currently running. In addition, you can monitor the progress of hyperparameter tuning jobs in the Google Cloud console.

If needed, your code can read the current trial number from the trial field of the task field of the TF_CONFIG environment variable.
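Reading the trial number can be sketched as follows (a hedged example: TF_CONFIG is only set on distributed jobs, and trial only on hyperparameter tuning jobs, so the helper falls back to None):

```python
import json
import os

def current_trial():
    """Return the hyperparameter tuning trial ID as a string, or None
    when the job is not a tuning job (or TF_CONFIG is not set at all)."""
    tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))
    return tf_config.get('task', {}).get('trial')

# Simulate the value Vertex AI would set during the first trial:
os.environ['TF_CONFIG'] = json.dumps(
    {"task": {"type": "worker", "index": 0, "trial": "1"}})
print(current_trial())  # → 1
```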


Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2026-02-18 UTC.