Note
Go to the end to download the full example code.
Collection of examples for using xgboost.spark estimator interface
@author: Weichen Xu
import numpy as np
import sklearn.datasets
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import rand
from sklearn.model_selection import train_test_split

from xgboost.spark import SparkXGBClassifier, SparkXGBRegressor

# Single seed shared by the train/test splits and the validation-indicator
# column so that the printed metrics are reproducible between runs. The value 1
# matches the seed the validation-indicator column (``rand(1)``) already used.
SEED = 1

spark = SparkSession.builder.master("local[*]").getOrCreate()


def create_spark_df(X: np.ndarray, y: np.ndarray) -> DataFrame:
    """Build a Spark DataFrame with ``features`` and ``label`` columns.

    Each row of *X* becomes a dense ML vector in the ``features`` column, and
    the matching element of *y* is cast to ``float`` for the ``label`` column.
    The estimators and evaluators below do not override their column-name
    parameters, so they rely on exactly these two names.

    :param X: 2-D feature matrix; one row per sample.
    :param y: labels, paired element-wise with the rows of *X* (via ``zip``).
    :return: a Spark DataFrame created on the module-level ``spark`` session.
    """
    return spark.createDataFrame(
        spark.sparkContext.parallelize(
            [(Vectors.dense(features), float(label)) for features, label in zip(X, y)]
        ),
        ["features", "label"],
    )


# load diabetes dataset (regression dataset)
diabetes_X, diabetes_y = sklearn.datasets.load_diabetes(return_X_y=True)
diabetes_X_train, diabetes_X_test, diabetes_y_train, diabetes_y_test = train_test_split(
    diabetes_X, diabetes_y, test_size=0.3, shuffle=True, random_state=SEED
)
diabetes_train_spark_df = create_spark_df(diabetes_X_train, diabetes_y_train)
diabetes_test_spark_df = create_spark_df(diabetes_X_test, diabetes_y_test)

# train xgboost regressor model
xgb_regressor = SparkXGBRegressor(max_depth=5)
xgb_regressor_model = xgb_regressor.fit(diabetes_train_spark_df)
transformed_diabetes_test_spark_df = xgb_regressor_model.transform(diabetes_test_spark_df)
regressor_evaluator = RegressionEvaluator(metricName="rmse")
print(f"regressor rmse={regressor_evaluator.evaluate(transformed_diabetes_test_spark_df)}")

# Add a boolean column flagging roughly 30% of the training rows
# (uniform rand() > 0.7). The estimator below is told to treat flagged rows as
# validation data through ``validation_indicator_col``.
diabetes_train_spark_df2 = diabetes_train_spark_df.withColumn(
    "validationIndicatorCol", rand(SEED) > 0.7
)

# train xgboost regressor model with validation dataset
xgb_regressor2 = SparkXGBRegressor(
    max_depth=5, validation_indicator_col="validationIndicatorCol"
)
xgb_regressor_model2 = xgb_regressor2.fit(diabetes_train_spark_df2)
transformed_diabetes_test_spark_df2 = xgb_regressor_model2.transform(diabetes_test_spark_df)
print(f"regressor2 rmse={regressor_evaluator.evaluate(transformed_diabetes_test_spark_df2)}")

# load iris dataset (classification dataset)
iris_X, iris_y = sklearn.datasets.load_iris(return_X_y=True)
iris_X_train, iris_X_test, iris_y_train, iris_y_test = train_test_split(
    iris_X, iris_y, test_size=0.3, shuffle=True, random_state=SEED
)
iris_train_spark_df = create_spark_df(iris_X_train, iris_y_train)
iris_test_spark_df = create_spark_df(iris_X_test, iris_y_test)

# train xgboost classifier model
xgb_classifier = SparkXGBClassifier(max_depth=5)
xgb_classifier_model = xgb_classifier.fit(iris_train_spark_df)
transformed_iris_test_spark_df = xgb_classifier_model.transform(iris_test_spark_df)
classifier_evaluator = MulticlassClassificationEvaluator(metricName="f1")
print(f"classifier f1={classifier_evaluator.evaluate(transformed_iris_test_spark_df)}")

# Same validation-indicator construction as for the regressor above.
iris_train_spark_df2 = iris_train_spark_df.withColumn(
    "validationIndicatorCol", rand(SEED) > 0.7
)

# train xgboost classifier model with validation dataset
xgb_classifier2 = SparkXGBClassifier(
    max_depth=5, validation_indicator_col="validationIndicatorCol"
)
xgb_classifier_model2 = xgb_classifier2.fit(iris_train_spark_df2)
transformed_iris_test_spark_df2 = xgb_classifier_model2.transform(iris_test_spark_df)
print(f"classifier2 f1={classifier_evaluator.evaluate(transformed_iris_test_spark_df2)}")

spark.stop()