Distributed XGBoost with PySpark

Starting from version 1.7.0, xgboost supports pyspark estimator APIs.

Note

The integration is only tested on Linux distributions.

XGBoost PySpark Estimator

SparkXGBRegressor

SparkXGBRegressor is a PySpark ML estimator. It implements the XGBoost regression algorithm based on the XGBoost Python library, and it can be used in PySpark Pipeline and PySpark ML meta algorithms like CrossValidator/TrainValidationSplit/OneVsRest.

We can create a SparkXGBRegressor estimator like:

    from xgboost.spark import SparkXGBRegressor

    xgb_regressor = SparkXGBRegressor(
        features_col="features",
        label_col="label",
        num_workers=2,
    )

The above snippet creates a Spark estimator which can fit on a Spark dataset and return a Spark model that can transform a Spark dataset and generate a dataset with a prediction column. We can set almost all of the XGBoost sklearn estimator parameters as SparkXGBRegressor parameters, but some parameters such as nthread are forbidden in the Spark estimator, and some parameters are replaced with PySpark-specific parameters such as weight_col and validation_indicator_col. For details, please see the SparkXGBRegressor doc.

The following code snippet shows how to train a Spark XGBoost regressor model. First we need to prepare a training dataset as a Spark dataframe that contains a “label” column and “features” column(s). The “features” column(s) must be of pyspark.ml.linalg.Vector type or Spark array type, or be given as a list of feature column names.

    xgb_regressor_model = xgb_regressor.fit(train_spark_dataframe)

The following code snippet shows how to predict test data using a Spark XGBoost regressor model. First we need to prepare a test dataset as a Spark dataframe that contains “features” and “label” columns. The “features” column must be of pyspark.ml.linalg.Vector type or Spark array type.

    transformed_test_spark_dataframe = xgb_regressor_model.transform(test_spark_dataframe)

The above snippet returns a transformed_test_spark_dataframe that contains the input dataset columns and an appended column “prediction” representing the prediction results.
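As a rough end-to-end sketch of the fit and transform steps above, the following assembles plain numeric columns into a vector column, trains the regressor and appends predictions. The column names (`f0`, `f1`, `label`), the synthetic data, and the helper names `make_rows` and `fit_and_predict` are assumptions made for illustration; they are not part of the XGBoost API.

```python
import random


def make_rows(n, seed=0):
    """Generate n synthetic (f0, f1, label) rows with label = 2*f0 - f1 + noise."""
    rng = random.Random(seed)
    rows = []
    for _ in range(n):
        f0, f1 = rng.random(), rng.random()
        rows.append((f0, f1, 2.0 * f0 - f1 + rng.gauss(0.0, 0.01)))
    return rows


def fit_and_predict(spark, num_workers=1):
    """Fit a SparkXGBRegressor on synthetic data; return the transformed test set."""
    # Imported here so make_rows stays usable without Spark installed.
    from pyspark.ml.feature import VectorAssembler
    from xgboost.spark import SparkXGBRegressor

    df = spark.createDataFrame(make_rows(200), ["f0", "f1", "label"])

    # Pack the feature columns into a single pyspark.ml.linalg.Vector column.
    assembler = VectorAssembler(inputCols=["f0", "f1"], outputCol="features")
    train_df, test_df = assembler.transform(df).randomSplit([0.8, 0.2], seed=42)

    regressor = SparkXGBRegressor(
        features_col="features",
        label_col="label",
        num_workers=num_workers,
    )
    model = regressor.fit(train_df)

    # The result keeps the input columns and adds a "prediction" column.
    return model.transform(test_df)
```

In an environment with pyspark and xgboost installed, calling `fit_and_predict(SparkSession.builder.getOrCreate())` and selecting the `label` and `prediction` columns is one way to inspect the output.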

SparkXGBClassifier

The SparkXGBClassifier estimator has a similar API to SparkXGBRegressor, but it has some PySpark classifier-specific params, e.g. the raw_prediction_col and probability_col parameters. Correspondingly, by default, a SparkXGBClassifierModel transforming a test dataset will generate a result dataset with 3 new columns:

  • “prediction”: represents the predicted label.

  • “raw_prediction”: represents the output margin values.

  • “probability”: represents the prediction probability on each label.
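A minimal sketch of how these three columns might be produced and selected is shown below. The output column names are passed explicitly so that the example does not depend on the estimator's defaults; the helper name `predict_with_classifier`, the constant `OUTPUT_COLUMNS`, and the input column names `features` and `label` are assumptions for illustration.

```python
# Names used for the three classifier outputs in this sketch.
OUTPUT_COLUMNS = ["prediction", "raw_prediction", "probability"]


def predict_with_classifier(train_df, test_df, num_workers=1):
    """Fit a SparkXGBClassifier and return only its three output columns."""
    # Imported lazily; requires pyspark and xgboost >= 1.7.
    from xgboost.spark import SparkXGBClassifier

    classifier = SparkXGBClassifier(
        features_col="features",
        label_col="label",
        # Set the output column names explicitly instead of relying on defaults.
        prediction_col="prediction",
        raw_prediction_col="raw_prediction",
        probability_col="probability",
        num_workers=num_workers,
    )
    model = classifier.fit(train_df)
    return model.transform(test_df).select(OUTPUT_COLUMNS)
```

Here `train_df` and `test_df` are assumed to be Spark dataframes prepared in the same way as for the regressor.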

XGBoost PySpark GPU support

XGBoost PySpark fully supports GPU acceleration. Users are not only able to enable efficient training but also utilize their GPUs for the whole PySpark pipeline including ETL and inference. In the sections below, we will walk through an example of training on a Spark standalone cluster with GPU support. To get started, first we need to install some additional packages, then we can set the device parameter to cuda or gpu.

Prepare the necessary packages

Aside from the PySpark and XGBoost modules, we also need the cuDF package for handling Spark dataframes. We recommend using either Conda or Virtualenv to manage Python dependencies for PySpark jobs. Please refer to How to Manage Python Dependencies in PySpark for more details on PySpark dependency management.

In short, to create a Python environment that can be sent to a remote cluster using virtualenv and pip:

    python -m venv xgboost_env
    source xgboost_env/bin/activate
    pip install pyarrow pandas venv-pack xgboost
    # https://docs.rapids.ai/install#pip-install
    pip install cudf-cu11 --extra-index-url=https://pypi.nvidia.com
    venv-pack -o xgboost_env.tar.gz

With Conda:

    conda create -y -n xgboost_env -c conda-forge conda-pack python=3.9
    conda activate xgboost_env
    # use conda when the supported version of xgboost (1.7) is released on conda-forge
    pip install xgboost
    conda install cudf pyarrow pandas -c rapids -c nvidia -c conda-forge
    conda pack -f -o xgboost_env.tar.gz

Write your PySpark application

The snippet below is a small example of training an XGBoost model with PySpark. Notice that we are using a list of feature names instead of a vector type as the input. The parameter device="cuda" specifically indicates that the training will be performed on a GPU.

    from pyspark.sql import SparkSession
    from xgboost.spark import SparkXGBRegressor

    spark = SparkSession.builder.getOrCreate()

    # read data into spark dataframe
    train_data_path = "xxxx/train"
    train_df = spark.read.parquet(train_data_path)

    test_data_path = "xxxx/test"
    test_df = spark.read.parquet(test_data_path)

    # assume the label column is named "class"
    label_name = "class"

    # get a list with feature column names
    feature_names = [x.name for x in train_df.schema if x.name != label_name]

    # create a xgboost pyspark regressor estimator and set device="cuda"
    regressor = SparkXGBRegressor(
        features_col=feature_names,
        label_col=label_name,
        num_workers=2,
        device="cuda",
    )

    # train and return the model
    model = regressor.fit(train_df)

    # predict on test data
    predict_df = model.transform(test_df)
    predict_df.show()

Like other distributed interfaces, the device parameter doesn’t support specifying an ordinal, as GPUs are managed by Spark instead of XGBoost (good: device=cuda, bad: device=cuda:0).
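The rule can be checked before an estimator is constructed. The helper below is purely hypothetical (`check_spark_device` is not part of xgboost); it only mirrors the constraint stated above, and it assumes that "cpu" is accepted alongside "cuda" and "gpu".

```python
def check_spark_device(device):
    """Return device unchanged if it carries no GPU ordinal, else raise ValueError.

    Hypothetical helper that mirrors the documented rule; not an xgboost API.
    """
    if ":" in device:
        # Spark assigns GPUs to tasks, so an ordinal such as "cuda:0" is rejected.
        raise ValueError(
            f"device={device!r} specifies an ordinal; use 'cuda' or 'gpu' instead"
        )
    if device not in {"cpu", "cuda", "gpu"}:
        raise ValueError(f"unsupported device: {device!r}")
    return device


print(check_spark_device("cuda"))  # cuda
```

Passing "cuda:0" to this helper raises a ValueError, which surfaces the misconfiguration before a Spark job is launched.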

Submit the PySpark application

This section assumes you have configured the Spark standalone cluster with GPU support. Otherwise, please refer to spark standalone configuration with GPU support.

Starting from XGBoost 2.0.1, stage-level scheduling is automatically enabled. Therefore, if you are using Spark standalone cluster version 3.4.0 or higher, we strongly recommend configuring "spark.task.resource.gpu.amount" as a fractional value. This will enable running multiple tasks in parallel during the ETL phase. An example configuration would be "spark.task.resource.gpu.amount=1/spark.executor.cores". However, if you are using an XGBoost version earlier than 2.0.1 or a Spark standalone cluster version below 3.4.0, you still need to set "spark.task.resource.gpu.amount" equal to "spark.executor.resource.gpu.amount".

Note

As of now, the stage-level scheduling feature in XGBoost is limited to the Spark standalone cluster mode. However, we have plans to expand its compatibility to YARN and Kubernetes once Spark 3.5.1 is officially released.

    export PYSPARK_DRIVER_PYTHON=python
    export PYSPARK_PYTHON=./environment/bin/python

    spark-submit \
      --master spark://<master-ip>:7077 \
      --conf spark.executor.cores=12 \
      --conf spark.task.cpus=1 \
      --conf spark.executor.resource.gpu.amount=1 \
      --conf spark.task.resource.gpu.amount=0.08 \
      --archives xgboost_env.tar.gz#environment \
      xgboost_app.py

The above command submits the XGBoost PySpark application with the Python environment created by pip or conda, specifying a request for 1 GPU and 12 CPUs per executor. Because each task claims 1 CPU and 0.08 of a GPU, a total of 12 tasks per executor will be executed concurrently during the ETL phase.
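The arithmetic behind the 0.08 value can be sketched as follows. Both function names are hypothetical, and the concurrency calculation is a simplified model (tasks are limited by whichever runs out first, CPU slots or GPU fractions), not a description of Spark's scheduler internals.

```python
import math
from fractions import Fraction


def task_gpu_amount(executor_cores, task_cpus=1, gpus_per_executor=1, digits=2):
    """Per-task GPU fraction that lets every CPU slot of an executor run a task."""
    cpu_slots = executor_cores // task_cpus
    exact = Fraction(gpus_per_executor, cpu_slots)
    # Round down so that cpu_slots * amount never exceeds the GPUs available.
    scale = 10 ** digits
    return math.floor(exact * scale) / scale


def concurrent_etl_tasks(executor_cores, task_gpu, task_cpus=1, gpus_per_executor=1):
    """Simplified model: concurrency is capped by both CPU slots and GPU slots."""
    cpu_slots = executor_cores // task_cpus
    gpu_slots = int(gpus_per_executor / task_gpu)
    return min(cpu_slots, gpu_slots)


amount = task_gpu_amount(executor_cores=12)
print(amount)                            # 0.08
print(concurrent_etl_tasks(12, amount))  # 12
print(concurrent_etl_tasks(12, 1.0))     # 1
```

The last line illustrates why a whole-GPU task amount is costly for ETL: with one GPU per executor, only one task can run at a time regardless of how many cores are available.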

Model Persistence

Similar to standard PySpark ML estimators, one can persist and reuse the model with the save and load methods:

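    from xgboost.spark import SparkXGBRegressor, SparkXGBRegressorModel

    regressor = SparkXGBRegressor()
    model = regressor.fit(train_df)

    # save the model
    model.save("/tmp/xgboost-pyspark-model")

    # load the model with the model class that matches the estimator
    model2 = SparkXGBRegressorModel.load("/tmp/xgboost-pyspark-model")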

To export the underlying booster model used by XGBoost:

    import xgboost as xgb
    from xgboost.spark import SparkXGBRegressor

    regressor = SparkXGBRegressor()
    model = regressor.fit(train_df)

    # the same booster object returned by xgboost.train
    booster: xgb.Booster = model.get_booster()
    booster.predict(...)
    booster.save_model("model.json")  # or model.ubj, depending on your choice of format.

This booster is not only shared by other Python interfaces but also used by all the XGBoost bindings including the C, Java, and R packages. Lastly, one can extract the booster file directly from a saved Spark estimator without going through the getter:

    import xgboost as xgb

    bst = xgb.Booster()
    # Loading the model saved in previous snippet
    bst.load_model("/tmp/xgboost-pyspark-model/model/part-00000")

Accelerate the whole pipeline for xgboost pyspark

With RAPIDS Accelerator for Apache Spark, you can leverage GPUs to accelerate the whole pipeline (ETL, Train, Transform) for XGBoost PySpark without the need for any code modifications. Likewise, you have the option to configure the "spark.task.resource.gpu.amount" setting as a fractional value, enabling a higher number of tasks to be executed in parallel during the ETL phase. Please refer to Submit the PySpark application for more details.

An example submit command is shown below with additional spark configurations and dependencies:

    export PYSPARK_DRIVER_PYTHON=python
    export PYSPARK_PYTHON=./environment/bin/python

    spark-submit \
      --master spark://<master-ip>:7077 \
      --conf spark.executor.cores=12 \
      --conf spark.task.cpus=1 \
      --conf spark.executor.resource.gpu.amount=1 \
      --conf spark.task.resource.gpu.amount=0.08 \
      --packages com.nvidia:rapids-4-spark_2.12:24.04.1 \
      --conf spark.plugins=com.nvidia.spark.SQLPlugin \
      --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
      --archives xgboost_env.tar.gz#environment \
      xgboost_app.py

When the RAPIDS plugin is enabled, both the JVM RAPIDS plugin and the cuDF Python package are required. More configuration options can be found in the RAPIDS link above along with details on the plugin.

Advanced Usage

XGBoost needs to repartition the input dataset into num_workers partitions to ensure there will be num_workers training tasks running at the same time. However, repartitioning is a costly operation.

If the data is read from the source and fitted directly to XGBoost without introducing a shuffle stage, users can avoid the need for repartitioning by setting the Spark configuration parameters spark.sql.files.maxPartitionNum and spark.sql.files.minPartitionNum to num_workers. This tells Spark to automatically partition the dataset into the desired number of partitions.

However, if the input dataset is skewed (i.e. the data is not evenly distributed), setting the partition number to num_workers may not be efficient. In this case, users can set the force_repartition=True option to explicitly force XGBoost to repartition the dataset, even if the partition number is already equal to num_workers. This ensures the data is evenly distributed across the workers.
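The two options described in this section might be wired up roughly as follows. The helper names (`partition_confs`, `build_session`, `make_regressor`) and the `features`/`label` column names are assumptions for illustration; only the two Spark configuration keys and the `force_repartition` parameter come from the text above.

```python
def partition_confs(num_workers):
    """Spark SQL settings asking file sources for num_workers partitions."""
    return {
        "spark.sql.files.minPartitionNum": str(num_workers),
        "spark.sql.files.maxPartitionNum": str(num_workers),
    }


def build_session(num_workers):
    """Create (or reuse) a SparkSession with the partition settings applied."""
    from pyspark.sql import SparkSession  # imported lazily; requires pyspark

    builder = SparkSession.builder
    for key, value in partition_confs(num_workers).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


def make_regressor(num_workers, skewed_input=False):
    """Build a regressor; force a repartition only when the input is skewed."""
    from xgboost.spark import SparkXGBRegressor  # requires xgboost and pyspark

    return SparkXGBRegressor(
        features_col="features",
        label_col="label",
        num_workers=num_workers,
        force_repartition=skewed_input,
    )


print(partition_confs(4))
```

Keeping the partition count and `num_workers` derived from the same value avoids the two drifting apart, which would reintroduce the repartition step.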