Note

Go to the end to download the full example code.

Experimental support for distributed training with external memory

Added in version 3.0.0.

See the tutorial for more details. To run the example, the following packages are required in addition to XGBoost's native dependencies:

  • scikit-learn

  • loky

If `device` is `cuda`, the following are also needed:

  • cupy

  • rmm

  • cuda-python

Although it is not shown in this example, you should pay attention to the NUMA configuration, as discussed in the tutorial.

import argparse
import multiprocessing as mp
import os
import sys
import tempfile
import traceback
from functools import partial, update_wrapper, wraps
from typing import Callable, List, ParamSpec, Tuple, TypeVar

import numpy as np
from loky import get_reusable_executor
from sklearn.datasets import make_regression

import xgboost
from xgboost import collective as coll
from xgboost.tracker import RabitTracker


def device_mem_total() -> int:
    """The total number of bytes of memory this GPU has.

    Raises
    ------
    RuntimeError
        If ``cudaMemGetInfo`` does not return ``cudaSuccess``.
    """
    import cuda.bindings.runtime as cudart

    status, _free, total = cudart.cudaMemGetInfo()
    if status != cudart.cudaError_t.cudaSuccess:
        # The cuda-python runtime bindings return a ``(status, value)`` tuple from
        # every call, including ``cudaGetErrorString``; keep only the message.
        _, msg = cudart.cudaGetErrorString(status)
        raise RuntimeError(msg.decode() if isinstance(msg, bytes) else str(msg))
    return total


def make_batches(
    n_samples_per_batch: int,
    n_features: int,
    n_batches: int,
    tmpdir: str,
    rank: int,
) -> List[Tuple[str, str]]:
    """Generate synthetic regression batches and save them as ``.npy`` files.

    Each batch ``i`` is written to ``X-r{rank}-{i}.npy`` and ``y-r{rank}-{i}.npy``
    under ``tmpdir``. The random state is seeded with ``rank``, so every worker
    produces its own reproducible shard.

    Returns
    -------
    A list of ``(X_path, y_path)`` tuples, one per batch.
    """
    files: List[Tuple[str, str]] = []
    rng = np.random.RandomState(rank)
    for i in range(n_batches):
        X, y = make_regression(n_samples_per_batch, n_features, random_state=rng)
        X_path = os.path.join(tmpdir, f"X-r{rank}-{i}.npy")
        y_path = os.path.join(tmpdir, f"y-r{rank}-{i}.npy")
        np.save(X_path, X)
        np.save(y_path, y)
        files.append((X_path, y_path))
    return files


class Iterator(xgboost.DataIter):
    """A custom iterator for loading files in batches."""

    def __init__(self, device: str, file_paths: List[Tuple[str, str]]) -> None:
        self.device = device
        self._file_paths = file_paths
        self._it = 0
        # XGBoost will generate some cache files under the current directory with the
        # prefix "cache"
        super().__init__(cache_prefix=os.path.join(".", "cache"))

    def load_file(self) -> Tuple[np.ndarray, np.ndarray]:
        """Load a single batch of data (NumPy on CPU, CuPy otherwise)."""
        X_path, y_path = self._file_paths[self._it]
        # When the `ExtMemQuantileDMatrix` is used, the device must match. GPU cannot
        # consume CPU input data and vice-versa.
        if self.device == "cpu":
            X = np.load(X_path)
            y = np.load(y_path)
        else:
            # Import locally instead of relying on a ``cp`` global that is only bound
            # inside the ``__main__`` guard.
            import cupy as cp

            X = cp.load(X_path)
            y = cp.load(y_path)

        assert X.shape[0] == y.shape[0]
        return X, y

    def next(self, input_data: Callable) -> bool:
        """Advance the iterator by 1 step and pass the data to XGBoost.

        This function is called by XGBoost during the construction of ``DMatrix``.
        Returns ``False`` once every file has been consumed.
        """
        if self._it == len(self._file_paths):
            # return False to let XGBoost know this is the end of iteration
            return False

        # input_data is a keyword-only function passed in by XGBoost and has a similar
        # signature to the ``DMatrix`` constructor.
        X, y = self.load_file()
        input_data(data=X, label=y)
        self._it += 1
        return True

    def reset(self) -> None:
        """Reset the iterator to its beginning"""
        self._it = 0


def setup_rmm() -> None:
    """Setup RMM for GPU-based external memory training.

    It's important to use RMM with `CudaAsyncMemoryResource` or `ArenaMemoryResource`
    for GPU-based external memory to improve performance. If XGBoost is not built with
    RMM support, this function returns without changing anything.
    """
    import cupy as cp
    import rmm
    from rmm.allocators.cupy import rmm_cupy_allocator
    from rmm.mr import ArenaMemoryResource

    if not xgboost.build_info()["USE_RMM"]:
        return

    total = device_mem_total()

    mr = rmm.mr.CudaMemoryResource()
    # Size the arena at 90% of the device's total memory.
    mr = ArenaMemoryResource(mr, arena_size=int(total * 0.9))
    rmm.mr.set_current_device_resource(mr)
    # Set the allocator for cupy as well.
    cp.cuda.set_allocator(rmm_cupy_allocator)


R = TypeVar("R")
P = ParamSpec("P")


def try_run(fn: Callable[P, R]) -> Callable[P, R]:
    """Print the traceback of any exception raised by ``fn`` before re-raising.

    Loky aborts the process without printing out any error message if there's an
    exception, so the traceback is written to stderr and a ``RuntimeError`` chained
    to the original exception is raised instead.
    """

    @wraps(fn)
    def inner(*args: P.args, **kwargs: P.kwargs) -> R:
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            print(traceback.format_exc(), file=sys.stderr)
            raise RuntimeError("Running into exception in worker.") from e

    return inner


@try_run
def hist_train(worker_idx: int, tmpdir: str, device: str, rabit_args: dict) -> None:
    """Train a ``hist`` booster on this worker's shard of external-memory data.

    The hist tree method can use a special data structure `ExtMemQuantileDMatrix` for
    faster initialization and lower memory usage.

    ``worker_idx`` is only used to fan the call out over the executor; the rank comes
    from the collective communicator configured by ``rabit_args``.
    """
    # Make sure XGBoost is using RMM for all allocations.
    with coll.CommunicatorContext(**rabit_args), xgboost.config_context(use_rmm=True):
        # Generate the data for demonstration. The synthetic data is sharded by workers.
        files = make_batches(
            n_samples_per_batch=4096,
            n_features=16,
            n_batches=17,
            tmpdir=tmpdir,
            rank=coll.get_rank(),
        )
        # Since all workers run on a single node, divide the number of threads
        # between them. ``os.cpu_count()`` may return None; fall back to 1.
        n_threads = max((os.cpu_count() or 1) // coll.get_world_size(), 1)
        it = Iterator(device, files)
        Xy = xgboost.ExtMemQuantileDMatrix(
            it, missing=np.nan, enable_categorical=False, nthread=n_threads
        )

        # Check the device is correctly set.
        if device == "cuda":
            # Check the first device
            assert (
                int(os.environ["CUDA_VISIBLE_DEVICES"].split(",")[0])
                < coll.get_world_size()
            )

        booster = xgboost.train(
            {
                "tree_method": "hist",
                "max_depth": 4,
                "device": it.device,
                "nthread": n_threads,
            },
            Xy,
            evals=[(Xy, "Train")],
            num_boost_round=10,
        )
        booster.predict(Xy)


def main(tmpdir: str, args: argparse.Namespace) -> None:
    """Start a local Rabit tracker and run ``hist_train`` on every loky worker.

    Raises
    ------
    RuntimeError
        If any worker fails (propagated from ``try_run``).
    """
    n_workers = 2

    tracker = RabitTracker(host_ip="127.0.0.1", n_workers=n_workers)
    tracker.start()
    rabit_args = tracker.worker_args()

    def initializer(device: str) -> None:
        # Set CUDA device before launching child processes.
        if device == "cuda":
            # Worker names look like ``LokyProcess-1``; loky uses 1-based indexing.
            idx = int(mp.current_process().name.rsplit("-", 1)[-1]) - 1
            # Rotate the device list so each worker sees a different first device,
            # e.g. with two workers:
            # P0: CUDA_VISIBLE_DEVICES=0,1
            # P1: CUDA_VISIBLE_DEVICES=1,0
            devices = ",".join(str((idx + i) % n_workers) for i in range(n_workers))
            os.environ["CUDA_VISIBLE_DEVICES"] = devices
            setup_rmm()

    with get_reusable_executor(
        max_workers=n_workers, initargs=(args.device,), initializer=initializer
    ) as pool:
        # Poor man's currying
        fn = update_wrapper(
            partial(hist_train, tmpdir=tmpdir, device=args.device, rabit_args=rabit_args),
            hist_train,
        )
        # ``map`` only re-raises a worker's exception when its result is retrieved, so
        # consume the results; otherwise failures are silently discarded.
        list(pool.map(fn, range(n_workers)))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--device", choices=["cpu", "cuda"], default="cpu")
    args = parser.parse_args()
    if args.device == "cuda":
        # Fail fast in the parent process if CuPy is not installed.
        import cupy as cp  # noqa: F401

    with tempfile.TemporaryDirectory() as tmpdir:
        main(tmpdir, args)

Gallery generated by Sphinx-Gallery