- Notifications
You must be signed in to change notification settings - Fork12
🎛 Distributed machine learning made simple.
License
ml-tooling/lazycluster
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
Distributed machine learning made simple.
Use your preferred distributed ML framework like alazy engineer.
Getting Started •Highlights •Features •API Docs •Support •Report a Bug •Contribution
lazycluster is a Python library intended to liberate data scientists and machine learning engineers by abstractingaway cluster management and configuration so that they are able to focus on their actual tasks. Especially, the easyand convenient cluster setup with Python for various distributed machine learning frameworks is emphasized.
- High-Level API for starting clusters:
- Lower-level API for:
- ManagingRuntimes orRuntimeGroups to:
- A-/synchronously executeRuntimeTasks by leveraging the power of ssh
- Expose services (e.g. a DB) from or to a
Runtimeor in a wholeRuntimeGroup
- ManagingRuntimes orRuntimeGroups to:
- Command line interface (CLI)
- List all available
Runtimes - Add a
Runtimeconfiguration - Delete a
Runtimeconfiguration
- List all available
Concept Definition:Runtime
A
Runtimeis the logical representation of a remote host. Typically, the host is another server or a virtual machine / container on another server. This python class provides several methods for utilizing remote resources such as the port exposure from / to aRuntimeas well as the execution ofRuntimeTasks. ARuntimehas a working directory. Usually, the execution of aRuntimeTaskis conducted relatively to this directory if no other path is explicitly given. The working directory can be manually set during the initialization. Otherwise, a temporary directory gets created that might eventually be removed.
Concept Definition:RuntimeGroup
A
RuntimeGroupis the representation of logically relatedRuntimesand provides convenient methods for managing those relatedRuntimes. Most methods are wrappers around their counterparts in theRuntimeclass. Typical usage examples are exposing a port (i.e. a service such as a DB) in theRuntimeGroup, transfer files, or execute aRuntimeTaskon theRuntimes. Additionally, all concreteRuntimeCluster (e.g. theHyperoptCluster) implementations rely onRuntimeGroupsfor example.
The
managerrefers to the host where you are actually using the lazycluster library, since all desired lazycluster entities are managed from here.Caution: It is not to be confused with theRuntimeManager class.
Concept Definition:RuntimeTask
A
RuntimeTaskis a composition of multiple elemantary task steps, namelysend file,get file,run command(shell),run function(python). ARuntimeTaskcan be executed on a remote host either by handing it over to aRuntimeobject or standalone by handing over afabric Connection object to the execute method of theRuntimeTask. Consequently, all invididual task steps are executed sequentially. Moreover, aRuntimeTaskobject captures the output (stdout/stderr) of the remote execution in its execution log. An example for aRuntimeTaskcould be to send a csv file to aRuntime, execute a python function that is transforming the csv file and finally get the file back.
pip install lazycluster
# Most up-to-date development versionpip install --upgrade git+https://github.com/ml-tooling/lazycluster.git@developFor lazycluster usage on themanager:
Unix based OS
Python >= 3.6
ssh client (e.g. openssh-client)
Passwordless ssh access to the
Runtimehosts(recommended)Configure passwordless ssh access (click to expand...)
- Create a key pair on the manager as describedhere or use an existing one
- Install lazycluster on the manager
- Create the ssh configuration for each host to be used as Runtime by using the lazycluster CLI command
lazycluster add-runtimeas describedhere anddo not forget to specify the--id-fileargument. - Finally, enable the passwordless ssh access by copying the public key to each Runtime as descibedhere
Runtime host requirements:
- Unix based OS
- Python >= 3.6
- ssh server (e.g. openssh-server)
Note:
Passwordless ssh needs to be setup for the hosts to be used asRuntimes for the most convenient user experience. Otherwise, you need to pass the connection details to Runtime.__init__ via connection_kwargs. These parameters will be passed on to thefabric.Connection.
Start aDask cluster.
fromlazyclusterimportRuntimeManagerfromlazycluster.cluster.dask_clusterimportDaskCluster# Automatically generate a group based on the ssh configurationruntime_manager=RuntimeManager()runtime_group=runtime_manager.create_group()# Start the Dask cluster instances using the RuntimeGroupdask_cluster=DaskCluster(runtime_group)dask_cluster.start()# => Now, you can start using the running Dask cluster# Get Dask client to interact with the cluster# Note: This will give you a dask.distributed.Client which is not# a lazycluster cluster but a Dask one insteadclient=cluster.get_client()
Execute a Python function on a remote host and access the return data.
fromlazyclusterimportRuntimeTask,Runtime# Define a Python function which will be executed remotelydefhello(name:str):return'Hello '+name+'!'# Compose a `RuntimeTask`task=RuntimeTask('my-first_task').run_command('echo Hello World!') \ .run_function(hello,name='World')# Actually execute it remotely in a `Runtime`task=Runtime('host-1').execute_task(task,execute_async=False)# The stdout from from the executing `Runtime` can be accessed# via the execution log of the `RuntimeTask`task.print_log()# Print the return of the `hello()` callgenerator=task.function_returnsprint(next(generator))
Thelazycluster project is maintained byJan Kalkan. Pleaseunderstand that we won't be able to provide individual support via email. We also believe that help is much morevaluable if it's shared publicly so that more people can benefit from it.
| Type | Channel |
|---|---|
| 🚨Bug Reports | |
| 🎁Feature Requests | |
| 👩💻Usage Questions | |
| 🗯General Discussion |
Details (click to expand...)
For a full list of CLI commands please uselazycluster --help. For the help of a specific command please uselazycluster COMMAND --help.
Moreover, also incative hosts will be shown. Inactive means, that the host could not be reached via ssh and instantiated as a valid Runtime.
# Will print a short list of active / inactive Runtimeslazycluster list-runtimes# will print a list of active / inactive Runtimes incl. additional host information# Note: This is slower as compared to omittin the -l optionlazycluster list-runtimes -l
The host is namedlocalhost for userroot accessible onlocalhost port22 using the private key file found under~/.ssh/id_rsa.
Note: Add command will only add the ssh configuration on themanager. For a complete guide on how to setup passwordless ssh check theprerequisites section.
lazycluster add-runtime localhost root@localhost:22 --id_file~/.ssh/id_rsaNote: Corresponding remote ikernel will be deleted too if present.
lazycluster delete-runtime host-1
Details (click to expand...)
ARuntime has a working directory. Usually, the execution of aRuntimeTask is conducted relatively to this directory if no other path is explicitly given. The working directory can be manually set during the initialization. Otherwise, a temporary directory gets created that might eventually be removed.
fromlazyclusterimportRuntime,RuntimeGrouprt_1=Runtime('host-1')rt_2=Runtime('host-2',working_dir='/workspace')# In this case you get a group where both Runtimes have different working directories.# The working directory on host-1 will be a temp one and gets removed eventually.runtime_group=RuntimeGroup([rt_1,rt_2])# Here, the group internally creates Runtimes for both hosts and sets its working directory.runtime_group=RuntimeGroup(hosts=['host-1','host-2'],working_dir='/workspace')
Moreover, you can set environment variables for the Runtimes. These variables can then be accessed when executing a Python function on the Runtime or executing a shell command. Per default the working directory is set as an env variable and the class constantRuntime.WORKING_DIR_ENV_VAR_NAME will give you the name of the variable. The working directory is always accessible also if manually update the env_variables.
# Directly set the env vars per Runtimesrt=Runtime('host-1')rt.env_variables= {'foo':'bar'}# Or use the convenient method to the the env vars# for all Runtimes in a RuntimeGroupruntime_group=RuntimeGroup(hosts=['host-1','host-2'])group.set_env_variables({'foo':'bar'})
Details (click to expand...)
TheRuntimeManager can automatically detect all availableRuntimes based on themanager's local ssh config and eventually create a necessaryRuntimeGroup for you.
fromlazyclusterimportRuntimeManager,RuntimeGroupruntime_group=RuntimeManager().create_group()
Start aDask cluster for scalable analytics
Details (click to expand...)
Most simple way to use Dask in a cluster based on aRuntimeGroup created by theRuntimeManager. TheRuntimeManager can automatically detect all availableRuntimes based on themanager's ssh config and eventually create a necessaryRuntimeGroup for you. ThisRuntimeGroup is then handed over toDaskCluster during initialization.
The DASKscheduler instance gets started on themanager. Additionally, multiple DASKworker processes get started in theRuntimeGroup, i.e. in theRuntimes. The default number of workers is equal to the number ofRuntimes in theRuntimeGroup.
Prerequisite:Please make sure that you have Dask installed on themanager. This can be done usingpip install -q "dask[complete]".
Details (click to expand...)
fromlazyclusterimportRuntimeManagerfromlazycluster.cluster.dask_clusterimportDaskCluster# 1st: Create a RuntimeGroup, e.g. by letting the RuntimeManager detect# available hosts (i.e. Runtimes) and create the group for you.runtime_group=RuntimeManager().create_group()# 2nd: Create the DaskCluster instance with the RuntimeGroup.cluster=DaskCluster(runtime_group)# 3rd: Let the DaskCluster instantiate all entities on Runtimes# of the RuntimeGroup using default values. For custom# configuration check the DaskCluster API documentation.cluster.start()# => Now, all cluster entities should be started and you can simply use# it as documented in the Dask distributed documentation.
Test the cluster setup
# Define test functions to be executed in parallel via DASKdefsquare(x):returnx**2defneg(x):return-x# Get a DASK client instanceclient=cluster.get_client()# Execute the computationA=client.map(square,range(10))B=client.map(neg,A)total=client.submit(sum,B, )res=total.result()print('Result: '+str(res))
Use different strategies for launching the master and the worker instances.
Details (click to expand...)
Use different strategies for launching the master and the worker instance by providing custom implementation oflazycluster.cluster.MasterLauncher andlazycluster.cluster.WorkerLauncher. The default implementations arelazycluster.cluster.dask_cluster.LocalMasterLauncher andlazycluster.cluster.dask_cluster.RoundRobinLauncher.
cluster=DaskCluster(RuntimeManager().create_group(),MyMasterLauncherImpl(),MyWorkerLauncherImpl())cluster.start()
Distributed hyperparameter tuning withHyperopt
Details (click to expand...)
Most simple way to use Hyperopt in a cluster based on aRuntimeGroup created by theRuntimeManager. TheRuntimeManager can automatically detect all availableRuntimes based on themanager's ssh config and eventually create a necessaryRuntimeGroup for you. ThisRuntimeGroup is then handed over toHyperoptCluster during initialization.
A MongoDB instance gets started on themanager. Additionally, multiple hyperoptworker processes get started in theRuntimeGroup, i.e. on the containedRuntimes. The default number of workers is equal to the number ofRuntimes in theRuntimeGroup.
Prerequisites:
- MongoDB server must be installed on themanager.
- Note: When using theml-workspace as the
masterthen you can use the provided install script for MongoDB which can be found under/resources/tools.
- Note: When using theml-workspace as the
- Hyperopt must be installed on all
Runtimeswhere hyperopt workers will be started- Note: When using theml-workspace as hosts for the
Runtimesthen hyperopt is already pre-installed.
- Note: When using theml-workspace as hosts for the
Launch a cluster (click to expand...)
For a detailed documentation of customizing options and default values check out theAPI docs
fromlazyclusterimportRuntimeManagerfromlazycluster.cluster.hyperopt_clusterimportHyperoptCluster# 1st: Create a RuntimeGroup, e.g. by letting the RuntimeManager detect# available hosts (i.e. Runtimes) and create the group for you.runtime_group=RuntimeManager().create_group()# 2nd: Create the HyperoptCluster instance with the RuntimeGroup.cluster=HyperoptCluster(runtime_group)# 3rd: Let the HyperoptCluster instantiate all entities on Runtimes of the RuntimeGroup using default values. For custom# configuration check the HyperoptCluster API documentation.cluster.start()# => Now, all cluster entities should be started and you can simply use# it as documented in the hyperopt documentation. We recommend to call# cluster.cleanup() once you are done.
Test the cluster setup using the simpleexample to minimize the sin function.
Note: The call tofmin is also done on themanager. Theobjective_function gets sent to the hyperopt workers by fmin via MongoDB. So there is no need to trigger the execution offmin or theobjective_function on the individualRuntimes. Seehyperopt docs for detailed explanation.
importmathfromhyperoptimportfmin,tpe,hpfromhyperopt.mongoexpimportMongoTrials# You can retrieve the the actual url required by MongoTrials form the cluster instancetrials=MongoTrials(cluster.mongo_trial_url,exp_key='exp1')objective_function=math.sinbest=fmin(objective_function,hp.uniform('x',-2,2),trials=trials,algo=tpe.suggest,max_evals=10)# Ensures that MongoDB gets stopped and other resourcescluster.cleanup()
Now, we will cenceptually demonstrate how to uselazycluster w/ hyperopt to optimize hyperparameters of afasttext model. Note, this should not be a fasttext demo and thus the actual usage of fasttext is not optimized. Thus, you should read the related docs for this purpose. The example should just highlight how to get fasttext up and running in a distributed setting using lazycluster.
fromlazyclusterimportRuntimeManagerfromlazycluster.cluster.hyperopt_clusterimportHyperoptClusterimportos# 1st: Create a RuntimeGroup, e.g. by letting the RuntimeManager detect# available hosts (i.e. Runtimes) and create the group with a persistent# working directory for you.runtime_group=RuntimeManager().create_group(working_dir='~/hyperopt')# 2nd: Send the training - and test dataset to all Runtimespath_to_datasets='/path_on_manager'train_file_name='train.csv'train_path=os.path.join(path_to_datasets,train_file_name)test_file_name='train.csv'test_path=os.path.join(path_to_datasets,test_file_name)# Per default the file will be send asynchronously to Runtime's working directoryruntime_group.send_file(train_file_name)runtime_group.send_file(test_file_name)# 3rd: Create the HyperoptCluster instance with the RuntimeGroup.cluster=HyperoptCluster(runtime_group)# 4th: Let the HyperoptCluster instantiate all entities on# Runtimes of the RuntimeGroup using default values.# For custom configuration check the HyperoptCluster API documentation.cluster.start()# 5th: Ensure that the processes for sending the files terminated already,# since we sent the files async in 2nd step.runtime_group.join()# => Now, all cluster entities are started, datasets transferred, and you# can simply use the lcuster as documented in the hyperopt documentation.# 6th: Define the objective function to be minimized by Hyperopt in order to find the# best hyperparameter combination.deftrain(params):importfasttextimportostrain_path=os.path.join(os.environ['WORKING_DIR'],params['train_set_file_name'])test_path=os.path.join(os.environ['WORKING_DIR'],params['test_set_file_name'])model=fasttext.train_supervised(input=train_path,lr=float(params['learning_rate']),dim=int(params['vector_dim']),ws=int(params['window_size']),epoch=int(params['epochs']),minCount=int(params['min_count']),neg=int(params['negativ_sampling']),t=float(params['sampling']),wordNgrams=1,# word ngrams other than 1 crashbucket=int(params['bucket']),pretrainedVectors=str(params['pretrained_vectors']),lrUpdateRate=int(params['lr_update_rate']),thread=int(params['threads']),verbose=2 )number_of_classes,precision,recall=model.test(test_path)f1=2* ((precision*recall)/ (precision+recall))# Return value must be negative because hyperopt's fmin tries to minimize the objective# function. You can think of it as minimizing an artificial loss function.return-1*f1fromhyperoptimportfmin,tpe,hpfromhyperopt.mongoexpimportMongoTrials# 7th: Define the searh space for the paramters to be optimized. Check further functions# of Hyperopt's hp module that might suit your specific requirement. This should just# give you an idea and not show how to best use fasttext.search_space= {'min_count':hp.quniform('min_count',2,20,1),'window_size':hp.quniform('window_size',4,15,1),'vector_dim':hp.quniform('vector_dim',100,300,1),'learning_rate':0.4,'lr_update_rate':100,'negativ_sampling':hp.quniform('negativ_sampling',5,20,1),'sampling':hp.uniform('sampling',0,10**-3),'bucket':2000000,'epochs':hp.quniform('epochs',3,30,1),'pretrained_vectors':'','threads':8,'train_set_file_name':train_file_name,'test_set_file_name':test_file_name}# 8th: Actually, execute the hyperparameter optimization. Use the mongo_trial_url# property of your HyperoptCluster instance to get the url in the format# required by MongoTrials.trials=MongoTrials(cluster.mongo_trial_url,exp_key='exp1')best=fmin(train,search_space,tpe.suggest,500,trials)print(best)
Debugging (click to expand...)
In general you should read theLogging, exception handling and debugging section first so that you are aware of the general options lazycluster offers for debugging.
So the first step is to successfully launch a Hyperopt cluster by using the corresponding lazycluster class. If you experience problems until this point you should analyze the exceptions which should guide you forward to a solution. If this given error is not self explaining then please consider to provide meaningful feedback here so that it will be soon. Common problems until the cluster is started are:
- MongoDB or hyperopt are not installed, i.e. the prerequisites are not yet fulfilled.=> Ensure that the prerequisites are fulfilled. Consider usingml-workspace to get rid of dependency problems.
- MongoDB is already running (under the same dbpath). This might especially happen if you started a cluster before and the cleanup did not happen correctly. Usually, the cleanup should happenatexit but sometimes it simply does not work depending on your execution environment.=> to prevent this problem you can and should explicitly call the
cleanup()method of theHyperoptClusterinstance=> to solve the problem if MongoDB is still running just typelsof -i | grep mongodinto a terminal. Finally, use thekill pidcommand with the process ID you got from issuing the previous command.
Once the Hyperopt cluster is running, you can startusing it. It should be noted, that the following is mainly about finding Hyperopt related issues since lazycluster basically did its job already. Typically, this means you have a bug in your objective function that you try to minimize with Hyperopt.
First, you could use theprint_log() method of your hyperopt to check the execution log. If you can't find any error here, then check theexecution log files or redirect the execution log from files to stdout of themanager by settingdebug=True in the start methods of theHyperoptCluster class.
Alternatively, you can ssh into one of yourRuntimes and manually start a hyperopt-worker process. You can find the respective shell command in thehyperopt docs. Moreover, you can get the necessary url for the--mongo argument by accessing the python propertymongo_url from yourHyperoptCluster instance once its running. Consequently, the newly started worker will poll a job from the master (i.e. MongoDB) and start its execution. Now you should see the error in the terminal once it occurs.
We found two common bug types related to the objective function. First, make sure that the hyper-/parameters you are passing to your model have the correct datatypes. Sounds trivial, right? :)
Next, you typically use some training - and test dataset on your Runtimes inside your objective function. So the correct file paths may be a bit tricky at first. You should understand that the objective function gets communicated to the hyperopt worker processes byfmin() via MongoDB. Consequently, the objective function gets executed as it is on the Runtimes and the paths must exist on theRuntimes. TheRuntime's working directory as documented in theAPI docs is of interest here. It should be noted, that the path of this directory is available on the Runtimes. Consequently, we recommend that you manually set a working directory on yourRuntimes and move the training - and test dataset files relative to the working directory. This can also be done onRuntimeGroup level. Now, you can create a relative path to the files inside your objective_function withos.path.join(os.environ['WORKING_DIR'], 'relative_file_path').Note: The advantage of manually setting a working directory in this case is that a manually set working directory does not get removed at the end. Consequently, you do not need to move the files each time you start the execution. This hint can safe you quite a lot of time especially when you need to restart the exectuion mutliple times while debugging.
Use different strategies for launching the master and the worker instances.
Details (click to expand...)
Use different strategies for launching the master and the worker instances by providing custom implementation oflazycluster.cluster.MasterLauncher andlazycluster.cluster.WorkerLauncher. The default implementations arelazycluster.cluster.hyperopt_cluster.LocalMongoLauncher andlazycluster.cluster.hyperopt_cluster.RoundRobinLauncher.
cluster=HyperoptCluster(RuntimeManager().create_group(),MyMasterLauncherImpl(),MyWorkerLauncherImpl())cluster.start()
Details (click to expand...)
A DB is running on a remote host on portruntime_port and the DB is only accessible from the remote host.But you also want to access the service from themanager on portlocal_port. Then you can use thismethod to expose the service which is running on the remote host to themanager.
Details (click to expand...)
fromlazyclusterimportRuntime# Create a Runtimeruntime=Runtime('host-1')# Make the port 50000 from the Runtime accessible on localhostruntime.expose_port_from_runtime(50000)# Make the local port 40000 accessible on the Runtimeruntime.expose_port_to_runtime(40000)
A DB is running on themanager on portlocal_port and the DB is only accessible from the manager.But you also want to access the service on the remoteRuntime on portruntime_port. Then you can usethis method to expose the service which is running on the manager to the remote host.
Details (click to expand...)
fromlazyclusterimportRuntime# Create a Runtimeruntime=Runtime('host-1')# Make the port 50000 from the Runtime accessible on localhostruntime.expose_port_from_runtime(50000)# Make the local port 40000 accessible on the Runtimeruntime.expose_port_to_runtime(40000)
Now, we extend the previous example by using aRuntimeGroup instead of just a singleRuntime. This means we want to expose a service which is running on themanager to a group ofRuntimes.
Details (click to expand...)
fromlazyclusterimportRuntimeGroup# Create a RuntimeGroupruntime_group=RuntimeGroup('host1','host-2','host-3')# Make the local port 50000 accessible on all Runtimes in the RuntimeGroup.runtime_group.expose_port_to_runtimes(50000)# Note: The port can also be exposed to a subset of the Runtimes by using the# method parameter exclude_hosts.runtime_group.expose_port_to_runtimes(50000,exclude_hosts='host-3')
Assume you have service which is running on Runtimehost-1. Now, you can expose the service to the remainingRuntimes in theRuntimeGroup.
Details (click to expand...)
fromlazyclusterimportRuntimeGroup# Create a RuntimeGroupruntime_group=RuntimeGroup('host1','host-2','host-3')# Make the port 40000 which is running on host-1 accessible on all other Runtimes in the RuntimeGroupruntime_group.expose_port_from_runtime_to_group('host-1',40000)
Details (click to expand...)
ARuntimeTask is capable of sending a file from themanager to aRuntime or vice versa. Moreover, theRuntime class as well as theRuntimeGroup provide convenient methods for this purpose that internally creates theRuntimeTasks for you.
In the following example, thefile.csv will be transferred to theRuntime's working directory. Another path on the Runtime can be specified by supplying aremote_path as argument. SeeRuntime docs for further details on the working directory.
fromlazyclusterimportRuntimeTask,Runtimetask=RuntimeTask('file-transfer')task.send_file('local_path/file.csv')runtime=Runtime('host-1')runtime.execute_task(task,exec_async=False)
The explicit creation of aRuntimeTask is only necessary if you intend to add further steps to theRuntimeTask instead of just transferring a file. For example, you want to send a file, execute a Python function, and transfer the file back. If not, you can use the file transfer methods of theRuntime orRuntimeGroup.In the case of sending a file to aRuntimeGroup you should send the files asynchronously. Otherwise, each file will be transferred sequentially. Do not forget to calljoin(), if you need the files to be transferred before proceeding.
fromlazyclusterimportRuntimeTask,Runtime,RuntimeGroup,RuntimeManager# Send a file to a single Runtimeruntime=Runtime('host-1')send_file('local_path/file.csv',execute_async=False)# Send a file to a whole RuntimeGroupgroup=RuntimeManager().create_group()group.send_file('local_path/file.csv',execute_async=True)group.join()
The usage of get_file is similar and documentedhere.
Details (click to expand...)
Read a local CSV file (on themanager) and upper case chunks in parallel usingRuntimeTasksand aRuntimeGroup.
fromtypingimportListimportpandasaspdfromlazyclusterimportRuntimeTask,RuntimeManager# Define the function to be executed remotelydefpreprocess(docs:List[str]):return [str(doc).lower()fordocindocs]file_path='/path/to/file.csv'runtime_group=RuntimeManager().create_group()tasks= []# Distribute chunks of the csv and start the preprocessing in parallel in the RuntimeGroupfordf_chunkinpd.read_csv(file_path,sep=';',chunksize=500):task=RuntimeTask().run_function(preprocess,docs=df_chunk['text'].tolist())tasks.append(runtime_group.execute_task(task))# Wait until all executions are doneruntime_group.join()# Get the return data and print itindex=0forchunkinruntime_group.function_returns:print('Chunk: '+str(index))index+=1print(chunk)
Details (click to expand...)
lazycluster aims to abstract away the complexity implied by using multiple distributedRuntimes and provides an intuitive high level API fur this purpose. The lazyclustermanager orchestrates the individual components of the distributed setup. A common use case could be to use lazycluster in order to launch a distributedhyperopt cluster. In this case, we have the lazyclustermanager, that starts aMongoDB instance, starts the hyperopt worker processes on multipleRuntimes and ensures the required communication via ssh between these instances. Each individual component could potentially fail including the 3rd party ones such as hyperopt workers. Sincelazycluster is a generic library and debugging a distributed system is an instrinsically non-trivial task, we tried to emphasize logging and good exception handling practices so that you can stay lazy.
We use the standard Pythonlogging module in order to log everything of interest that happens on themanager.
Details (click to expand...)
Per default we recommend to set the basicConfig log level tologging.INFO. Consequently, you will get relevant status updates about the progress of launching a cluster for example. Of course, you can adjust the log level tologging.DEBUG or anything you like.
We like to use the following basic configuration when using lazycluster in aJupyter notebook:
importlogginglogging.basicConfig(format='[%(levelname)s] %(message)s',level=logging.INFO)
Note:Some 3rd party libraries produce a lot of INFO messages, which are usually not of interest for the user. This is particular true forParamiko. We base most ssh handling onFabric which is based on Paramiko. We decided to set the log level for these libraries tologging.Error per default. This happens in the__init__.py module of the lazycluster package. And will be set once when importing the first module or class fromlazycluster. If you want to change the log level of 3rd party libs you can set it the following way:
importloggingfromlazyclusterimportEnvironment# Effects logs of all libraries that were initially set to logging.ERRORlazycluster.Environment.set_third_party_log_level(logging.INFO)# Of course, you can set the log level manually for each library / modulelogging.getLogger('paramiko').setLevel(logging.DEBUG)logging.getLogger('lazycluster').setLevel(logging.INFO)
Seeset_third_party_log_level() of theEnvironment class for a full list of affected libraries.
The execution log aims to provide a central access point to output produced on the Runtimes.
Details (click to expand...)
This type of log contains mainly the stdout/stderr produced when executing aRuntimeTask on aRuntime. If you are new to lazycluster or you never used the lower level API directly, then you might think the execution log is not relevant for you. But it is :) Also the concrete cluster implementations (e.g.DaskCluster orHyperoptCluster) are built on top of the lower-level API. You can think of it as the kind of log which you can use to understand what actually happened on yourRuntimes. You can access the execution log in 3 different ways.
The 1st option is by accessing the excution log files. The stdout/stderr generated on theRuntimes is streamed to log files. The respective directory is per default./lazycluster/execution_log on themanager. The log directory contains a subfolder for each Runtime (i.e. host) that executed at least oneRuntimeTask. Inside a Runtime folder you will find one log file per executed RuntimeTask. Each logfile name is generated by concatenating the name of theRuntimeTask and a current timestamp. You can configure the path were the log directory gets created by adjusting the lazycluster main directory. SeeEnvironment for this purpose. Moreover, the respective file path can be programmatically accessed viaRuntimeTask.execution_log_file_path. This property gets updated each time theRuntimeTask gets executed.
The 2nd option is to redirect the execution log (i.e. stdout/stderr from the Runtimes) to the stdout of themanager. Hereby, you can quickly spot errors. The drawback here is that you can not directly distinguish which Runtime generated which output, since the output of potentially multiple Runtimes is directly streamed to the manager's stdout as it occurs. To enable this feature you need to pass on thedebug flag to the respective methods (i.e. RuntimeTask.execute(), Runtime.execute_task(), RuntimeGroup.execute_task()). All cluster relatedstart() methods (e.g.HyperoptCluster.start(),DaskCluster.start() etc.) provide the debug option too. Example:
fromlazyclusterimportRuntimeGroup,RuntimeTasktask=RuntimeTask('debug-test').run_command('python --version')group=RuntimeGroup(hosts=['gaia-1','gaia-2'])tasks=group.execute_task(task,debug=True)
The 3rd option is to access theexecution_log property of aRuntimeTask. Additionally, theRuntime as well as theRuntimeGroup provide aprint_log() function which prints theexecution_log of theRuntimeTasks that were executed on theRuntimes. Theexecution_log property is a list and can be accessed via index. Each log entry corresponds to the output of a single (fully executed) step of aRuntimeTask. This means the stdout/stderr is not streamed to the manager can only be accessed after its execution. This kind of log might be useful if you need to access the ouput of a concreteRuntimeTask step programmatically. See theconcept definition and theclass documentation of theRuntimeTask for further details.
Note:It should be noted thatRuntimeTask.run_function() is actually not a single task step. A call to this method will produce multiple steps, since the Python function that needs to be executed will be send as a pickle file to the remote host. There it gets unpickled, executed and the return data is sent back as a pickle file. This means if you intend to access the exectution log you should be aware that the log contains multiple log entries for therun_function() call. But the number of steps per call is fixed. Moreover, you should think about using the return value of a a remotely executed Python function instead of using the execution log for this purpose.
fromlazyclusterimportRuntime,RuntimeTask# Create the tasktask=RuntimeTask('exec-log-demo')# Add 2 individual task stepstask.run_command('echo Hello')task.run_command('echo lazycluster!')# Create a Runtimeruntime=Runtime('host-1')# Execute the task remotely on the Runtimeruntime.execute_task(task)# Access th elog per indexprint(task.execution_log[0])# => 'Hello'print(task.execution_log[1])# => 'lazycluster!'# Let the Runtime print the log# an equivalent method exists for RuntimeGroupruntime.print_log()
Details (click to expand...)
Our exception handling concept follows the idea to use standard python classes whenever appropriate. Otherwise, we create a library specific error (i.e. exception) class.
Each created error class inherits from our base classLazyclusterError which in turn inherits from Pythons'sException class. We aim to be informative as possible with our used exceptions to guide you to a solution to your problem. So feel encouraged to provide feedback on misleading or unclear error messages, since we strongly believe that guided errors are essential so that you can stay as lazy as possible.
- Pull requests are encouraged and always welcome. Read
CONTRIBUTING.mdand check outhelp-wanted issues. - Submit github issues for anyfeature enhancements,bugs, ordocumentation problems.
- By participating in this project you agree to abide by itsCode of Conduct.
LicensedApache 2.0. Created and maintained with ❤️ by developers from SAP in Berlin.
About
🎛 Distributed machine learning made simple.
Topics
Resources
License
Code of conduct
Contributing
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Packages0
Uh oh!
There was an error while loading.Please reload this page.
Contributors2
Uh oh!
There was an error while loading.Please reload this page.




