aternosorg/taskmaster


Taskmaster is an object-oriented PHP library for running tasks in parallel.

A task can be written in a few lines of code and then executed in different environments, e.g. in a forked process, a new process, a thread or just synchronously in the same process, without changing any code.

Therefore, this library can run without any extensions that aren't part of the PHP core, but it's also possible to use the advantages of advanced extensions such as pcntl or parallel if they are available.

It's even possible to proxy the creation of the environment through a proxy process that uses a different PHP binary/installation with different extensions. This allows using the advantages of the different strategies even in environments where this would not be possible otherwise, e.g. using forked processes on a webserver.

Tasks can communicate back to the main process during execution and handle results and errors gracefully.

This library is not supported on Windows due to a lack of essential features. The autodetect falls back to the sync worker on Windows, so running tasks should be possible, but running tasks in parallel does not work.

Installation

composer require aternos/taskmaster

To use the ForkWorker you have to install the pcntl extension. For the ThreadWorker the parallel extension is required.

Basic Example

// Every task is its own class, the class should be autoloaded
class SleepTask extends \Aternos\Taskmaster\Task\Task
{
    // The run method is called when the task is executed
    public function run(): void
    {
        sleep(1);
    }
}

// The taskmaster object holds tasks and workers
$taskmaster = new \Aternos\Taskmaster\Taskmaster();

// Set up the workers automatically
$taskmaster->autoDetectWorkers(4);

// Add tasks to the taskmaster
for ($i = 0; $i < 8; $i++) {
    $taskmaster->runTask(new SleepTask());
}

// Wait for all tasks to finish and stop the taskmaster
$taskmaster->wait()->stop();

Writing tasks

A task is an instance of a class. When writing your own task class, it is recommended to extend the Task class, but implementing the TaskInterface is also possible.

A task class must define a run() function and can define optional functions such as error handlers.

Tasks are serialized and therefore must not contain any unserializable fields such as closures or resources. They can define those fields when the task is executed in the run() function.
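The serialization rule can be tried out in plain PHP. The sketch below does not use Taskmaster at all: WordCount is a hypothetical stand-in for a task and isSerializable() is a helper invented for this illustration. It shows that an object holding only a path serializes fine, that a closure does not, and that a resource can be created at execution time instead of being stored.

```php
<?php
declare(strict_types=1);

// Hypothetical class for illustration only; it does not extend the Taskmaster Task class.
final class WordCount
{
    // A plain string survives serialize()/unserialize().
    public function __construct(public string $path)
    {
    }

    public function run(): int
    {
        // The file handle (a resource) is created only at execution time.
        $handle = fopen($this->path, 'r');
        $words = 0;
        while (($line = fgets($handle)) !== false) {
            $words += str_word_count($line);
        }
        fclose($handle);
        return $words;
    }
}

// Helper invented for this sketch: reports whether PHP can serialize a value.
function isSerializable(mixed $value): bool
{
    try {
        serialize($value);
        return true;
    } catch (Throwable) {
        return false;
    }
}

var_dump(isSerializable(new WordCount(__FILE__))); // bool(true)
var_dump(isSerializable(fn () => 42));             // bool(false)
```

The same reasoning applies to task properties: store the path or identifier, and open the file, socket or connection inside run().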

The run() function

The run() function is called when the task is executed. It's the only required function for a task. It can return a value that is passed back to the main process and can be handled by defining a Task::handleResult(mixed $result) function in your task.

In all current workers, the input/output streams are connected to the main process, so you can use echo and STDERR to output something in your run() function at any time.

Call back to the main process

A task (usually) runs in a different process than the main process. The result and errors are communicated back to the main process, but it's also possible to call back to the main process during execution.

The Task class provides the Task::call() and Task::callAsync() functions to call a function in the main process. The Task::call() function blocks until the function is executed and the result is returned. The Task::callAsync() function returns a Promise that resolves when the function is executed and the result is returned. While it is theoretically possible to send multiple requests to the main process at the same time, the main process still has to process them synchronously, so sending async calls in parallel has no benefit. It's only recommended to use the async calls if you want to do something else in the task while waiting for the result.

The first parameter of the Task::call() and Task::callAsync() functions is a Closure of the function that you want to call in the main process. The function has to be a public function of your task class. The second and following parameters are passed to the function in the main process as its first and following arguments. The arguments have to be serializable.

Example:

class CallbackTask extends \Aternos\Taskmaster\Task\Task
{
    static protected int $current = 0;

    #[OnParent]
    public function getCurrent(): int
    {
        return static::$current++;
    }

    #[OnChild]
    public function run(): void
    {
        $current = $this->call($this->getCurrent(...));
        echo "I am task number $current\n";
    }
}

Child/parent attributes

As seen in the example above, it's possible to define functions that are executed in the main process (OnParent), in the child process (OnChild) or in both (OnBoth) using attributes.

These attributes are optional (the default is OnBoth). For methods they mostly serve as documentation for the developer, clearly showing which methods are executed where. The only implemented restriction for methods is that functions marked with the OnChild attribute must not be called using the Task::call() or Task::callAsync() functions.

The attributes can also be used on properties.

Synchronized properties

Besides the usage for methods mentioned above, the attributes can also be used on properties to define where task properties are used and synchronized.

Properties marked with the OnParent attribute are only available in the main process and are not serialized when running the task. They can contain unserializable values such as closures or resources.

Properties marked with the OnChild attribute are initially serialized and sent to the child process, so they can be set initially on the parent. After that, they are only available in the child process and never synchronized back to the parent. So the child process can set the values of those properties to something unserializable in the Task::run() function.

Properties marked with the OnBoth attribute or with no attribute are initially serialized and sent to the child process. They are synchronized back to the parent when Task::callAsync() or Task::call() is used to call a function in the main process. They are also synchronized back to the parent when the task is finished or (safely) errors with an exception. The synchronisation ONLY happens on those events, changes to the property are not immediately synchronized. Properties marked with this attribute always have to be serializable.

Example:

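class SynchronizedFieldTask extends \Aternos\Taskmaster\Task\Task
{
    #[OnBoth]
    protected int $counter = 0;

    #[OnBoth]
    public function increaseCounter(): void
    {
        $this->counter++;
    }

    // The return type is int because run() returns the counter value
    #[OnChild]
    public function run(): int
    {
        for ($i = 0; $i < 3; $i++) {
            // Increase the counter in the child process ...
            $this->increaseCounter();
            // ... and in the main process, which also synchronizes the property
            $this->call($this->increaseCounter(...));
        }
        return $this->counter;
    }
}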

The result of this task is 6 because the counter property is synchronized and increased on both sides.

Serialization in other classes

The OnParent, OnChild and OnBoth attributes are only available in your Task class. If other objects are serialized but contain properties that should not be serialized, you can use the SerializationTrait in your class and then add the Serializable or NotSerializable attributes to your properties.

You can use the Serializable attribute to mark properties that should be serialized. When using only the Serializable attribute, all properties that are not marked with the Serializable attribute will be ignored.

You can use the NotSerializable attribute to mark properties that should not be serialized. When using only the NotSerializable attribute, all properties that are not marked with the NotSerializable attribute will be serialized.

When using both attributes, all properties must be marked with either the Serializable or NotSerializable attribute, otherwise an exception will be thrown.
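To make the deny-list mode (only NotSerializable used) more concrete, here is a plain-PHP sketch of how an attribute-driven filter can work. It is not the library's implementation: the Skip attribute, the SkipAwareSerialization trait and the Report class are all hypothetical names invented for this illustration, built on PHP's own __serialize()/__unserialize() hooks and reflection.

```php
<?php
declare(strict_types=1);

// Hypothetical marker attribute, playing the role NotSerializable plays above.
#[Attribute(Attribute::TARGET_PROPERTY)]
final class Skip
{
}

// Hypothetical trait: serializes every property except those marked with #[Skip].
trait SkipAwareSerialization
{
    public function __serialize(): array
    {
        $data = [];
        foreach ((new ReflectionObject($this))->getProperties() as $property) {
            if ($property->isStatic() || $property->getAttributes(Skip::class) !== []) {
                continue; // static or explicitly excluded
            }
            if ($property->isInitialized($this)) {
                $data[$property->getName()] = $property->getValue($this);
            }
        }
        return $data;
    }

    public function __unserialize(array $data): void
    {
        foreach ($data as $name => $value) {
            $this->$name = $value;
        }
    }
}

final class Report
{
    use SkipAwareSerialization;

    public string $title = 'daily';

    #[Skip]
    public ?Closure $formatter = null;
}

$report = new Report();
$report->title = 'weekly';
$report->formatter = fn (string $text): string => strtoupper($text);

$copy = unserialize(serialize($report));

var_dump($copy->title);     // string(6) "weekly"
var_dump($copy->formatter); // NULL
```

Without the filter, serialize($report) would throw because of the closure. The skipped property comes back with its declared default, so code using the copy has to re-create it.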

Synchronous environment

In some cases special handling is required when the task is executed in a synchronous environment using the SyncWorker, e.g. you might not want to close file handles that are still used by other tasks. The Task::isSync() function can be used to check if the task is being executed synchronously.

Handling the result

The Task::handleResult() function is called when the task returns a value. It can be used to handle the result of the task. You can override this function to implement your own result handler. It is not required to define this function.

The first parameter is the result of the task or null if the task did not return a value. The default implementation in the Task class just stores the result in the task object for later access using the Task::getResult() function. If you override this function, you should call the parent function to store the result or store the result yourself.
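A minimal sketch of overriding the result handler could look like this. SquareTask is a hypothetical task written for this example, and the void return type on handleResult() is an assumption about the parent signature. It needs the library installed through Composer.

```php
<?php
declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Aternos\Taskmaster\Task\Task;
use Aternos\Taskmaster\Taskmaster;

// SquareTask is a hypothetical example task, not part of the library.
class SquareTask extends Task
{
    public function __construct(protected int $number)
    {
    }

    // Executed by the worker; the return value is passed back to the main process.
    public function run(): int
    {
        return $this->number ** 2;
    }

    // Executed in the main process once the result has arrived.
    // The void return type is assumed here.
    public function handleResult(mixed $result): void
    {
        parent::handleResult($result); // keeps Task::getResult() working
        echo "{$this->number} squared is {$result}" . PHP_EOL;
    }
}

$taskmaster = new Taskmaster();
$taskmaster->autoDetectWorkers(2);

$tasks = [];
foreach ([2, 3, 4] as $number) {
    $tasks[] = $task = new SquareTask($number);
    $taskmaster->runTask($task);
}

$taskmaster->wait()->stop();

// The stored results are still available on the task objects.
foreach ($tasks as $task) {
    var_dump($task->getResult());
}
```

Calling the parent handler first keeps the default behaviour, so code that later reads Task::getResult() continues to work.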

You can also use the TaskPromise returned from the Taskmaster::runTask() function, or obtainable from the task object using the Task::getPromise() function, to handle the result. The TaskPromise is resolved with the return value of the Task::run() function. You can use the TaskPromise::then() function to handle the result. The first argument is the result, the second argument is the task object.

Example:

$taskmaster->runTask(new SleepTask())->then(function (mixed $result, TaskInterface $task) {
    echo "The task returned " . $result . PHP_EOL;
});

Timeout

You can define a timeout for a task using the Task::setTimeout(?float $timeout) function or by overriding the Task::getTimeout() function. 0 means no timeout, null means that the default timeout of the taskmaster is used, which is defined by Taskmaster::setDefaultTaskTimeout(float $timeout). The timeout is set in seconds and can be a float value down to microseconds.

If the task takes longer than the timeout, a TaskTimeoutException is thrown.

Timeouts are only used in asynchronous workers. With a SyncWorker the task is executed synchronously and therefore the timeout is not used.

Timeouts are not exact, they are evaluated in every update interval of the taskmaster. Therefore, the task can take a little longer than the timeout.
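As a sketch of how the two timeout settings combine, the following example sets a default on the taskmaster and a shorter timeout on one task. SlowTask is a hypothetical task written for this example; the callback signature follows the TaskPromise::catch() usage described in the error handling section. If autodetection falls back to the SyncWorker, no timeout is applied and the task simply finishes.

```php
<?php
declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Aternos\Taskmaster\Task\Task;
use Aternos\Taskmaster\Task\TaskInterface;
use Aternos\Taskmaster\Taskmaster;

// SlowTask is a hypothetical task that sleeps longer than its timeout.
class SlowTask extends Task
{
    public function run(): void
    {
        sleep(5);
    }
}

$taskmaster = new Taskmaster();
$taskmaster->autoDetectWorkers(2);

// Default for every task that does not define its own timeout (in seconds).
$taskmaster->setDefaultTaskTimeout(10.0);

$task = new SlowTask();
$task->setTimeout(0.5); // per-task override; 0 would disable the timeout

$taskmaster->runTask($task)->catch(function (Exception $error, TaskInterface $task) {
    // With an asynchronous worker this is expected to be a TaskTimeoutException.
    echo get_class($error) . ': ' . $error->getMessage() . PHP_EOL;
});

$taskmaster->wait()->stop();
```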

Handling errors

Critical errors

The Task::handleError() function is called when the task caused a fatal unrecoverable error. The first parameter is an Exception that is thrown by the task or by this library. You can override this function to implement your own error handler.

The three exceptions thrown by this library are the PhpFatalErrorException caused by a fatal PHP error, the WorkerFailedException that is thrown when the worker process exited unexpectedly and the TaskTimeoutException that is thrown when the task takes longer than the timeout.

PHP fatal errors can only be caught if they were caused in a separate process, e.g. when using the ForkWorker or the ProcessWorker. It's not recommended to rely on this.

If a worker fails and the task gets a WorkerFailedException, it is possible that this was not caused by the task itself and therefore a retry of the task might be possible. This should be limited to a few retries to prevent endless loops.
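The "limit the retries" advice can be sketched independently of Taskmaster. In the plain-PHP example below, runWithRetries() is a hypothetical helper and RuntimeException merely stands in for WorkerFailedException. In a real setup the retry would more likely mean submitting a fresh task from the error handler while counting attempts in the same way.

```php
<?php
declare(strict_types=1);

/**
 * Calls $attempt until it succeeds or $maxRetries retries are used up.
 * Only exceptions of class $retryOn trigger a retry; all others propagate.
 * Hypothetical helper for illustration, not part of Taskmaster.
 */
function runWithRetries(callable $attempt, int $maxRetries, string $retryOn = RuntimeException::class): mixed
{
    for ($try = 0; ; $try++) {
        try {
            return $attempt($try);
        } catch (Exception $error) {
            if (!($error instanceof $retryOn) || $try >= $maxRetries) {
                throw $error; // not retryable, or retry budget exhausted
            }
        }
    }
}

// Hypothetical flaky operation: fails twice, then succeeds.
$result = runWithRetries(function (int $try): string {
    if ($try < 2) {
        throw new RuntimeException("worker failed on attempt $try");
    }
    return "succeeded after $try retries";
}, maxRetries: 3);

echo $result . PHP_EOL; // succeeded after 2 retries
```

The hard upper bound is the important part: once the budget is used up, the last exception is rethrown instead of looping forever.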

The TaskTimeoutException is thrown when the task takes longer than the timeout defined by Task::getTimeout() or the default defined by Taskmaster::setDefaultTaskTimeout().

The default error handler implementation in the Task class stores the error in the task object for later access using the Task::getError() function and writes the error message to STDERR. When overriding this function, you should call the parent function to store the error or store the error yourself.

You can also use the TaskPromise returned from the Taskmaster::runTask() function, or obtainable from the task object using the Task::getPromise() function, to handle the error. The TaskPromise is rejected with the error. You can use the TaskPromise::catch() function to handle the error. The first argument is the error, the second argument is the task object.

Example:

$taskmaster->runTask(new SleepTask())->catch(function (Exception $error, TaskInterface $task) {
    echo "The task failed: " . $error->getMessage() . PHP_EOL;
});

Uncritical errors

The Task::handleUncriticalError() function is called when the task caused an uncritical error, e.g. a PHP warning. You can override this function to implement your own error handler. The first parameter is a PhpError object that contains the error details. The function should return true if the error was handled or false (default) if the PHP error handler should continue (usually by logging/outputting the error).

The handleUncriticalError() function is called in the same process as the task itself.

When executing tasks synchronously using the SyncWorker, no PHP error handler is defined to avoid conflicts with other error handlers of the main process. Therefore, the handleUncriticalError() function is not called in this case.
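The true/false convention described above resembles the one PHP itself uses for handlers registered with set_error_handler(). The plain-PHP sketch below illustrates only that convention and does not use Taskmaster; how the library dispatches to handleUncriticalError() internally is not shown here.

```php
<?php
declare(strict_types=1);

$handled = [];

// Returning true marks the error as handled; returning false hands it
// on to PHP's standard error handler (which logs or prints it).
set_error_handler(function (int $level, string $message) use (&$handled): bool {
    if ($level === E_USER_NOTICE) {
        $handled[] = $message;
        return true;
    }
    return false;
});

trigger_error('cache miss, falling back to disk', E_USER_NOTICE);

restore_error_handler();

print_r($handled);
```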

Creating tasks

A task object can simply be created by instantiating the task class:

$task = new SleepTask();

And then added to the taskmaster:

$taskmaster->runTask($task);

You can add all your tasks at the beginning:

for ($i = 0; $i < 100; $i++) {
    $taskmaster->runTask(new SleepTask());
}

or wait for the taskmaster to finish some tasks and then add more to avoid holding all tasks in memory:

for ($i = 0; $i < 10; $i++) {
    for ($j = 0; $j < 10; $j++) {
        $taskmaster->runTask(new SleepTask());
    }
    $taskmaster->waitUntilAllTasksAreAssigned();
}

Task factory

The best way to dynamically create tasks when necessary is by creating a task factory, by extending the TaskFactory class or implementing the TaskFactoryInterface.

// Your own task factory extending the TaskFactory class
class SleepTaskFactory extends \Aternos\Taskmaster\Task\TaskFactory
{
    protected int $count = 0;

    public function createNextTask(?string $group): ?\Aternos\Taskmaster\Task\TaskInterface
    {
        if ($this->count++ < 100) {
            return new SleepTask();
        }
        // Stop creating tasks after 100 tasks
        return null;
    }
}

$taskmaster->addTaskFactory(new SleepTaskFactory());

You can use the promise returned from Task::getPromise() to handle success and failure of tasks in your task factory as well.

You can also use the existing IteratorTaskFactory that creates tasks from an iterator.

// Create an iterator that iterates over all files in the current directory
$iterator = new \RecursiveIteratorIterator(new \RecursiveDirectoryIterator("."));

// Create the task factory using the iterator and a task class that gets the iterator value as constructor argument
// Note that the SplFileInfo object that you get from a DirectoryIterator is not serializable and therefore cannot be
// stored in a task property, but you can use any other values, e.g. the file path
$factory = new \Aternos\Taskmaster\Task\IteratorTaskFactory($iterator, FileTask::class);
$taskmaster->addTaskFactory($factory);
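The FileTask class is not shown in the snippet above. A possible shape, written here as a hypothetical sketch, takes the iterator value in its constructor and keeps only the path string so that the task stays serializable. What the task does with the file (returning its size) is made up for the example.

```php
<?php
declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Aternos\Taskmaster\Task\Task;

// Hypothetical FileTask: receives the iterator value in its constructor
// and keeps only the serializable path, not the SplFileInfo object.
class FileTask extends Task
{
    protected string $path;

    public function __construct(SplFileInfo $file)
    {
        $this->path = $file->getPathname();
    }

    // Returns the file size in bytes, or null if the path is not a regular file.
    public function run(): ?int
    {
        if (!is_file($this->path)) {
            return null;
        }
        return filesize($this->path) ?: 0;
    }
}
```

The is_file() check matters because a recursive directory iterator can also yield entries that are not regular files, such as the dot entries.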

You can add multiple task factories to a taskmaster. The taskmaster will use the factories in the order they were added. If a factory returns null, the next factory is used.

Defining workers

A worker executes tasks. There are different workers available that execute tasks in different environments.

Available workers

Currently, the following workers are available:

| Worker | Requirements | Notes |
| --- | --- | --- |
| SyncWorker | None | The sync worker can be used as a fallback or if the number of tasks doesn't justify async execution. |
| ProcessWorker | proc_open() - This function is part of the PHP core but might be disabled. | The process worker spawns an entirely new process which causes a slight overhead. |
| ForkWorker | pcntl extension - This extension can only be installed in CLI environments. | Forking the current process is more lightweight than spawning a new process. |
| ThreadWorker | parallel extension - Also requires a build of PHP with ZTS (Zend Thread Safety) enabled. | This worker is considered experimental and potentially unstable. It should not be used in production. |

You can also write your own worker by extending the existing workers or implementing the WorkerInterface.

Creating workers

A worker object can simply be created by instantiating the worker class:

$worker = new \Aternos\Taskmaster\Environment\Sync\SyncWorker();

You can define custom options for the worker by creating a TaskmasterOptions object and passing it to the worker. If the options are not set, the default options from your Taskmaster instance are used.

$options = new \Aternos\Taskmaster\TaskmasterOptions();
$options->setBootstrap(__DIR__ . '/vendor/autoload.php');
$options->setPhpExecutable('/usr/bin/php');

$worker = new \Aternos\Taskmaster\Environment\Process\ProcessWorker();
$worker->setOptions($options);

Currently only the bootstrap file and the PHP executable can be set as options. Those options are only relevant for some workers, especially the ProcessWorker.

Proxy workers

It's possible to proxy the creation of the worker through a proxy process that uses a different PHP binary or environment, e.g. you can use a PHP CLI proxy process in a webserver environment to use the ForkWorker. One proxy can be used for multiple workers.

Currently, the only available proxy is the ProcessProxy that uses a new process opened using proc_open() to run the worker.

To use a proxy, create a new proxy object and pass it to the worker:

$proxy = new \Aternos\Taskmaster\Proxy\ProcessProxy();
$worker = new \Aternos\Taskmaster\Environment\Fork\ForkWorker();
$worker->setProxy($proxy);

You can also define TaskmasterOptions for the proxy process. If the options are not set, the default options from your Taskmaster instance are used.

$options = new \Aternos\Taskmaster\TaskmasterOptions();
$options->setBootstrap(__DIR__ . '/vendor/autoload.php');
$options->setPhpExecutable('/usr/bin/php');

$proxy = new \Aternos\Taskmaster\Proxy\ProcessProxy();
$proxy->setOptions($options);

Defining workers manually

Before running any tasks, you have to define the workers that should be used.

// Add a single worker
$taskmaster->addWorker(new \Aternos\Taskmaster\Environment\Sync\SyncWorker());

// Add a worker multiple times
$taskmaster->addWorkers(new \Aternos\Taskmaster\Environment\Process\ProcessWorker(), 4);

// Add multiple workers with the same proxy
$worker = new \Aternos\Taskmaster\Environment\Fork\ForkWorker();
$worker->setProxy(new \Aternos\Taskmaster\Proxy\ProcessProxy());
$taskmaster->addWorkers($worker, 8);

// Define/replace all workers at once
$taskmaster->setWorkers([
    new \Aternos\Taskmaster\Environment\Fork\ForkWorker(),
    new \Aternos\Taskmaster\Environment\Fork\ForkWorker(),
    new \Aternos\Taskmaster\Environment\Fork\ForkWorker(),
]);

Defining workers automatically

It's possible to detect the available workers and set them up automatically:

// create 4 automatically detected workers
$taskmaster->autoDetectWorkers(4);

This will use the ForkWorker if the pcntl extension is available, or the ProcessWorker if the proc_open() function is available, and fall back to the SyncWorker otherwise.

The autoDetectWorkers() function also supports loading the worker configuration from environment variables.

Defining workers using environment variables

When the autoDetectWorkers() function is called, it also checks the following environment variables to create the worker configuration:

| Environment variable | Description |
| --- | --- |
| TASKMASTER_WORKER_COUNT | The number of workers |
| TASKMASTER_WORKER | The type of worker, currently available workers are sync, fork, process and thread. The requirements for these workers have to be met, they are not checked again. |
| TASKMASTER_WORKER_PROXY | The type of proxy (if any) to use for the workers, currently only process is available. |
| TASKMASTER_WORKER_BIN | Path to the PHP binary to use for the workers, currently only applies to process workers. |
| TASKMASTER_WORKER_PROXY_BIN | Path to the PHP binary to use for the proxy. |

When using the autoDetectWorkers() function, it's possible to disable loading the worker configuration from environment variables by setting the second argument to false, or to disable loading only the worker count by setting the third argument to false.

// Load count and workers from environment variables
$taskmaster->autoDetectWorkers(4);

// Load nothing from environment variables
$taskmaster->autoDetectWorkers(4, false);

// Load worker types from environment variables, but keep the worker count
$taskmaster->autoDetectWorkers(4, true, false);

Init tasks

You can define tasks that are executed on every worker instance before the first task is executed. This is helpful to run some initial setup or (in case of the ForkWorker) to clear some variables that are inherited from the parent process, e.g. database connections.

// init tasks are always provided by a task factory
$taskmaster->setDefaultInitTaskFactory(new InitTaskFactory());

// but taskmaster can create task factories automatically by cloning or instancing a task
$taskmaster->setDefaultInitTask(new InitTask());
$taskmaster->setDefaultInitTask(InitTask::class);

// you can also define a task factory for a specific worker
$worker->setInitTaskFactory(new InitTaskFactory());

Running tasks

After writing your tasks, creating them and defining the workers, you can start running the tasks. You don't have to explicitly start the taskmaster, just running the update loop through the wait functions or manually is enough. Workers and proxies are started when necessary.

Configuring the taskmaster

Besides configuring workers and proxies directly, you can also configure the default TaskmasterOptions on the taskmaster object. Those options are used for all workers and proxies that don't have their own options.

Bootstrap file

The bootstrap file is used to autoload classes in the worker process. This isn't used by every worker, e.g. the SyncWorker and the ForkWorker don't need this, but the ProcessWorker does.

$taskmaster->setBootstrap(__DIR__ . '/vendor/autoload.php');

If this is not set, Taskmaster tries to find the composer autoloader automatically.

PHP executable

The PHP executable is used to run the worker process. This is currently only used by the ProcessWorker and the ProcessProxy.

$taskmaster->setPhpExecutable('/usr/bin/php');

The default value for the PHP executable is simply php.

Waiting for tasks to finish

You can simply wait for all tasks to finish using the Taskmaster::wait() function:

$taskmaster->wait();

This function blocks until all tasks are finished. If you might want to add further tasks, you can also use the Taskmaster::waitUntilAllTasksAreAssigned() function to wait until all tasks are assigned to a worker and then add more tasks.

$taskmaster->waitUntilAllTasksAreAssigned();

This doesn't wait for all tasks to finish, but the moment when all tasks are assigned to a worker is the best time to add more tasks to avoid any workers being idle.

You should still wait for all tasks to finish using Taskmaster::wait() before stopping the taskmaster.

Waiting and handling tasks

You can also use the Taskmaster::waitAndHandleTasks() function to handle tasks when they finish instead of waiting for all tasks to finish.

foreach ($taskmaster->waitAndHandleTasks() as $task) {
    if ($task->getError()) {
        echo "Task failed: " . $task->getError()->getMessage() . PHP_EOL;
    } else {
        echo "Task finished: " . $task->getResult() . PHP_EOL;
    }
}

The Taskmaster::waitAndHandleTasks() function returns a generator that yields tasks when they finish. You have to iterate over the generator to handle the tasks or the taskmaster will not continue to run.

Running the update loop manually

You can also run the update loop manually and do something else between the updates. The Taskmaster::update() function returns an array of all tasks that finished since the last update.

do {
    $finishedTasks = $taskmaster->update();
    // do something else
} while ($taskmaster->isRunning());

This is exactly the code of the Taskmaster::wait() function, but you can do something else between the updates.

Stopping the taskmaster

After you've waited for all tasks to finish, you should stop the taskmaster:

$taskmaster->stop();

Task/worker groups

For a more complex setup, you can group several workers together and then define tasks that only run on a certain group.

// create a group A with 4 fork workers
$workerA = new \Aternos\Taskmaster\Environment\Fork\ForkWorker();
$workerA->setGroup('A');
$taskmaster->addWorkers($workerA, 4);

// create a group B with 2 process workers
$workerB = new \Aternos\Taskmaster\Environment\Process\ProcessWorker();
$workerB->setGroup('B');
$taskmaster->addWorkers($workerB, 2);

// create tasks that only run on group A
for ($i = 0; $i < 10; $i++) {
    $taskA = new SleepTask();
    $taskA->setGroup('A');
    $taskmaster->runTask($taskA);
}

// create tasks that only run on group B
for ($i = 0; $i < 5; $i++) {
    $taskB = new FileTask();
    $taskB->setGroup('B');
    $taskmaster->runTask($taskB);
}

Groups in task factories

Task factories also support groups in two ways.

You can directly define for which groups the task factory should be called by returning an array of groups from the TaskFactory::getGroups() function. You can return null if you want to create tasks for all groups, or [null] (an array containing null) if you want to be called for tasks without a group.

You also get the group as a parameter in the TaskFactory::createNextTask(?string $group) function. The group parameter is null if the task factory is called for tasks without a group.

class SleepTaskFactory extends \Aternos\Taskmaster\Task\TaskFactory
{
    protected int $count = 0;

    public function getGroups(): ?array
    {
        return ['A', 'B'];
    }

    public function createNextTask(?string $group): ?\Aternos\Taskmaster\Task\TaskInterface
    {
        if ($this->count++ < 100) {
            return new SleepTask();
        }
        return null;
    }
}
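The factory above ignores the $group argument. A factory that actually branches on it might look like the following sketch. CpuTask, IoTask and GroupedTaskFactory are hypothetical names, the per-group budgets are arbitrary, and calling setGroup() on the created task is a precaution that may be redundant. How the taskmaster treats a null returned for one group while another group still has work left is not covered above, so treat that part as an assumption.

```php
<?php
declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Aternos\Taskmaster\Task\Task;
use Aternos\Taskmaster\Task\TaskFactory;
use Aternos\Taskmaster\Task\TaskInterface;

// Two hypothetical task types, one per worker group.
class CpuTask extends Task
{
    public function run(): int
    {
        return array_sum(range(1, 1000));
    }
}

class IoTask extends Task
{
    public function run(): void
    {
        usleep(100_000);
    }
}

class GroupedTaskFactory extends TaskFactory
{
    /** @var array<string, int> tasks still to create per group */
    protected array $remaining = ['A' => 10, 'B' => 5];

    public function getGroups(): ?array
    {
        return ['A', 'B'];
    }

    public function createNextTask(?string $group): ?TaskInterface
    {
        if ($group === null || ($this->remaining[$group] ?? 0) <= 0) {
            return null; // nothing (left) to do for this group
        }
        $this->remaining[$group]--;

        $task = $group === 'A' ? new CpuTask() : new IoTask();
        $task->setGroup($group); // possibly redundant, see the note above
        return $task;
    }
}
```

Keeping a separate budget per group avoids one busy group consuming the whole task count, which the shared counter in the simpler factory would allow.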
