Distributed XGBoost with Dask
Dask is a parallel computing library built on Python. Dask allows easy management of distributed workers and excels at handling large distributed data science workflows. The implementation in XGBoost originates from dask-xgboost with some extended functionality and a different interface. The tutorial here focuses on basic usage of dask with CPU tree algorithms. For an overview of GPU-based training and internal workings, see A New, Official Dask API for XGBoost.
Note
The integration is not tested with Windows.
Requirements
Dask can be installed using either pip or conda (see the dask installation documentation for more information). For accelerating XGBoost with GPUs, dask-cuda is recommended for creating GPU clusters.
Overview
A dask cluster consists of three different components: a centralized scheduler, one or more workers, and one or more clients which act as the user-facing entry point for submitting tasks to the cluster. When using XGBoost with dask, one needs to call the XGBoost dask interface from the client side. Below is a small example which illustrates basic usage of running XGBoost on a dask cluster:
from xgboost import dask as dxgb

import dask.array as da
import dask.distributed

if __name__ == "__main__":
    cluster = dask.distributed.LocalCluster()
    client = dask.distributed.Client(cluster)

    # X and y must be Dask dataframes or arrays
    num_obs = 100_000  # use an int: array shapes and chunks must be integers
    num_features = 20
    X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
    y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))

    dtrain = dxgb.DaskDMatrix(client, X, y)
    # or
    # dtrain = dxgb.DaskQuantileDMatrix(client, X, y)

    output = dxgb.train(
        client,
        {"verbosity": 2, "tree_method": "hist", "objective": "reg:squarederror"},
        dtrain,
        num_boost_round=4,
        evals=[(dtrain, "train")],
    )
Here we first create a cluster in single-node mode with distributed.LocalCluster, then connect a distributed.Client to this cluster, setting up an environment for later computation. Notice that the cluster construction is guarded by __name__ == "__main__", which is necessary because LocalCluster starts worker processes that may re-import the main module; otherwise there might be obscure errors.
We then create an xgboost.dask.DaskDMatrix object and pass it to xgboost.dask.train(), along with some other parameters, much like XGBoost’s normal, non-dask interface. Unlike that interface, data and label must be either Dask DataFrame or Dask Array instances.
The primary difference in XGBoost’s dask interface is that we pass our dask client as an additional argument for carrying out the computation. Note that if client is set to None, XGBoost will use the default client returned by dask.
There are two sets of APIs implemented in XGBoost. The first set is the functional API illustrated in the example above. Given the data and a set of parameters, the train function returns a model and the computation history as a Python dictionary:
{
    "booster": Booster,
    "history": dict,
}
For prediction, pass the output returned by train into xgboost.dask.predict():
prediction = dxgb.predict(client, output, dtrain)
# Or equivalently, pass ``output['booster']``:
prediction = dxgb.predict(client, output['booster'], dtrain)
Eliminating the construction of DaskDMatrix is also possible; this can make the computation a bit faster when meta information like base_margin is not needed:
prediction = dxgb.predict(client, output, X)
# Use inplace version.
prediction = dxgb.inplace_predict(client, output, X)
Here prediction is a dask Array object containing predictions from the model if the input is a DaskDMatrix or da.Array. When putting a dask collection directly into the predict function or using xgboost.dask.inplace_predict(), the output type depends on the input data. See the next section for details.
Alternatively, XGBoost also implements the Scikit-Learn interface with DaskXGBClassifier, DaskXGBRegressor, DaskXGBRanker and two random forest variants. This wrapper is similar to the single-node Scikit-Learn interface in xgboost, with dask collections as inputs and an additional client attribute. See the following sections and XGBoost Dask Feature Walkthrough for more examples.
Running prediction
In the previous example we used DaskDMatrix as input to the predict function. In practice, it’s also possible to call the predict function directly on dask collections like Array and DataFrame, which might give better prediction performance. When DataFrame is used as prediction input, the result is a dask Series instead of an array. Also, there’s in-place predict support on the dask interface, which can help reduce both memory usage and prediction time.
# dtrain is the DaskDMatrix defined above.
prediction = dxgb.predict(client, booster, dtrain)
or equivalently:
# where X is a dask DataFrame or dask Array.
prediction = dxgb.predict(client, booster, X)
And for in-place prediction:
# where X is a dask DataFrame or dask Array backed by cupy or cuDF.
booster.set_param({"device": "cuda"})
prediction = dxgb.inplace_predict(client, booster, X)
When the input is a da.Array object, the output is always a da.Array. However, if the input type is dd.DataFrame, the output can be dd.Series, dd.DataFrame or da.Array, depending on the output shape. For example, when SHAP-based prediction is used, the return value can have 3 or 4 dimensions; in such cases an Array is always returned.
The performance of running prediction, either using predict or inplace_predict, is sensitive to the number of blocks. Internally, it’s implemented using da.map_blocks and dd.map_partitions. When the number of partitions is large and each of them has only a small amount of data, the overhead of calling predict becomes visible. On the other hand, if not using GPU, the number of threads used for prediction on each block matters. Right now, XGBoost uses a single thread for each partition. If the number of blocks on each worker is smaller than the number of cores, then the CPU workers might not be fully utilized.
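To make the block-count trade-off concrete, the sketch below estimates what fraction of CPU cores can be busy at once during prediction. It assumes, as described above, one thread per block, and additionally assumes that blocks are spread as evenly as possible across workers, which Dask does not guarantee. The helper name estimate_core_utilization is hypothetical and is not part of XGBoost or Dask.

```python
def estimate_core_utilization(n_blocks: int, n_workers: int, cores_per_worker: int) -> float:
    """Fraction of all CPU cores that can be busy at the same time.

    Hypothetical helper. Assumes one prediction thread per block and an
    even spread of blocks over workers (an idealization of Dask scheduling).
    """
    if n_blocks < 0 or n_workers <= 0 or cores_per_worker <= 0:
        raise ValueError("invalid block, worker or core count")
    base, extra = divmod(n_blocks, n_workers)
    busy_cores = 0
    for worker in range(n_workers):
        # The first ``extra`` workers receive one additional block.
        blocks_here = base + (1 if worker < extra else 0)
        # A worker cannot keep more cores busy than it has blocks.
        busy_cores += min(blocks_here, cores_per_worker)
    return busy_cores / (n_workers * cores_per_worker)
```

With 2 workers of 8 cores each, 4 blocks keep only a quarter of the cores busy, while 64 blocks can saturate them. Rechunking an array (Array.rechunk) or repartitioning a dataframe (DataFrame.repartition) is one way to move between these regimes, keeping in mind the per-call overhead of very small blocks.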
One simple optimization for running consecutive predictions is using distributed.Future:
dataset = [X_0, X_1, X_2]
booster_f = client.scatter(booster, broadcast=True)
futures = []
for X in dataset:
    # Here we pass in a future instead of concrete booster
    shap_f = dxgb.predict(client, booster_f, X, pred_contribs=True)
    futures.append(shap_f)

results = client.gather(futures)
This is only available on the functional interface, as the Scikit-Learn wrapper doesn’t know how to maintain a valid future for the booster. To obtain the booster object from a Scikit-Learn wrapper object:
cls = dxgb.DaskXGBClassifier()
cls.fit(X, y)
booster = cls.get_booster()
Scikit-Learn Estimator Interface
As mentioned previously, there’s another interface that mimics the scikit-learn estimators with a higher level of abstraction. The interface is easier to use compared to the functional interface but has more constraints. It’s worth mentioning that, although the interface mimics scikit-learn estimators, it doesn’t work with normal scikit-learn utilities like GridSearchCV, as scikit-learn doesn’t understand distributed dask data collections.
from distributed import LocalCluster, Client
from xgboost import dask as dxgb


def main(client: Client) -> None:
    # ``load_data`` is a placeholder for your own loader returning Dask collections.
    X, y = load_data()
    clf = dxgb.DaskXGBClassifier(n_estimators=100, tree_method="hist")
    clf.client = client  # assign the client
    clf.fit(X, y, eval_set=[(X, y)])
    proba = clf.predict_proba(X)


if __name__ == "__main__":
    with LocalCluster() as cluster:
        with Client(cluster) as client:
            main(client)
GPU acceleration
For most use cases with GPUs, the Dask-CUDA project should be used to create the cluster, which automatically configures the correct device ordinal for worker processes. As a result, users should NOT specify the ordinal (good: device=cuda, bad: device=cuda:1). See Example of training with Dask on GPU and Use scikit-learn regressor interface with GPU histogram tree method for worked examples.
Working with other clusters
Using Dask’s LocalCluster is convenient for getting started quickly on a local machine. Once you’re ready to scale your work, though, there are a number of ways to deploy Dask on a distributed cluster. You can use Dask-CUDA, for example, for GPUs and you can use Dask Cloud Provider to deploy Dask clusters in the cloud. See the Dask documentation for a more comprehensive list.
In the example below, a KubeCluster is used for deploying Dask on Kubernetes:
from dask_kubernetes.operator import KubeCluster  # Need to install the ``dask-kubernetes`` package
from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode

from dask.distributed import Client
from xgboost import dask as dxgb
import dask.array as da


def main():
    '''Connect to a remote kube cluster with GPU nodes and run training on it.'''
    m = 1000
    n = 10
    kWorkers = 2  # assuming you have 2 GPU nodes on that cluster.
    # You need to work out the worker-spec yourself. See document in dask_kubernetes for
    # its usage. Here we just want to show that XGBoost works on various clusters.

    # See notes below for why we use pre-allocated cluster.
    with KubeCluster(
        name="xgboost-test",
        image="my-image-name:latest",
        n_workers=kWorkers,
        create_mode=CreateMode.CONNECT_ONLY,
        shutdown_on_close=False,
    ) as cluster:
        with Client(cluster) as client:
            X = da.random.random(size=(m, n), chunks=100)
            y = X.sum(axis=1)

            regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
            regressor.client = client
            regressor.set_params(tree_method='hist', device="cuda")
            regressor.fit(X, y, eval_set=[(X, y)])


if __name__ == '__main__':
    # Launch the kube cluster on somewhere like GKE, then run this as client process.
    # main function will connect to that cluster and start training xgboost model.
    main()
Different cluster classes might have subtle differences like network configuration, or a specific cluster implementation might contain bugs that we are not aware of. Open an issue if such a case is found and there’s no documentation on how to resolve it in that cluster implementation.
An interesting aspect of the Kubernetes cluster is that the pods may become available after the Dask workflow has begun, which can cause issues with distributed XGBoost, since XGBoost expects the nodes used by input data to remain unchanged during training. To use Kubernetes clusters, it is necessary to wait for all the pods to be online before submitting XGBoost tasks. One can either create a wait function in Python or simply pre-allocate a cluster with k8s tools (like kubectl) before running dask workflows. To pre-allocate a cluster, we can first generate the cluster spec using dask kubernetes:
import json

from dask_kubernetes.operator import make_cluster_spec

spec = make_cluster_spec(name="xgboost-test", image="my-image-name:latest", n_workers=16)
with open("cluster-spec.json", "w") as fd:
    json.dump(spec, fd, indent=2)
kubectl apply -f ./cluster-spec.json
Check whether the pods are available:
kubectl get pods
Once all pods have been initialized, the Dask XGBoost workflow can be run, as in the previous example. It is important to ensure that the cluster sets the parameter create_mode=CreateMode.CONNECT_ONLY and optionally shutdown_on_close=False if you do not want to shut down the cluster after a single job.
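If you prefer the other option mentioned above, waiting from Python instead of pre-allocating with kubectl, a plain polling loop is enough. The sketch below is a hypothetical helper that accepts any zero-argument callable returning the current number of workers; with a live client, that callable could be something like lambda: len(client.scheduler_info()["workers"]). distributed.Client also offers a wait_for_workers method for the same purpose, which may be the simpler choice when it fits your setup.

```python
import time
from typing import Callable


def wait_for_worker_count(
    count_workers: Callable[[], int],
    expected: int,
    timeout: float = 600.0,
    interval: float = 5.0,
) -> int:
    """Poll until at least ``expected`` workers are reported, else raise TimeoutError.

    Hypothetical helper: ``count_workers`` is any callable returning the number
    of online workers, for example one that inspects a Dask client's scheduler info.
    """
    deadline = time.monotonic() + timeout
    while True:
        n = count_workers()
        if n >= expected:
            return n
        if time.monotonic() >= deadline:
            raise TimeoutError(f"only {n} of {expected} workers online after {timeout}s")
        time.sleep(interval)
```

Calling this before constructing any DaskDMatrix keeps XGBoost from starting while pods are still joining, which is the situation described above.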
Threads
XGBoost has built-in support for parallel computation through threads by setting the nthread parameter (n_jobs for scikit-learn). If these parameters are set, they will override the configuration in Dask. For example:
with dask.distributed.LocalCluster(n_workers=7, threads_per_worker=4) as cluster:
There are 4 threads allocated for each dask worker. Then, by default, XGBoost will use 4 threads in each process for training. But if the nthread parameter is set:
output = dxgb.train(
    client,
    {"verbosity": 1, "nthread": 8, "tree_method": "hist"},
    dtrain,
    num_boost_round=4,
    evals=[(dtrain, "train")],
)
XGBoost will use 8 threads in each training process.
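The precedence rule described above can be summarized as a small function. This is a hypothetical sketch of the documented behavior, not XGBoost’s internal code: an explicit nthread (or n_jobs for the scikit-learn wrapper) wins, otherwise the Dask worker’s thread count is used. Treating non-positive values as unset is an assumption made only for this sketch.

```python
from typing import Any, Mapping


def effective_nthread(params: Mapping[str, Any], threads_per_worker: int) -> int:
    """Threads used per training process, following the rule described above.

    Hypothetical helper: an explicit ``nthread`` (or ``n_jobs``) overrides the
    Dask worker's ``threads_per_worker``. Non-positive values are treated as
    unset here, which is an assumption of this sketch.
    """
    for key in ("nthread", "n_jobs"):
        value = params.get(key)
        if value is not None and value > 0:
            return int(value)
    return threads_per_worker
```

With the LocalCluster above (threads_per_worker=4), parameters without nthread give 4 threads, while the parameters passed to dxgb.train above give 8.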
Working with asyncio
Added in version 1.2.0.
XGBoost’s dask interface supports the new asyncio in Python and can be integrated into asynchronous workflows. For using dask with asynchronous operations, please refer to this dask example and the documentation in distributed. To use XGBoost’s Dask interface asynchronously, the client which is passed as an argument for training and prediction must be operating in asynchronous mode by specifying asynchronous=True when the client is created (example below). All functions (including DaskDMatrix) provided by the functional interface will then return coroutines which can be awaited to retrieve their result. Please note that XGBoost is a compute-bound application, where parallelism is more important than concurrency. The support for asyncio is more about compatibility than performance gain.
Functional interface:
import dask.distributed
from xgboost import dask as dxgb


async def main(scheduler_address: str) -> None:
    async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
        X, y = generate_array()  # placeholder returning Dask collections
        m = await dxgb.DaskDMatrix(client, X, y)
        output = await dxgb.train(client, {}, dtrain=m)

        with_m = await dxgb.predict(client, output, m)
        with_X = await dxgb.predict(client, output, X)
        inplace = await dxgb.inplace_predict(client, output, X)

        # Use ``client.compute`` instead of the ``compute`` method from dask collection
        print(await client.compute(with_m))
For the Scikit-Learn interface, trivial methods like set_params and evals_result(), as well as accessing class attributes, do not require await. Other methods involving actual computation will return a coroutine and hence require awaiting:
import dask.distributed
from xgboost import dask as dxgb


async def main(scheduler_address: str) -> None:
    async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
        X, y = generate_array()  # placeholder returning Dask collections
        regressor = await dxgb.DaskXGBRegressor(verbosity=1, n_estimators=2)
        regressor.set_params(tree_method='hist')  # trivial method, synchronous operation
        regressor.client = client  # accessing attribute, synchronous operation
        regressor = await regressor.fit(X, y, eval_set=[(X, y)])
        prediction = await regressor.predict(X)

        # Use `client.compute` instead of the `compute` method from dask collection
        print(await client.compute(prediction))
Evaluation and Early Stopping
Added in version 1.3.0.
The Dask interface allows the use of validation sets that are stored in distributed collections (Dask DataFrame or Dask Array). These can be used for evaluation and early stopping.
To enable early stopping, pass one or more validation sets containing DaskDMatrix objects.
import dask.array as da
from xgboost import dask as dxgb

# ``client`` is assumed to be an existing dask.distributed.Client.
num_rows = 1_000_000  # sizes and chunks must be ints, not floats such as 1e6
num_features = 100
num_partitions = 10
rows_per_chunk = num_rows // num_partitions

data = da.random.random(size=(num_rows, num_features), chunks=(rows_per_chunk, num_features))
labels = da.random.random(size=(num_rows, 1), chunks=(rows_per_chunk, 1))

X_eval = da.random.random(size=(num_rows, num_features), chunks=(rows_per_chunk, num_features))
y_eval = da.random.random(size=(num_rows, 1), chunks=(rows_per_chunk, 1))

dtrain = dxgb.DaskDMatrix(client=client, data=data, label=labels)
dvalid = dxgb.DaskDMatrix(client=client, data=X_eval, label=y_eval)

result = dxgb.train(
    client=client,
    params={
        "objective": "reg:squarederror",
    },
    dtrain=dtrain,
    num_boost_round=10,
    evals=[(dvalid, "valid1")],
    early_stopping_rounds=3,
)
When validation sets are provided to xgboost.dask.train() in this way, the dictionary returned by xgboost.dask.train() contains a history of evaluation metrics for each validation set, across all boosting rounds.
print(result["history"])
# {'valid1': OrderedDict([('rmse', [0.28857, 0.28858, 0.288592, 0.288598])])}
If early stopping is enabled by also passing early_stopping_rounds, you can check the best iteration in the returned booster.
booster = result["booster"]
print(booster.best_iteration)
# ``best_iteration`` is 0-based, so add 1 to keep the best round itself.
best_model = booster[: booster.best_iteration + 1]
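To see how the best iteration relates to the history printed above, the hypothetical sketch below replays a simplified early-stopping rule on a recorded, lower-is-better metric such as rmse: training stops once `rounds` consecutive iterations fail to improve on the best score. XGBoost’s own EarlyStopping callback has more options (for example maximize and min_delta), so treat this as an illustration rather than its implementation.

```python
from typing import Sequence, Tuple


def simulate_early_stopping(scores: Sequence[float], rounds: int) -> Tuple[int, int]:
    """Return (best_iteration, rounds_trained) for a lower-is-better metric.

    Hypothetical, simplified model of early stopping: no tolerance and no
    maximization, just "stop after ``rounds`` iterations without improvement".
    """
    best_iteration = 0
    best_score = float("inf")
    for i, score in enumerate(scores):
        if score < best_score:
            best_score = score
            best_iteration = i
        elif i - best_iteration >= rounds:
            # ``rounds`` consecutive iterations without improvement: stop here.
            return best_iteration, i + 1
    return best_iteration, len(scores)
```

For the four rmse values in the sample history above with rounds=3, this sketch reports a best iteration of 0 after 4 trained rounds. Because iterations are counted from 0, keeping the best round means keeping best_iteration + 1 boosting rounds.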
Other customization
The XGBoost dask interface accepts other advanced features found in the single-node Python interface, including callback functions, custom evaluation metrics and objectives:
import numpy as np
import xgboost as xgb
from xgboost import dask as dxgb


def eval_error_metric(predt, dtrain: xgb.DMatrix):
    label = dtrain.get_label()
    r = np.zeros(predt.shape)
    gt = predt > 0.5
    r[gt] = 1 - label[gt]
    le = predt <= 0.5
    r[le] = label[le]
    return 'CustomErr', np.sum(r)


# custom callback; ``early_stopping_rounds`` is a user-chosen integer
early_stop = xgb.callback.EarlyStopping(
    rounds=early_stopping_rounds,
    metric_name="CustomErr",
    data_name="Train",
    save_best=True,
)

# ``client``, ``D_train`` and ``D_valid`` (DaskDMatrix objects) are assumed to exist.
output = dxgb.train(
    client,
    params={
        "objective": "binary:logistic",
        "eval_metric": ["error", "rmse"],
        "tree_method": "hist",
    },
    dtrain=D_train,
    evals=[(D_train, "Train"), (D_valid, "Valid")],
    feval=eval_error_metric,  # custom evaluation metric
    num_boost_round=100,
    callbacks=[early_stop],
)
Hyper-parameter tuning
See https://github.com/coiled/dask-xgboost-nyctaxi for a set of examples of using XGBoost with dask and optuna.
Learning to Rank
Added in version 3.0.0.
Note
Position debiasing is not yet supported.
For performance reasons, there are two operation modes in Dask learning to rank. The difference is whether a distributed global sort is needed. Please see Distributed Training for how ranking works with distributed training in general. Below we will discuss some of the Dask-specific features.
First, if you use the DaskQuantileDMatrix interface or the DaskXGBRanker with allow_group_split set to True, XGBoost will try to sort and group the samples for each worker based on the query ID. This mode tries to skip the global sort and sorts only worker-local data, so there is no inter-worker data shuffle. Please note that even a worker-local sort is costly, particularly in terms of memory usage, as there’s no spilling when sort_values() is used and we need to concatenate the data. XGBoost first checks whether the QID is already sorted before actually performing the sorting operation. One can choose this mode if the query groups are relatively consecutive, meaning most of the samples within a query group are close to each other and are likely to reside on the same worker. Don’t use this if you have performed a random shuffle on your data.
If the input data is random, then there’s no way to guarantee that most of the data within the same group is on the same worker. For large query groups, this might not be an issue. But for small query groups, it’s possible that each worker gets only one or two samples from each group, which can lead to disastrous performance. In that case, we can partition the data according to query group, which is the default behavior of the DaskXGBRanker unless allow_group_split is set to True. This mode performs a sort and a groupby on the entire dataset in addition to an encoding operation for the query group IDs. Along with partition fragmentation, this option can lead to slow performance. See Learning to rank with the Dask Interface for a worked example.
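Whether allow_group_split=True is a reasonable choice depends on how consecutive the query groups already are. One rough way to gauge this, sketched below as a hypothetical diagnostic that is not part of XGBoost, is to count how many contiguous runs the query IDs form: a sorted or nearly consecutive QID column has about one run per group, while a randomly shuffled one has many more runs than groups.

```python
from typing import Iterable, Tuple


def qid_run_stats(qids: Iterable[int]) -> Tuple[bool, int, int]:
    """Return (is_sorted, number_of_runs, number_of_groups) for a QID sequence.

    Hypothetical diagnostic: a run is a maximal stretch of equal, adjacent QIDs.
    Many more runs than groups means samples of a group are scattered and are
    unlikely to end up on the same worker.
    """
    is_sorted = True
    runs = 0
    groups = set()
    prev = None
    for q in qids:
        if prev is None or q != prev:
            runs += 1  # a new contiguous run starts here
        if prev is not None and q < prev:
            is_sorted = False
        groups.add(q)
        prev = q
    return is_sorted, runs, len(groups)
```

For example, [2, 2, 0, 0, 1, 1] is unsorted but perfectly consecutive (3 runs for 3 groups), which is the situation where worker-local grouping can pay off, whereas [0, 1, 0, 2, 1, 2] yields 6 runs for 3 groups, resembling shuffled data where the default global partitioning is the safer choice.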
Troubleshooting
In some environments XGBoost might fail to resolve the IP address of the scheduler. A symptom is the user receiving an OSError: [Errno 99] Cannot assign requested address error during training. A quick workaround is to specify the address explicitly. To do that, the collective Config is used:
Added in version 3.0.0.
import dask
from distributed import Client
from xgboost import dask as dxgb
from xgboost.collective import Config

# let xgboost know the scheduler address
coll_cfg = Config(retry=1, timeout=20, tracker_host_ip="10.23.170.98", tracker_port=0)

with Client(scheduler_file="sched.json") as client:
    reg = dxgb.DaskXGBRegressor(coll_cfg=coll_cfg)
Please note that XGBoost requires a different port than dask. By default, on a unix-like system XGBoost uses port 0 to find available ports, which may fail if a user is running in a restricted docker environment. In this case, please open additional ports in the container and specify them as in the above snippet.
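Port 0 is an operating-system convention rather than an XGBoost-specific one: binding to it asks the kernel for any free ephemeral port. The standard-library sketch below (not XGBoost code) shows the mechanism. In a restricted container where only a few ports are published, such an arbitrary port may be unreachable from other machines, which is why specifying an explicitly opened port is recommended above.

```python
import socket


def pick_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0: let the kernel choose a free port
        return sock.getsockname()[1]
```

Note that the port is released again when the socket closes, so another process could claim it before it is reused; this is inherent to the port-0 approach and one more reason to pin a known port in locked-down environments.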
If you encounter an NCCL system error while training with GPU enabled, which usually includes the error message NCCL failure: unhandled system error, you can specify its network configuration using one of the environment variables listed in the NCCL document, such as NCCL_SOCKET_IFNAME. In addition, you can use NCCL_DEBUG to obtain debug logs.
If NCCL fails to initialize in a container environment, it might be caused by limited system shared memory. With docker, one can try the flag: --shm-size=4g.
MIG (Multi-Instance GPU) is not yet supported by NCCL. You will receive an error message that includes Multiple processes within a communication group … upon initialization.
Starting from version 2.1.0, to reduce the size of the binary wheel, the XGBoost package (installed using pip) loads NCCL from the environment instead of bundling it directly. This means that if you encounter an error message like “Failed to load nccl …”, it indicates that NCCL is not installed or properly configured in your environment.
To resolve this issue, you can install NCCL using pip:
pip install nvidia-nccl-cu12  # (or with any compatible CUDA version)
The default conda installation of XGBoost should not encounter this error. If you are using a customized XGBoost, please make sure one of the following is true:
XGBoost is NOT compiled with the USE_DLOPEN_NCCL flag.
The dmlc_nccl_path parameter is set to the full NCCL path when initializing the collective.
Here are some additional tips for troubleshooting NCCL dependency issues:
Check the NCCL installation path and verify that it’s installed correctly. When XGBoost is installed using pip, we try to find NCCL by using from nvidia.nccl import lib in Python.
Ensure that you have the correct CUDA version installed. NCCL requires a compatible CUDA version to function properly.
If you are not using distributed training with XGBoost and yet see this error, pleaseopen an issue on GitHub.
If you continue to encounter NCCL dependency issues, please open an issue on GitHub.
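To check the first tip from Python, the sketch below mirrors the import mentioned above (from nvidia.nccl import lib) and reports where the pip-installed NCCL package lives. It is a hypothetical diagnostic, not the loader XGBoost uses, and it only covers the pip wheel layout; a system-wide NCCL installation is not detected by it.

```python
import os
from typing import Optional


def find_pip_nccl_dir() -> Optional[str]:
    """Return the directory of the pip-installed ``nvidia.nccl.lib`` package, or None."""
    try:
        from nvidia.nccl import lib  # shipped by wheels such as nvidia-nccl-cu12
    except ImportError:
        return None
    # ``lib`` may be a regular package (with __file__) or a namespace package.
    file = getattr(lib, "__file__", None)
    if file:
        return os.path.dirname(file)
    paths = list(getattr(lib, "__path__", []))
    return paths[0] if paths else None
```

If this returns None in the environment where XGBoost runs, installing the NCCL wheel as shown above is the first thing to try.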
IPv6 Support
Added in version 1.7.0.
XGBoost has initial IPv6 support for the dask interface on Linux. Because most cluster support for IPv6 is partial (dual stack instead of IPv6 only), we require additional user configuration similar to Troubleshooting to help XGBoost obtain the correct address information:
import dask
from distributed import Client
from xgboost import dask as dxgb

# let xgboost know the scheduler address, use the same bracket format as dask.
with dask.config.set({"xgboost.scheduler_address": "[fd20:b6f:f759:9800::]"}):
    with Client("[fd20:b6f:f759:9800::]") as client:
        reg = dxgb.DaskXGBRegressor(tree_method="hist")
When GPU is used, XGBoost employs NCCL as the underlying communication framework, which may require some additional configuration via environment variables depending on the setting of the cluster. Please note that IPv6 support is Unix only.
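The bracketed form used in the IPv6 snippet above is the usual way of writing an IPv6 literal in an address string, so that its colons are not confused with a port separator. A small standard-library helper like the hypothetical one below can normalize a host before it is passed to dask.config.set or Client; it is only a convenience sketch and not part of XGBoost or Dask.

```python
import ipaddress


def bracket_if_ipv6(host: str) -> str:
    """Wrap a bare IPv6 literal in brackets; leave IPv4 addresses and host names as-is."""
    stripped = host.strip("[]")
    try:
        addr = ipaddress.ip_address(stripped)
    except ValueError:
        return host  # not an IP literal (e.g. a host name): leave unchanged
    return f"[{addr.compressed}]" if addr.version == 6 else str(addr)
```

For example, both "fd20:b6f:f759:9800::" and its bracketed form normalize to "[fd20:b6f:f759:9800::]", matching the value used for xgboost.scheduler_address above.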
Logging the evaluation results
By default, the Dask interface prints evaluation results in the scheduler process. This makes it difficult for a user to monitor training progress. We can define custom evaluation monitors using callback functions. See Example of forwarding evaluation logs to the client for a worked example on how to forward the logs to the client process. In the example, there are two potential solutions using Dask builtin methods, including distributed.Client.forward_logging() and distributed.print(). Both of them have some caveats but can be a good starting point for developing more sophisticated methods like writing to files.
Why is the initialization of DaskDMatrix so slow and why does it throw weird errors
The dask API in XGBoost requires construction of DaskDMatrix. With the Scikit-Learn interface, DaskDMatrix is implicitly constructed for all input data during the fit or predict steps. You might have observed that DaskDMatrix construction can take large amounts of time, and sometimes throws errors that don’t seem to be relevant to DaskDMatrix. Here is a brief explanation of why. By default most dask computations are lazily evaluated, which means that computation is not carried out until you explicitly ask for a result by, for example, calling compute(). See the previous link for details in dask, and this wiki for information on the general concept of lazy evaluation. The DaskDMatrix constructor forces lazy computations to be evaluated, which means it’s where all your earlier computations are actually carried out, including operations like dd.read_csv(). To isolate the computation in DaskDMatrix from other lazy computations, one can explicitly wait for results of input data before constructing a DaskDMatrix. Also, dask’s diagnostics dashboard can be used to monitor what operations are currently being performed.
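The effect of lazy evaluation on where errors show up can be reproduced without Dask. In the standard-library analogy below, Python’s built-in map plays the role of a lazy Dask graph and list() plays the role of the step that forces computation, which is what DaskDMatrix construction does: the faulty parsing step is written in one place, but the error is raised only where the result is materialized. This is an illustration of the concept, not Dask code.

```python
def parse_rows(rows):
    # Lazy: nothing is parsed yet, similar to building a Dask task graph.
    return map(int, rows)


rows = ["1", "2", "not-a-number"]

stage = "construction"
try:
    lazy = parse_rows(rows)  # no error here, the bad row has not been touched
    stage = "materialization"
    values = list(lazy)  # computation is forced here, so the error surfaces here
    stage = "done"
except ValueError as err:
    print(f"ValueError during {stage}: {err}")
```

With Dask, the corresponding remedy is the one suggested above: materialize the inputs first (for example by persisting them and waiting on the result with distributed.wait) so that failures in earlier steps surface before DaskDMatrix is constructed.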
Reproducible Result
In single-node mode, we can always expect the same training result between runs as long as the underlying platforms are the same. However, it’s difficult to obtain reproducible results in a distributed environment, since the tasks might get different machine allocations or have different amounts of available resources during different sessions. There are heuristics and guidelines on how to achieve it, but no proven method for guaranteeing such deterministic behavior. The Dask interface in XGBoost tries to provide reproducible results on a best-effort basis. This section highlights some known criteria and tries to share some insights into the issue.
There are primarily two different tasks for XGBoost to carry out: training and inference. Inference is reproducible given the same software and hardware along with the same run-time configurations. The remainder of this section focuses on training.
Many of the challenges come from the fact that we are using approximation algorithms: the sketching algorithm used to find histogram bins is an approximation to the exact quantile algorithm, the AUC metric in a distributed environment is an approximation to the exact AUC score, and floating-point numbers are an approximation to real numbers. Floating-point is an issue because its summation is not associative, meaning \((a + b) + c\) does not necessarily equal \(a + (b + c)\), even though this property holds true for real numbers. As a result, whenever we change the order of a summation, the result can differ. This imposes the requirement that, in order to have reproducible output from XGBoost, the entire pipeline needs to be reproducible.
The software stack is the same for each run. This goes without saying. XGBoost might generate different outputs between different versions. This is expected, as we might change the default value of a hyper-parameter, or the parallel strategy, which generates different floating-point results. We guarantee the correctness of the algorithms, but there is a lot of wiggle room for the final output. The situation is similar for many dependencies; for instance, the random number generator might differ from platform to platform.
The hardware stack is the same for each run. This includes the number of workers and the amount of available resources on each worker. XGBoost can generate different results using different numbers of workers. This is caused by the approximation issue mentioned previously.
Similar to the hardware constraint, the network topology is also a factor in the final output. If we change the topology, the workers might be ordered differently, leading to a different ordering of floating-point operations.
The random seeds used in various places of the pipeline are the same.
The partitioning of data needs to be reproducible. This is related to the available resources on each worker. Dask might partition the data differently for each run according to its own scheduling policy. For instance, if there are some additional tasks in the cluster while you are running the second training session for XGBoost, some of the workers might have constrained memory and Dask may not push the training data for XGBoost to those workers. This change in data partitioning can lead to different output models. If you are using a shared Dask cluster, then the result is likely to vary between runs.
The operations performed on dataframes need to be reproducible. Some operations, like DataFrame.merge, are not deterministic on parallel hardware such as GPUs, where the order of the index might differ from run to run.
It’s expected to have different results when training the model in a distributed environment than when training it on a single node, due to the aforementioned criteria.
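The floating-point non-associativity mentioned above is easy to reproduce with plain Python floats (IEEE 754 doubles). In the sketch below, the same three values are summed with two different groupings, as could happen when the data is split into different partitions before a reduction, and the two results differ. math.fsum is included as a reference because it returns the correctly rounded sum regardless of order.

```python
import math

values = [0.1, 0.2, 0.3]

# Grouping 1: partition A = [0.1, 0.2], partition B = [0.3]
left = (values[0] + values[1]) + values[2]
# Grouping 2: partition A = [0.1], partition B = [0.2, 0.3]
right = values[0] + (values[1] + values[2])

print(left, right, left == right)
# math.fsum tracks the rounding errors and returns the correctly rounded total.
print(math.fsum(values))
```

Any change that reorders such a reduction, including a different number of workers, a different worker ordering, or a different partitioning, can therefore change the low-order bits of intermediate sums in training, which is why every item in the list above matters for reproducibility.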
Memory Usage
Here are some practices for reducing memory usage with dask and xgboost.
In a distributed workflow, data is best loaded by dask collections directly instead of being loaded by the client process. When loading with the client process is unavoidable, use client.scatter to distribute data from the client process to workers. See [2] for a nice summary.
When using GPU input, like a dataframe loaded by dask_cudf, you can try xgboost.dask.DaskQuantileDMatrix as a drop-in replacement for DaskDMatrix to reduce overall memory usage. See Example of training with Dask on GPU for an example.
Use in-place prediction when possible.
References: