Distributed XGBoost with Ray
Ray is a general-purpose distributed execution framework. Ray can be used to scale computations from a single node to a cluster of hundreds of nodes without changing any code.
The Python bindings of Ray come with a collection of well-maintained machine learning libraries for hyperparameter optimization and model serving.
The XGBoost-Ray project provides an interface to run XGBoost training and prediction jobs on a Ray cluster. It allows you to utilize distributed data representations, such as Modin dataframes, as well as distributed loading from cloud storage (e.g. Parquet files).
XGBoost-Ray integrates well with the hyperparameter optimization library Ray Tune, and implements advanced fault tolerance handling mechanisms. With Ray you can scale your training jobs to hundreds of nodes simply by adding new nodes to a cluster. You can also use Ray to leverage multi-GPU XGBoost training.
Installing and starting Ray
Ray can be installed from PyPI like this:

```shell
pip install ray
```
If you're using Ray on a single machine, you don't need to do anything else: XGBoost-Ray will automatically start a local Ray cluster when used.
If you want to use Ray on a cluster, you can use the Ray cluster launcher.
Installing XGBoost-Ray
XGBoost-Ray is also available via PyPI:

```shell
pip install xgboost_ray
```
This will install all dependencies needed to run XGBoost on Ray, includingRay itself if it hasn’t been installed before.
Using XGBoost-Ray for training and prediction
XGBoost-Ray uses the same API as core XGBoost. There are only two differences:

- Instead of using an `xgboost.DMatrix`, you'll use a `xgboost_ray.RayDMatrix` object.
- There is an additional `xgboost_ray.RayParams` parameter that you can use to configure distributed training.
Simple training example
To run this simple example, you'll need to install scikit-learn (with `pip install scikit-learn`).
In this example, we will load the breast cancer dataset and train a binary classifier using two actors.
```python
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

evals_result = {}
bst = train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    train_set,
    evals_result=evals_result,
    evals=[(train_set, "train")],
    verbose_eval=False,
    ray_params=RayParams(num_actors=2, cpus_per_actor=1))

bst.save_model("model.xgb")
print("Final training error: {:.4f}".format(
    evals_result["train"]["error"][-1]))
```
The only differences compared to the non-distributed API are the import statement (`xgboost_ray` instead of `xgboost`), using the `RayDMatrix` instead of the `DMatrix`, and passing a `xgboost_ray.RayParams` object.
The return object is a regular `xgboost.Booster` instance.
Simple prediction example
```python
import xgboost as xgb

from xgboost_ray import RayDMatrix, RayParams, predict
from sklearn.datasets import load_breast_cancer

data, labels = load_breast_cancer(return_X_y=True)
dpred = RayDMatrix(data, labels)

bst = xgb.Booster(model_file="model.xgb")
pred_ray = predict(bst, dpred, ray_params=RayParams(num_actors=2))

print(pred_ray)
```
In this example, the data will be split across two actors. The resulting prediction array combines the actors' outputs in the correct (original) order.
The RayParams object
The `RayParams` object is used to configure various settings relating to distributed training.
Multi GPU training
Ray automatically detects GPUs on cluster nodes. In order to start training on multiple GPUs, all you have to do is set the `gpus_per_actor` parameter of the `RayParams` object, as well as the `num_actors` parameter to the number of GPUs you want to use:
```python
ray_params = RayParams(
    num_actors=4,
    gpus_per_actor=1,
)
```
This will train on four GPUs in parallel.
Note that it usually does not make sense to allocate more than one GPU per actor, as XGBoost relies on distributed libraries such as Dask or Ray to utilize multi-GPU training.
Setting the number of CPUs per actor
XGBoost natively utilizes multi-threading to speed up computations. Thus, if you are training on CPUs only, there is likely no benefit in using more than one actor per node. In that case, assuming you have a cluster of homogeneous nodes, set the number of CPUs per actor to the number of CPUs available on each node, and the number of actors to the number of nodes.
If you are using multi-GPU training on a single node, divide the number of available CPUs evenly across all actors. For instance, if you have 16 CPUs and 4 GPUs available, each actor should get 1 GPU and 4 CPUs.
If you are using a cluster of heterogeneous nodes (with different numbers of CPUs), you might just want to use the greatest common divisor for the number of CPUs per actor. E.g. if you have a cluster of three nodes with 4, 8, and 12 CPUs, respectively, you'd start 6 actors with 4 CPUs each for maximum CPU utilization.
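The allocation rules above can be checked with a little arithmetic. The following sketch is purely illustrative plain Python (no Ray involved):

```python
import math
from functools import reduce

# Single node with multiple GPUs: one actor per GPU, CPUs split evenly.
total_cpus, num_gpus = 16, 4
cpus_per_gpu_actor = total_cpus // num_gpus  # 4 CPUs per actor

# Heterogeneous cluster: CPUs per actor = greatest common divisor of the
# per-node CPU counts, so every node can be filled completely.
node_cpus = [4, 8, 12]
cpus_per_actor = reduce(math.gcd, node_cpus)              # 4
num_actors = sum(n // cpus_per_actor for n in node_cpus)  # 1 + 2 + 3 = 6

print(cpus_per_gpu_actor, cpus_per_actor, num_actors)  # 4 4 6
```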
Fault tolerance
XGBoost-Ray supports two fault tolerance modes. In non-elastic training, whenever a training actor dies (e.g. because the node goes down), the training job will stop, XGBoost-Ray will wait for the actor (or its resources) to become available again (this might be on a different node), and then continue training once all actors are back.
In elastic training, whenever a training actor dies, the rest of the actors continue training without the dead actor. If the actor comes back, it will be re-integrated into training again.
Please note that in elastic training this means that you will train on less data for some time. The benefit is that you can continue training even if a node goes away for the remainder of the training run, and you don't have to wait until it is back up again. In practice this usually leads to a very minor decrease in accuracy but a much shorter training time compared to non-elastic training.
Both training modes can be configured using the respective `xgboost_ray.RayParams` parameters.
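As a sketch, selecting a fault tolerance mode might look like the following. The parameter names (`elastic_training`, `max_failed_actors`, `max_actor_restarts`) are taken from the xgboost_ray documentation; verify them against your installed version.

```python
from xgboost_ray import RayParams

# Elastic training: keep training with the surviving actors if one dies.
# Parameter names per the xgboost_ray docs; check your installed version.
elastic = RayParams(
    num_actors=4,
    elastic_training=True,
    max_failed_actors=1,    # tolerate up to one failed actor at a time
    max_actor_restarts=2,   # restart a failed actor at most twice
)

# Non-elastic training: pause and wait for failed actors to be
# restored before continuing.
non_elastic = RayParams(
    num_actors=4,
    elastic_training=False,
    max_actor_restarts=2,
)
```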
Hyperparameter optimization
XGBoost-Ray integrates well with the hyperparameter optimization framework Ray Tune. Ray Tune uses Ray to start multiple distributed trials with different hyperparameter configurations. If used with XGBoost-Ray, these trials will then start their own distributed training jobs.
XGBoost-Ray automatically reports evaluation results back to Ray Tune. There are only a few things you need to do:
1. Put your XGBoost-Ray training call into a function accepting parameter configurations (`train_model` in the example below).
2. Create a `xgboost_ray.RayParams` object (`ray_params` in the example below).
3. Define the parameter search space (the `config` dict in the example below).
4. Call `tune.run()`:
   - The `metric` parameter should contain the metric you'd like to optimize. Usually this consists of the prefix passed to the `evals` argument of `xgboost_ray.train()` and an `eval_metric` passed in the XGBoost parameters (`train-error` in the example below).
   - The `mode` should be either `min` or `max`, depending on whether you'd like to minimize or maximize the metric.
   - `resources_per_trial` should be set using `ray_params.get_tune_resources()`. This will make sure that each trial has the necessary resources available to start its distributed training jobs.
```python
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

num_actors = 4
num_cpus_per_actor = 1

ray_params = RayParams(
    num_actors=num_actors, cpus_per_actor=num_cpus_per_actor)

def train_model(config):
    train_x, train_y = load_breast_cancer(return_X_y=True)
    train_set = RayDMatrix(train_x, train_y)

    evals_result = {}
    bst = train(
        params=config,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(train_set, "train")],
        verbose_eval=False,
        ray_params=ray_params)
    bst.save_model("model.xgb")

from ray import tune

# Specify the hyperparameter search space.
config = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "eta": tune.loguniform(1e-4, 1e-1),
    "subsample": tune.uniform(0.5, 1.0),
    "max_depth": tune.randint(1, 9)
}

# Make sure to use the `get_tune_resources` method to set the
# `resources_per_trial` parameter.
analysis = tune.run(
    train_model,
    config=config,
    metric="train-error",
    mode="min",
    num_samples=4,
    resources_per_trial=ray_params.get_tune_resources())

print("Best hyperparameters", analysis.best_config)
```
Ray Tune supports various search algorithms and libraries (e.g. BayesOpt, Tree-Parzen estimators), smart schedulers like successive halving, and other features. Please refer to the Ray Tune documentation for more information.