This repository was archived by the owner on Mar 29, 2023. It is now read-only.
ml-tooling/lazycluster

Distributed machine learning made simple.
Use your preferred distributed ML framework like a lazy engineer.

Getting Started | Highlights | Features | API Docs | Support | Report a Bug | Contribution

lazycluster is a Python library intended to liberate data scientists and machine learning engineers by abstracting away cluster management and configuration so that they can focus on their actual tasks. In particular, it emphasizes an easy and convenient cluster setup with Python for various distributed machine learning frameworks.

Highlights

  • High-Level API for starting clusters:
    • DASK
    • Hyperopt
    • More lazyclusters (e.g. Ray, PyTorch, Tensorflow, Horovod, Spark) to come ...
  • Lower-level API for:
    • Managing `Runtimes` or `RuntimeGroups` to:
      • Execute `RuntimeTasks` synchronously or asynchronously by leveraging the power of ssh
      • Expose services (e.g. a DB) from or to a `Runtime` or in a whole `RuntimeGroup`
  • Command line interface (CLI)
    • List all available `Runtimes`
    • Add a `Runtime` configuration
    • Delete a `Runtime` configuration

API layer

Concept Definition: `Runtime`

A `Runtime` is the logical representation of a remote host. Typically, the host is another server or a virtual machine / container on another server. This Python class provides several methods for utilizing remote resources, such as port exposure from / to a `Runtime` as well as the execution of `RuntimeTasks`. A `Runtime` has a working directory. Usually, a `RuntimeTask` is executed relative to this directory if no other path is explicitly given. The working directory can be set manually during initialization; otherwise, a temporary directory is created that might eventually be removed.
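The working-directory fallback can be sketched with the standard library (illustrative only; `resolve_working_dir` is a hypothetical helper, not part of the lazycluster API):

```python
import os
import tempfile

# Hypothetical helper mirroring the behavior described above: an explicitly
# given working directory is used as-is; otherwise a temporary directory is
# created (which may be removed again later).
def resolve_working_dir(working_dir=None):
    if working_dir is not None:
        return working_dir
    return tempfile.mkdtemp(prefix='lazycluster-')

print(resolve_working_dir('/workspace'))      # /workspace
print(os.path.isdir(resolve_working_dir()))   # True
```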

Concept Definition: `RuntimeGroup`

A `RuntimeGroup` is the representation of logically related `Runtimes` and provides convenient methods for managing them. Most methods are wrappers around their counterparts in the `Runtime` class. Typical usage examples are exposing a port (i.e. a service such as a DB) in the `RuntimeGroup`, transferring files, or executing a `RuntimeTask` on the `Runtimes`. Additionally, all concrete `RuntimeCluster` implementations (e.g. the `HyperoptCluster`) rely on `RuntimeGroups`.

Concept Definition: `Manager`

The manager refers to the host where you are actually using the lazycluster library, since all desired lazycluster entities are managed from here. Caution: It is not to be confused with the `RuntimeManager` class.

Concept Definition: `RuntimeTask`

A `RuntimeTask` is a composition of multiple elementary task steps, namely send file, get file, run command (shell), and run function (Python). A `RuntimeTask` can be executed on a remote host either by handing it over to a `Runtime` object or standalone by handing over a fabric `Connection` object to the execute method of the `RuntimeTask`. Consequently, all individual task steps are executed sequentially. Moreover, a `RuntimeTask` object captures the output (stdout/stderr) of the remote execution in its execution log. An example of a `RuntimeTask` could be to send a csv file to a `Runtime`, execute a Python function that transforms the csv file, and finally get the file back.
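The composition-of-steps idea can be sketched in plain Python (a conceptual illustration; `TaskSketch` is a made-up class that only mimics the chaining and sequential execution described above, not the real implementation):

```python
# Conceptual sketch: a task collects elementary steps and executes them
# sequentially, capturing the output in an execution log. `TaskSketch`
# is illustrative only and not the actual lazycluster implementation.
class TaskSketch:
    def __init__(self, name):
        self.name = name
        self._steps = []
        self.execution_log = []

    def run_command(self, command):
        self._steps.append(('command', command))
        return self  # return self to allow chaining like the real API

    def run_function(self, func, **kwargs):
        self._steps.append(('function', (func, kwargs)))
        return self

    def execute(self):
        # All individual task steps are executed sequentially
        for kind, payload in self._steps:
            if kind == 'command':
                self.execution_log.append('ran: ' + payload)
            else:
                func, kwargs = payload
                self.execution_log.append(str(func(**kwargs)))

def hello(name):
    return 'Hello ' + name + '!'

task = TaskSketch('demo').run_command('echo Hello').run_function(hello, name='World')
task.execute()
print(task.execution_log)  # ['ran: echo Hello', 'Hello World!']
```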



Getting started

Installation

```
pip install lazycluster

# Most up-to-date development version
pip install --upgrade git+https://github.com/ml-tooling/lazycluster.git@develop
```

Prerequisites

For lazycluster usage on the manager:

  • Unix based OS

  • Python >= 3.6

  • ssh client (e.g. openssh-client)

  • Passwordless ssh access to the `Runtime` hosts (recommended)

    Configure passwordless ssh access (click to expand...)
    • Create a key pair on the manager as described here or use an existing one
    • Install lazycluster on the manager
    • Create the ssh configuration for each host to be used as a Runtime by using the lazycluster CLI command `lazycluster add-runtime` as described here and do not forget to specify the `--id_file` argument.
    • Finally, enable the passwordless ssh access by copying the public key to each Runtime as described here

Runtime host requirements:

  • Unix based OS
  • Python >= 3.6
  • ssh server (e.g. openssh-server)

Note:

Passwordless ssh needs to be set up for the hosts to be used as `Runtimes` for the most convenient user experience. Otherwise, you need to pass the connection details to `Runtime.__init__` via `connection_kwargs`. These parameters will be passed on to the `fabric.Connection`.

Usage example high-level API

Start a Dask cluster.

```python
from lazycluster import RuntimeManager
from lazycluster.cluster.dask_cluster import DaskCluster

# Automatically generate a group based on the ssh configuration
runtime_manager = RuntimeManager()
runtime_group = runtime_manager.create_group()

# Start the Dask cluster instances using the RuntimeGroup
dask_cluster = DaskCluster(runtime_group)
dask_cluster.start()

# => Now, you can start using the running Dask cluster

# Get a Dask client to interact with the cluster
# Note: This will give you a dask.distributed.Client which is not
#       a lazycluster cluster but a Dask one instead
client = dask_cluster.get_client()
```

Usage example lower-level API

Execute a Python function on a remote host and access the return data.

```python
from lazycluster import RuntimeTask, Runtime

# Define a Python function which will be executed remotely
def hello(name: str):
    return 'Hello ' + name + '!'

# Compose a `RuntimeTask`
task = RuntimeTask('my-first_task').run_command('echo Hello World!') \
                                   .run_function(hello, name='World')

# Actually execute it remotely in a `Runtime`
task = Runtime('host-1').execute_task(task, execute_async=False)

# The stdout from the executing `Runtime` can be accessed
# via the execution log of the `RuntimeTask`
task.print_log()

# Print the return of the `hello()` call
generator = task.function_returns
print(next(generator))
```

Support

The lazycluster project is maintained by Jan Kalkan. Please understand that we won't be able to provide individual support via email. We also believe that help is much more valuable if it's shared publicly so that more people can benefit from it.

Type | Channel
🚨 Bug Reports
🎁 Feature Requests
👩‍💻 Usage Questions
🗯 General Discussion

Features

Use the Command Line Interface (CLI) to manage the local ssh configuration to enable `Runtime` usage

Details (click to expand...)

For a full list of CLI commands please use `lazycluster --help`. For the help of a specific command please use `lazycluster COMMAND --help`.

List all available runtimes incl. additional information like cpu, memory, etc.

Moreover, inactive hosts will be shown as well. Inactive means that the host could not be reached via ssh and instantiated as a valid Runtime.

```
# Will print a short list of active / inactive Runtimes
lazycluster list-runtimes
```

List Runtimes

```
# Will print a list of active / inactive Runtimes incl. additional host information
# Note: This is slower as compared to omitting the -l option
lazycluster list-runtimes -l
```

List Runtimes in long format

Add host to ssh config

The host is named `localhost` for user `root`, accessible on `localhost` port `22` using the private key file found under `~/.ssh/id_rsa`.

Note: The add command will only add the ssh configuration on the manager. For a complete guide on how to set up passwordless ssh, check the prerequisites section.

```
lazycluster add-runtime localhost root@localhost:22 --id_file ~/.ssh/id_rsa
```

Runtime Added

Delete the ssh config of a `Runtime`

Note: The corresponding remote ikernel will be deleted too if present.

```
lazycluster delete-runtime host-1
```

Runtime Deleted

Create `Runtimes` & `RuntimeGroups`

Details (click to expand...)

A `Runtime` has a working directory. Usually, a `RuntimeTask` is executed relative to this directory if no other path is explicitly given. The working directory can be set manually during initialization; otherwise, a temporary directory is created that might eventually be removed.

```python
from lazycluster import Runtime, RuntimeGroup

rt_1 = Runtime('host-1')
rt_2 = Runtime('host-2', working_dir='/workspace')

# In this case you get a group where both Runtimes have different working directories.
# The working directory on host-1 will be a temp one and gets removed eventually.
runtime_group = RuntimeGroup([rt_1, rt_2])

# Here, the group internally creates Runtimes for both hosts and sets their working directory.
runtime_group = RuntimeGroup(hosts=['host-1', 'host-2'], working_dir='/workspace')
```

Moreover, you can set environment variables for the Runtimes. These variables can then be accessed when executing a Python function or a shell command on the Runtime. Per default, the working directory is set as an env variable, and the class constant `Runtime.WORKING_DIR_ENV_VAR_NAME` gives you the name of that variable. The working directory remains accessible even if you manually update `env_variables`.

```python
# Directly set the env vars per Runtime
rt = Runtime('host-1')
rt.env_variables = {'foo': 'bar'}

# Or use the convenient method to set the env vars
# for all Runtimes in a RuntimeGroup
runtime_group = RuntimeGroup(hosts=['host-1', 'host-2'])
runtime_group.set_env_variables({'foo': 'bar'})
```
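Within a function executed on a `Runtime`, such variables are read with `os.environ` like any other environment variable. A purely local sketch (the variable is set manually here to stand in for what the Runtime would do):

```python
import os

# Stand-in for the env variables a Runtime would set before executing the function
os.environ['foo'] = 'bar'

def remote_function():
    # Inside the function executed on the Runtime, read the variable as usual
    return os.environ['foo']

print(remote_function())  # bar
```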

Use the `RuntimeManager` to create a `RuntimeGroup` based on the manager's ssh config

Details (click to expand...)

The `RuntimeManager` can automatically detect all available `Runtimes` based on the manager's local ssh config and create the necessary `RuntimeGroup` for you.

```python
from lazycluster import RuntimeManager, RuntimeGroup

runtime_group = RuntimeManager().create_group()
```

Start a Dask cluster for scalable analytics

Details (click to expand...)

The simplest way to use Dask in a cluster is based on a `RuntimeGroup` created by the `RuntimeManager`. The `RuntimeManager` can automatically detect all available `Runtimes` based on the manager's ssh config and create the necessary `RuntimeGroup` for you. This `RuntimeGroup` is then handed over to `DaskCluster` during initialization.

The Dask scheduler instance gets started on the manager. Additionally, multiple Dask worker processes get started in the `RuntimeGroup`, i.e. on the `Runtimes`. The default number of workers is equal to the number of `Runtimes` in the `RuntimeGroup`.

Prerequisite: Please make sure that you have Dask installed on the manager. This can be done using `pip install -q "dask[complete]"`.

Details (click to expand...)
```python
from lazycluster import RuntimeManager
from lazycluster.cluster.dask_cluster import DaskCluster

# 1st: Create a RuntimeGroup, e.g. by letting the RuntimeManager detect
#      available hosts (i.e. Runtimes) and create the group for you.
runtime_group = RuntimeManager().create_group()

# 2nd: Create the DaskCluster instance with the RuntimeGroup.
cluster = DaskCluster(runtime_group)

# 3rd: Let the DaskCluster instantiate all entities on Runtimes
#      of the RuntimeGroup using default values. For custom
#      configuration check the DaskCluster API documentation.
cluster.start()

# => Now, all cluster entities should be started and you can simply use
#    it as documented in the Dask distributed documentation.
```

Test the cluster setup

```python
# Define test functions to be executed in parallel via Dask
def square(x):
    return x ** 2

def neg(x):
    return -x

# Get a Dask client instance
client = cluster.get_client()

# Execute the computation
A = client.map(square, range(10))
B = client.map(neg, A)
total = client.submit(sum, B)
res = total.result()
print('Result: ' + str(res))
```
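For reference, the same computation evaluated locally with plain Python (no cluster involved) shows which result to expect from the Dask pipeline:

```python
# The Dask pipeline above computes sum(-(x ** 2) for x in range(10))
A = [x ** 2 for x in range(10)]
B = [-x for x in A]
res = sum(B)
print('Result: ' + str(res))  # Result: -285
```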

Use different strategies for launching the master and the worker instances.
Details (click to expand...)

Use different strategies for launching the master and the worker instances by providing custom implementations of `lazycluster.cluster.MasterLauncher` and `lazycluster.cluster.WorkerLauncher`. The default implementations are `lazycluster.cluster.dask_cluster.LocalMasterLauncher` and `lazycluster.cluster.dask_cluster.RoundRobinLauncher`.

```python
cluster = DaskCluster(RuntimeManager().create_group(),
                      MyMasterLauncherImpl(),
                      MyWorkerLauncherImpl())
cluster.start()
```

Distributed hyperparameter tuning with Hyperopt

Details (click to expand...)

The simplest way to use Hyperopt in a cluster is based on a `RuntimeGroup` created by the `RuntimeManager`. The `RuntimeManager` can automatically detect all available `Runtimes` based on the manager's ssh config and create the necessary `RuntimeGroup` for you. This `RuntimeGroup` is then handed over to `HyperoptCluster` during initialization.

A MongoDB instance gets started on the manager. Additionally, multiple hyperopt worker processes get started in the `RuntimeGroup`, i.e. on the contained `Runtimes`. The default number of workers is equal to the number of `Runtimes` in the `RuntimeGroup`.

Prerequisites:

  • MongoDB server must be installed on the manager.
    • Note: When using the ml-workspace as the manager, you can use the provided install script for MongoDB, which can be found under /resources/tools.
  • Hyperopt must be installed on all `Runtimes` where hyperopt workers will be started.
    • Note: When using the ml-workspace as hosts for the `Runtimes`, hyperopt is already pre-installed.
Launch a cluster (click to expand...)

For detailed documentation of customization options and default values, check out the API docs.

```python
from lazycluster import RuntimeManager
from lazycluster.cluster.hyperopt_cluster import HyperoptCluster

# 1st: Create a RuntimeGroup, e.g. by letting the RuntimeManager detect
#      available hosts (i.e. Runtimes) and create the group for you.
runtime_group = RuntimeManager().create_group()

# 2nd: Create the HyperoptCluster instance with the RuntimeGroup.
cluster = HyperoptCluster(runtime_group)

# 3rd: Let the HyperoptCluster instantiate all entities on Runtimes of the
#      RuntimeGroup using default values. For custom configuration check
#      the HyperoptCluster API documentation.
cluster.start()

# => Now, all cluster entities should be started and you can simply use
#    it as documented in the hyperopt documentation. We recommend to call
#    cluster.cleanup() once you are done.
```

Test the cluster setup using the simple example of minimizing the sin function.

Note: The call to `fmin` is also done on the manager. The objective function gets sent to the hyperopt workers by `fmin` via MongoDB. So there is no need to trigger the execution of `fmin` or the objective function on the individual `Runtimes`. See the hyperopt docs for a detailed explanation.

```python
import math

from hyperopt import fmin, tpe, hp
from hyperopt.mongoexp import MongoTrials

# You can retrieve the actual url required by MongoTrials from the cluster instance
trials = MongoTrials(cluster.mongo_trial_url, exp_key='exp1')

objective_function = math.sin

best = fmin(objective_function, hp.uniform('x', -2, 2), trials=trials,
            algo=tpe.suggest, max_evals=10)

# Ensures that MongoDB gets stopped and other resources are cleaned up
cluster.cleanup()
```

Now, we will conceptually demonstrate how to use lazycluster with hyperopt to optimize the hyperparameters of a fasttext model. Note that this is not meant as a fasttext demo, so the actual usage of fasttext is not optimized; please read the related docs for that purpose. The example should just highlight how to get fasttext up and running in a distributed setting using lazycluster.

```python
import os

from lazycluster import RuntimeManager
from lazycluster.cluster.hyperopt_cluster import HyperoptCluster

# 1st: Create a RuntimeGroup, e.g. by letting the RuntimeManager detect
#      available hosts (i.e. Runtimes) and create the group with a persistent
#      working directory for you.
runtime_group = RuntimeManager().create_group(working_dir='~/hyperopt')

# 2nd: Send the training - and test dataset to all Runtimes
path_to_datasets = '/path_on_manager'
train_file_name = 'train.csv'
train_path = os.path.join(path_to_datasets, train_file_name)
test_file_name = 'test.csv'
test_path = os.path.join(path_to_datasets, test_file_name)

# Per default the file will be sent asynchronously to the Runtime's working directory
runtime_group.send_file(train_path)
runtime_group.send_file(test_path)

# 3rd: Create the HyperoptCluster instance with the RuntimeGroup.
cluster = HyperoptCluster(runtime_group)

# 4th: Let the HyperoptCluster instantiate all entities on
#      Runtimes of the RuntimeGroup using default values.
#      For custom configuration check the HyperoptCluster API documentation.
cluster.start()

# 5th: Ensure that the processes for sending the files have already terminated,
#      since we sent the files async in the 2nd step.
runtime_group.join()

# => Now, all cluster entities are started, datasets transferred, and you
#    can simply use the cluster as documented in the hyperopt documentation.

# 6th: Define the objective function to be minimized by Hyperopt in order to find the
#      best hyperparameter combination.
def train(params):
    import os

    import fasttext

    train_path = os.path.join(os.environ['WORKING_DIR'], params['train_set_file_name'])
    test_path = os.path.join(os.environ['WORKING_DIR'], params['test_set_file_name'])

    model = fasttext.train_supervised(
        input=train_path,
        lr=float(params['learning_rate']),
        dim=int(params['vector_dim']),
        ws=int(params['window_size']),
        epoch=int(params['epochs']),
        minCount=int(params['min_count']),
        neg=int(params['negativ_sampling']),
        t=float(params['sampling']),
        wordNgrams=1,  # word ngrams other than 1 crash
        bucket=int(params['bucket']),
        pretrainedVectors=str(params['pretrained_vectors']),
        lrUpdateRate=int(params['lr_update_rate']),
        thread=int(params['threads']),
        verbose=2
    )

    number_of_classes, precision, recall = model.test(test_path)
    f1 = 2 * ((precision * recall) / (precision + recall))

    # Return value must be negative because hyperopt's fmin tries to minimize the objective
    # function. You can think of it as minimizing an artificial loss function.
    return -1 * f1


from hyperopt import fmin, tpe, hp
from hyperopt.mongoexp import MongoTrials

# 7th: Define the search space for the parameters to be optimized. Check further functions
#      of Hyperopt's hp module that might suit your specific requirements. This should just
#      give you an idea and not show how to best use fasttext.
search_space = {
    'min_count': hp.quniform('min_count', 2, 20, 1),
    'window_size': hp.quniform('window_size', 4, 15, 1),
    'vector_dim': hp.quniform('vector_dim', 100, 300, 1),
    'learning_rate': 0.4,
    'lr_update_rate': 100,
    'negativ_sampling': hp.quniform('negativ_sampling', 5, 20, 1),
    'sampling': hp.uniform('sampling', 0, 10 ** -3),
    'bucket': 2000000,
    'epochs': hp.quniform('epochs', 3, 30, 1),
    'pretrained_vectors': '',
    'threads': 8,
    'train_set_file_name': train_file_name,
    'test_set_file_name': test_file_name
}

# 8th: Actually execute the hyperparameter optimization. Use the mongo_trial_url
#      property of your HyperoptCluster instance to get the url in the format
#      required by MongoTrials.
trials = MongoTrials(cluster.mongo_trial_url, exp_key='exp1')
best = fmin(train, search_space, tpe.suggest, 500, trials)
print(best)
```
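The F1-based return value of the objective function above can be checked in isolation (a minimal sketch with made-up precision/recall values):

```python
def negative_f1(precision, recall):
    # hyperopt's fmin minimizes, so a score that should be maximized is negated
    f1 = 2 * ((precision * recall) / (precision + recall))
    return -1 * f1

# Hypothetical evaluation results, just to exercise the formula
print(negative_f1(0.8, 0.6))  # roughly -0.686
```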

Debugging (click to expand...)

In general, you should read the Logging, exception handling and debugging section first so that you are aware of the general options lazycluster offers for debugging.
The first step is to successfully launch a Hyperopt cluster by using the corresponding lazycluster class. If you experience problems up to this point, analyze the raised exceptions, which should guide you toward a solution. If a given error message is not self-explanatory, please consider providing meaningful feedback so that it soon will be. Common problems until the cluster is started are:

  • MongoDB or hyperopt is not installed, i.e. the prerequisites are not yet fulfilled.
    => Ensure that the prerequisites are fulfilled. Consider using ml-workspace to get rid of dependency problems.
  • MongoDB is already running (under the same dbpath). This might especially happen if you started a cluster before and the cleanup did not happen correctly. Usually, the cleanup should happen atexit, but sometimes it simply does not work depending on your execution environment.
    => To prevent this problem you can and should explicitly call the `cleanup()` method of the `HyperoptCluster` instance.
    => To solve the problem while MongoDB is still running, type `lsof -i | grep mongod` into a terminal. Then use the `kill pid` command with the process ID you got from issuing the previous command.

Once the Hyperopt cluster is running, you can start using it. It should be noted that the following is mainly about finding Hyperopt related issues, since lazycluster has basically done its job already. Typically, this means you have a bug in the objective function that you are trying to minimize with Hyperopt.
First, you could use the `print_log()` method of your `HyperoptCluster` instance to check the execution log. If you can't find any error there, then check the execution log files or redirect the execution log from files to stdout of the manager by setting `debug=True` in the start method of the `HyperoptCluster` class.
Alternatively, you can ssh into one of your `Runtimes` and manually start a hyperopt-worker process. You can find the respective shell command in the hyperopt docs. Moreover, you can get the necessary url for the `--mongo` argument by accessing the Python property `mongo_url` of your `HyperoptCluster` instance once it is running. Consequently, the newly started worker will poll a job from the master (i.e. MongoDB) and start its execution. Now you should see the error in the terminal once it occurs.

We found two common bug types related to the objective function. First, make sure that the hyper-/parameters you are passing to your model have the correct datatypes. Sounds trivial, right? :)
Next, you typically use some training and test dataset on your Runtimes inside your objective function, so getting the file paths right may be a bit tricky at first. You should understand that the objective function gets communicated to the hyperopt worker processes by `fmin()` via MongoDB. Consequently, the objective function gets executed as-is on the Runtimes, and the paths must exist on the `Runtimes`. The `Runtime`'s working directory, as documented in the API docs, is of interest here; the path of this directory is available on the Runtimes. Consequently, we recommend that you manually set a working directory on your `Runtimes` and place the training and test dataset files relative to that working directory. This can also be done on `RuntimeGroup` level. Now, you can create a relative path to the files inside your objective function with `os.path.join(os.environ['WORKING_DIR'], 'relative_file_path')`. Note: The advantage of manually setting a working directory in this case is that a manually set working directory does not get removed at the end. Consequently, you do not need to move the files each time you start the execution. This hint can save you quite a lot of time, especially when you need to restart the execution multiple times while debugging.
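The path construction described above can be tried locally; here `WORKING_DIR` is set manually purely for illustration, whereas on a `Runtime` lazycluster sets it for you:

```python
import os

# On a Runtime, lazycluster exports the working directory as an env variable;
# it is set manually here only to make the sketch runnable.
os.environ['WORKING_DIR'] = '/workspace/hyperopt'

train_path = os.path.join(os.environ['WORKING_DIR'], 'train.csv')
print(train_path)  # /workspace/hyperopt/train.csv
```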


Use different strategies for launching the master and the worker instances.

Details (click to expand...)

Use different strategies for launching the master and the worker instances by providing custom implementations of `lazycluster.cluster.MasterLauncher` and `lazycluster.cluster.WorkerLauncher`. The default implementations are `lazycluster.cluster.hyperopt_cluster.LocalMongoLauncher` and `lazycluster.cluster.hyperopt_cluster.RoundRobinLauncher`.

```python
cluster = HyperoptCluster(RuntimeManager().create_group(),
                          MyMasterLauncherImpl(),
                          MyWorkerLauncherImpl())
cluster.start()
```

Expose services

Details (click to expand...)

Expose a service from a `Runtime`

A DB is running on a remote host on port `runtime_port` and the DB is only accessible from the remote host. But you also want to access the service from the manager on port `local_port`. Then you can use this method to expose the service which is running on the remote host to the manager.

Details (click to expand...)
```python
from lazycluster import Runtime

# Create a Runtime
runtime = Runtime('host-1')

# Make the port 50000 from the Runtime accessible on localhost
runtime.expose_port_from_runtime(50000)

# Make the local port 40000 accessible on the Runtime
runtime.expose_port_to_runtime(40000)
```

Expose a service to a `Runtime`

A DB is running on the manager on port `local_port` and the DB is only accessible from the manager. But you also want to access the service on the remote `Runtime` on port `runtime_port`. Then you can use this method to expose the service which is running on the manager to the remote host.

Details (click to expand...)
```python
from lazycluster import Runtime

# Create a Runtime
runtime = Runtime('host-1')

# Make the port 50000 from the Runtime accessible on localhost
runtime.expose_port_from_runtime(50000)

# Make the local port 40000 accessible on the Runtime
runtime.expose_port_to_runtime(40000)
```

Service exposure

Now, we extend the previous example by using a `RuntimeGroup` instead of just a single `Runtime`. This means we want to expose a service which is running on the manager to a group of `Runtimes`.

Details (click to expand...)
```python
from lazycluster import RuntimeGroup

# Create a RuntimeGroup
runtime_group = RuntimeGroup('host-1', 'host-2', 'host-3')

# Make the local port 50000 accessible on all Runtimes in the RuntimeGroup.
runtime_group.expose_port_to_runtimes(50000)

# Note: The port can also be exposed to a subset of the Runtimes by using the
#       method parameter exclude_hosts.
runtime_group.expose_port_to_runtimes(50000, exclude_hosts='host-3')
```

Expose a service from a `Runtime` to the other `Runtimes` in the `RuntimeGroup`

Assume you have a service which is running on the Runtime `host-1`. Now, you can expose the service to the remaining `Runtimes` in the `RuntimeGroup`.

Details (click to expand...)
```python
from lazycluster import RuntimeGroup

# Create a RuntimeGroup
runtime_group = RuntimeGroup('host-1', 'host-2', 'host-3')

# Make the port 40000 which is in use on host-1 accessible on all other Runtimes in the RuntimeGroup
runtime_group.expose_port_from_runtime_to_group('host-1', 40000)
```

File Transfer

Details (click to expand...)

A `RuntimeTask` is capable of sending a file from the manager to a `Runtime` or vice versa. Moreover, the `Runtime` class as well as the `RuntimeGroup` provide convenient methods for this purpose that internally create the `RuntimeTasks` for you.

In the following example, `file.csv` will be transferred to the `Runtime`'s working directory. Another path on the Runtime can be specified by supplying a `remote_path` argument. See the `Runtime` docs for further details on the working directory.

```python
from lazycluster import RuntimeTask, Runtime

task = RuntimeTask('file-transfer')
task.send_file('local_path/file.csv')

runtime = Runtime('host-1')
runtime.execute_task(task, exec_async=False)
```

The explicit creation of a `RuntimeTask` is only necessary if you intend to add further steps to the `RuntimeTask` instead of just transferring a file. For example, you may want to send a file, execute a Python function, and transfer the file back. If not, you can use the file transfer methods of the `Runtime` or `RuntimeGroup`. In the case of sending a file to a `RuntimeGroup`, you should send the files asynchronously; otherwise, each file will be transferred sequentially. Do not forget to call `join()` if you need the files to be transferred before proceeding.

```python
from lazycluster import RuntimeTask, Runtime, RuntimeGroup, RuntimeManager

# Send a file to a single Runtime
runtime = Runtime('host-1')
runtime.send_file('local_path/file.csv', execute_async=False)

# Send a file to a whole RuntimeGroup
group = RuntimeManager().create_group()
group.send_file('local_path/file.csv', execute_async=True)
group.join()
```
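The asynchronous-send-then-`join()` pattern is analogous to submitting work to a pool and waiting for completion. A stdlib sketch of the idea (the `transfer` function is hypothetical and does no real file transfer):

```python
from concurrent.futures import ThreadPoolExecutor, wait

def transfer(host, file_name):
    # Hypothetical stand-in for sending a file to a Runtime host.
    return f'{file_name} -> {host}'

hosts = ['host-1', 'host-2']
with ThreadPoolExecutor() as pool:
    # "Send" asynchronously to all hosts ...
    futures = [pool.submit(transfer, host, 'file.csv') for host in hosts]
    # ... then block until everything completed, like RuntimeGroup.join()
    done, _ = wait(futures)

results = sorted(f.result() for f in done)
print(results)  # ['file.csv -> host-1', 'file.csv -> host-2']
```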

The usage of `get_file` is similar and documented here.

Simple preprocessing example

Details (click to expand...)

Read a local CSV file (on the manager) and lower case chunks in parallel using `RuntimeTasks` and a `RuntimeGroup`.

```python
from typing import List

import pandas as pd

from lazycluster import RuntimeTask, RuntimeManager

# Define the function to be executed remotely
def preprocess(docs: List[str]):
    return [str(doc).lower() for doc in docs]

file_path = '/path/to/file.csv'

runtime_group = RuntimeManager().create_group()

tasks = []

# Distribute chunks of the csv and start the preprocessing in parallel in the RuntimeGroup
for df_chunk in pd.read_csv(file_path, sep=';', chunksize=500):
    task = RuntimeTask().run_function(preprocess, docs=df_chunk['text'].tolist())
    tasks.append(runtime_group.execute_task(task))

# Wait until all executions are done
runtime_group.join()

# Get the return data and print it
index = 0
for chunk in runtime_group.function_returns:
    print('Chunk: ' + str(index))
    index += 1
    print(chunk)
```

Logging, exception handling and debugging

Details (click to expand...)

lazycluster aims to abstract away the complexity implied by using multiple distributed `Runtimes` and provides an intuitive high-level API for this purpose. The lazycluster manager orchestrates the individual components of the distributed setup. A common use case could be to use lazycluster in order to launch a distributed hyperopt cluster. In this case, we have the lazycluster manager that starts a MongoDB instance, starts the hyperopt worker processes on multiple `Runtimes`, and ensures the required communication via ssh between these instances. Each individual component could potentially fail, including 3rd party ones such as hyperopt workers. Since lazycluster is a generic library and debugging a distributed system is an intrinsically non-trivial task, we tried to emphasize logging and good exception handling practices so that you can stay lazy.

Standard Python log

We use the standard Python `logging` module in order to log everything of interest that happens on the manager.

Details (click to expand...)

Per default, we recommend setting the basicConfig log level to `logging.INFO`. Consequently, you will get relevant status updates about the progress of launching a cluster, for example. Of course, you can adjust the log level to `logging.DEBUG` or anything you like.

We like to use the following basic configuration when using lazycluster in a Jupyter notebook:

```python
import logging

logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)
```

Note: Some 3rd party libraries produce a lot of INFO messages which are usually not of interest to the user. This is particularly true for Paramiko. We base most ssh handling on Fabric, which in turn is based on Paramiko. We decided to set the log level for these libraries to `logging.ERROR` per default. This happens in the `__init__.py` module of the lazycluster package and is applied once when importing the first module or class from lazycluster. If you want to change the log level of 3rd party libs, you can set it the following way:

```python
import logging

from lazycluster import Environment

# Affects logs of all libraries that were initially set to logging.ERROR
Environment.set_third_party_log_level(logging.INFO)

# Of course, you can set the log level manually for each library / module
logging.getLogger('paramiko').setLevel(logging.DEBUG)
logging.getLogger('lazycluster').setLevel(logging.INFO)
```

See `set_third_party_log_level()` of the `Environment` class for a full list of affected libraries.

Execution log

The execution log aims to provide a central access point to output produced on the Runtimes.

Details (click to expand...)

This type of log mainly contains the stdout/stderr produced when executing a `RuntimeTask` on a `Runtime`. If you are new to lazycluster or have never used the lower-level API directly, you might think the execution log is not relevant for you. But it is :) The concrete cluster implementations (e.g. `DaskCluster` or `HyperoptCluster`) are also built on top of the lower-level API. You can think of it as the kind of log you can use to understand what actually happened on your `Runtimes`. You can access the execution log in 3 different ways.

The 1st option is accessing the execution log files. The stdout/stderr generated on the `Runtimes` is streamed to log files. The respective directory is per default `./lazycluster/execution_log` on the manager. The log directory contains a subfolder for each Runtime (i.e. host) that executed at least one `RuntimeTask`. Inside a Runtime folder you will find one log file per executed RuntimeTask. Each logfile name is generated by concatenating the name of the `RuntimeTask` and the current timestamp. You can configure the path where the log directory gets created by adjusting the lazycluster main directory. See `Environment` for this purpose. Moreover, the respective file path can be programmatically accessed via `RuntimeTask.execution_log_file_path`. This property gets updated each time the `RuntimeTask` gets executed.
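Conceptually, such a log file path is composed as follows (a sketch; the exact naming scheme and timestamp format used by lazycluster may differ):

```python
import os

def execution_log_file_path(host, task_name, timestamp,
                            log_root='./lazycluster/execution_log'):
    # One subfolder per Runtime host; one file per executed RuntimeTask,
    # named by concatenating the task name and a timestamp.
    return os.path.join(log_root, host, f'{task_name}_{timestamp}.log')

print(execution_log_file_path('host-1', 'my-task', '20200102-030405'))
# ./lazycluster/execution_log/host-1/my-task_20200102-030405.log
```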

The 2nd option is to redirect the execution log (i.e. stdout/stderr from the `Runtimes`) to the stdout of the manager. This lets you quickly spot errors. The drawback is that you cannot directly distinguish which `Runtime` generated which output, since the output of potentially multiple `Runtimes` is streamed to the manager's stdout as it occurs. To enable this feature, pass the `debug` flag to the respective methods (i.e. `RuntimeTask.execute()`, `Runtime.execute_task()`, `RuntimeGroup.execute_task()`). All cluster-related `start()` methods (e.g. `HyperoptCluster.start()`, `DaskCluster.start()`, etc.) provide the `debug` option too. Example:

```python
from lazycluster import RuntimeGroup, RuntimeTask

task = RuntimeTask('debug-test').run_command('python --version')
group = RuntimeGroup(hosts=['gaia-1', 'gaia-2'])
tasks = group.execute_task(task, debug=True)
```

The 3rd option is to access the `execution_log` property of a `RuntimeTask`. Additionally, the `Runtime` as well as the `RuntimeGroup` provide a `print_log()` function which prints the `execution_log` of the `RuntimeTasks` that were executed on the `Runtimes`. The `execution_log` property is a list and can be accessed via index. Each log entry corresponds to the output of a single (fully executed) step of a `RuntimeTask`. This means the stdout/stderr is not streamed to the manager and can only be accessed after the step's execution has finished. This kind of log might be useful if you need to access the output of a concrete `RuntimeTask` step programmatically. See the concept definition and the class documentation of the `RuntimeTask` for further details.

Note: It should be noted that `RuntimeTask.run_function()` is actually not a single task step. A call to this method produces multiple steps, since the Python function to be executed is sent as a pickle file to the remote host. There it gets unpickled and executed, and the return data is sent back as a pickle file. This means that if you intend to access the execution log, you should be aware that it contains multiple log entries per `run_function()` call. The number of steps per call is fixed, however. Moreover, consider using the return value of a remotely executed Python function instead of the execution log for this purpose.
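The pickle round trip described above can be sketched locally with the standard library. This is a simplified illustration of the mechanism, not lazycluster's actual implementation (which transfers the pickle files over ssh); the function name `simulate_run_function` is made up for this sketch.

```python
import pickle


def simulate_run_function(func, *args):
    """Illustrate the transfer mechanism of run_function() with plain pickle:
    the function and its arguments are serialized (as if written to a pickle
    file and copied to the remote host), deserialized there, executed, and
    the return value is serialized back to the manager.
    """
    payload = pickle.dumps((func, args))              # "send" to the remote host
    remote_func, remote_args = pickle.loads(payload)  # unpickle on the Runtime
    result = remote_func(*remote_args)                # execute remotely
    return pickle.loads(pickle.dumps(result))         # "send" the result back
```

Note that plain `pickle` serializes functions by reference (module and qualified name), so the function must be importable on the remote side as well.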

```python
from lazycluster import Runtime, RuntimeTask

# Create the task
task = RuntimeTask('exec-log-demo')

# Add 2 individual task steps
task.run_command('echo Hello')
task.run_command('echo lazycluster!')

# Create a Runtime
runtime = Runtime('host-1')

# Execute the task remotely on the Runtime
runtime.execute_task(task)

# Access the log per index
print(task.execution_log[0])  # => 'Hello'
print(task.execution_log[1])  # => 'lazycluster!'

# Let the Runtime print the log
# (an equivalent method exists for RuntimeGroup)
runtime.print_log()
```

Exception handling

Details (click to expand...)

Our exception handling concept follows the idea of using standard Python exception classes whenever appropriate. Otherwise, we create a library-specific error (i.e. exception) class.

Each created error class inherits from our base class `LazyclusterError`, which in turn inherits from Python's `Exception` class. We aim to be as informative as possible with our exceptions in order to guide you to a solution for your problem. So feel encouraged to provide feedback on misleading or unclear error messages, since we strongly believe that guided errors are essential for you to stay as lazy as possible.
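The hierarchy described above can be sketched as follows. Only `LazyclusterError` is named in the text; the constructor signature and the concrete `PortInUseError` subclass are hypothetical examples of the "guided error" pattern, not the library's actual classes.

```python
class LazyclusterError(Exception):
    """Base class for all library-specific errors, as described above."""

    def __init__(self, message: str, suggestion: str = ""):
        # Include an actionable hint so the error guides the user to a solution
        full_message = f"{message} Hint: {suggestion}" if suggestion else message
        super().__init__(full_message)


class PortInUseError(LazyclusterError):
    """Hypothetical concrete error following the guided-error pattern."""

    def __init__(self, port: int):
        super().__init__(
            f"Port {port} is already in use.",
            "Choose a different port or free it on the host.",
        )
```

Because every library-specific error derives from `LazyclusterError`, a single `except LazyclusterError:` clause is enough to catch all of them while still letting standard Python exceptions propagate.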


Contribution


LicensedApache 2.0. Created and maintained with ❤️ by developers from SAP in Berlin.
