Taskmaster is an object-oriented PHP library for running tasks in parallel.
A task can be written in a few lines of code and then executed in different environments, e.g. in a forked process, a new process, a thread or just synchronously in the same process, without changing any code.
Therefore, this library can run without any extensions that aren't part of the PHP core, but it's also possible to use the advantages of advanced extensions such as `pcntl` or `parallel` if they are available.
It's even possible to proxy the creation of the environment through a proxy process that uses a different PHP binary/installation with different extensions. This allows using the advantages of the different strategies even in environments where this would not be possible otherwise, e.g. using forked processes on a webserver.
Tasks can communicate back to the main process during execution and handle results and errors gracefully.
This library is not supported on Windows due to a lack of essential features. The autodetect falls back to the sync worker on Windows, so running tasks should be possible, but running tasks in parallel does not work.
```shell
composer require aternos/taskmaster
```
To use the `ForkWorker` you have to install the `pcntl` extension. For the `ThreadWorker` the `parallel` extension is required.
```php
// Every task is its own class, the class should be autoloaded
class SleepTask extends \Aternos\Taskmaster\Task\Task
{
    // The run method is called when the task is executed
    public function run(): void
    {
        sleep(1);
    }
}

// The taskmaster object holds tasks and workers
$taskmaster = new \Aternos\Taskmaster\Taskmaster();

// Set up the workers automatically
$taskmaster->autoDetectWorkers(4);

// Add tasks to the taskmaster
for ($i = 0; $i < 8; $i++) {
    $taskmaster->runTask(new SleepTask());
}

// Wait for all tasks to finish and stop the taskmaster
$taskmaster->wait()->stop();
```
A task is an instance of a class. When writing your own task class, it is recommended to extend the `Task` class, but implementing the `TaskInterface` is also possible.
A task class must define a `run()` function and can define optional functions such as error handlers.
Tasks are serialized and therefore must not contain any unserializable fields such as closures or resources. They can define those fields when the task is executed in the `run()` function.
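The serialization constraint comes from PHP itself rather than from this library. The following standalone sketch does not use Taskmaster at all, and `canSerialize()` is a hypothetical helper name introduced only for this illustration. It shows that a closure cannot be serialized and that an open file handle does not survive a serialize/unserialize round trip, while a plain value such as a file path does.

```php
<?php
declare(strict_types=1);

// Hypothetical helper for this illustration: reports whether PHP can serialize a value.
function canSerialize(mixed $value): bool
{
    try {
        serialize($value);
        return true;
    } catch (\Throwable $e) {
        // PHP throws an exception when asked to serialize a closure
        return false;
    }
}

// A plain string such as a file path serializes fine.
var_dump(canSerialize('/tmp/example.txt')); // bool(true)

// A closure cannot be serialized.
var_dump(canSerialize(fn () => 42)); // bool(false)

// A resource does not throw, but it is not restored as a resource either.
$handle = fopen('php://memory', 'r+');
$copy = unserialize(serialize($handle));
var_dump(is_resource($copy)); // bool(false)
fclose($handle);
```

This is why it is usually safer to keep only something like a path in a task property and open the handle inside `run()`.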
The `run()` function is called when the task is executed. It's the only required function for a task. It can return a value that is passed back to the main process and can be handled by defining a `Task::handleResult(mixed $result)` function in your task.
In all current workers, the input/output streams are connected to the main process, so you can use `echo` and `STDERR` to output something in your `run()` function at any time.
A task (usually) runs in a different process than the main process. The result and errors are communicated back to the main process, but it's also possible to call back to the main process during execution.
The `Task` class provides the `Task::call()` and `Task::callAsync()` functions to call a function in the main process. The `Task::call()` function blocks until the function is executed and the result is returned. The `Task::callAsync()` function returns a `Promise` that resolves when the function is executed and the result is returned. While it is theoretically possible to send multiple requests to the main process at the same time, the main process still has to process them synchronously, so the async calls bring no speed-up by themselves. It's only recommended to use the async calls if you want to do something else in the task while waiting for the result.
The first parameter of the `Task::call()` and `Task::callAsync()` functions is a `Closure` of the function that you want to call in the main process. The function has to be a public function of your task class. The second and following parameters are passed to the function in the main process as its first and following arguments. The arguments have to be serializable.
Example:
```php
class CallbackTask extends \Aternos\Taskmaster\Task\Task
{
    static protected int $current = 0;

    #[OnParent]
    public function getCurrent(): int
    {
        return static::$current++;
    }

    #[OnChild]
    public function run(): void
    {
        $current = $this->call($this->getCurrent(...));
        echo "I am task number $current\n";
    }
}
```
As seen in the example above, it's possible to define functions that are executed in the main process (`OnParent`), in the child process (`OnChild`) or in both (`OnBoth`) using attributes.
These attributes are optional (default is `OnBoth`) and, for methods, mostly serve as documentation for the developer to clearly show which methods are executed where. The only implemented restriction for methods is that functions marked with the `OnChild` attribute must not be called using the `Task::call()` or `Task::callAsync()` functions.
Besides the usage for methods mentioned above, the attributes can also be used on properties to define where task properties are used and synchronized.
Properties marked with the `OnParent` attribute are only available in the main process and not serialized when running the task. They can contain unserializable values such as closures or resources.
Properties marked with the `OnChild` attribute are initially serialized and sent to the child process, so they can be set initially on the parent. After that, they are only available in the child process and never synchronized back to the parent. So the child process can set the values of those properties to something unserializable in the `Task::run()` function.
Properties marked with the `OnBoth` attribute or with no attribute are initially serialized and sent to the child process. They are synchronized back to the parent when `Task::callAsync()` or `Task::call()` is used to call a function in the main process. They are also synchronized back to the parent when the task is finished or (safely) errors with an exception. The synchronisation ONLY happens on those events, changes to the property are not immediately synchronized. Properties marked with this attribute always have to be serializable.
Example:
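```php
class SynchronizedFieldTask extends \Aternos\Taskmaster\Task\Task
{
    #[OnBoth]
    protected int $counter = 0;

    #[OnBoth]
    public function increaseCounter(): void
    {
        $this->counter++;
    }

    #[OnChild]
    public function run(): int
    {
        for ($i = 0; $i < 3; $i++) {
            // Increase the counter in the child process
            $this->increaseCounter();
            // Increase the counter in the main process, the property is synchronized on each call
            $this->call($this->increaseCounter(...));
        }
        return $this->counter;
    }
}
```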
The result of this task is `6` because the `counter` property is synchronized and increased on both sides.
The `OnParent`, `OnChild` and `OnBoth` attributes are only available in your `Task` class. If other objects are serialized but contain properties that should not be serialized, you can use the `SerializationTrait` in your class and then add the `Serializable` or `NotSerializable` attributes to your properties.
You can use the `Serializable` attribute to mark properties that should be serialized. When using only the `Serializable` attribute, all properties that are not marked with the `Serializable` attribute will be ignored.
You can use the `NotSerializable` attribute to mark properties that should not be serialized. When using only the `NotSerializable` attribute, all properties that are not marked with the `NotSerializable` attribute will be serialized.
When using both attributes, all properties must be marked with either the `Serializable` or `NotSerializable` attribute, otherwise an exception will be thrown.
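The three rules above can be modelled in a few lines. The following standalone sketch does not use the library's `SerializationTrait` or its attributes. `DemoSerializable`, `DemoNotSerializable`, `serializedPropertyNames()` and `Job` are hypothetical names defined here only so the example runs on its own, and the behaviour for a class without any marker is an assumption.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins for the library's attributes, defined here so the sketch is self-contained.
#[\Attribute(\Attribute::TARGET_PROPERTY)]
final class DemoSerializable
{
}

#[\Attribute(\Attribute::TARGET_PROPERTY)]
final class DemoNotSerializable
{
}

/**
 * Returns the names of the properties that would be serialized under the rules described above.
 *
 * @return string[]
 */
function serializedPropertyNames(object $object): array
{
    $included = [];
    $excluded = [];
    $unmarked = [];
    foreach ((new \ReflectionObject($object))->getProperties() as $property) {
        if ($property->getAttributes(DemoSerializable::class)) {
            $included[] = $property->getName();
        } elseif ($property->getAttributes(DemoNotSerializable::class)) {
            $excluded[] = $property->getName();
        } else {
            $unmarked[] = $property->getName();
        }
    }

    if ($included && $excluded) {
        // Both markers in use: every property has to carry one of them.
        if ($unmarked) {
            throw new \LogicException('Unmarked properties: ' . implode(', ', $unmarked));
        }
        return $included;
    }
    if ($included) {
        // Only the "serializable" marker in use: unmarked properties are ignored.
        return $included;
    }
    // Only the "not serializable" marker in use: everything else is serialized.
    // (No marker at all is treated the same way here, which is an assumption.)
    return $unmarked;
}

// Hypothetical class that only uses the "not serializable" marker.
final class Job
{
    #[DemoNotSerializable]
    public $handle = null;
    public string $path = '/tmp/input.txt';
    public int $attempt = 1;
}

print_r(serializedPropertyNames(new Job()));
```

With only the "not serializable" marker in use, the sketch reports `path` and `attempt` and leaves out `handle`.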
In some cases special handling is required when the task is executed in a synchronous environment using the `SyncWorker`, e.g. you might not want to close file handles that are still used by other tasks. The `Task::isSync()` function can be used to check if the task is being executed synchronously.
The `Task::handleResult()` function is called when the task returns a value. It can be used to handle the result of the task. You can override this function to implement your own result handler. It is not required to define this function.
The first parameter is the result of the task or `null` if the task did not return a value. The default implementation in the `Task` class just stores the result in the task object for later access using the `Task::getResult()` function. If you override this function, you should call the parent function to store the result or store the result yourself.
You can also use the `TaskPromise` returned from the `Taskmaster::runTask()` function or obtainable from the task object using the `Task::getPromise()` function to handle the result. The `TaskPromise` is resolved with the return value of the `Task::run()` function. You can use the `TaskPromise::then()` function to handle the result. The first argument is the result, the second argument is the task object.
Example:
```php
$taskmaster->runTask(new SleepTask())->then(function (mixed $result, TaskInterface $task) {
    echo "The task returned " . $result . PHP_EOL;
});
```
You can define a timeout for a task using the `Task::setTimeout(?float $timeout)` function or by overriding the `Task::getTimeout()` function. `0` means no timeout, `null` means that the default timeout defined on the taskmaster using `Taskmaster::setDefaultTaskTimeout(float $timeout)` is used. The timeout is set in seconds and can be a float value down to microseconds.
If the task takes longer than the timeout, a `TaskTimeoutException` is thrown.
Timeouts are only used in asynchronous workers. With a `SyncWorker`, the task is executed synchronously and therefore the timeout is not used.
Timeouts are not exact, they are evaluated in every update interval of the taskmaster. Therefore, the task can take a little longer than the timeout.
The `Task::handleError()` function is called when the task caused a fatal unrecoverable error. The first parameter is an `Exception` that is thrown by the task or by this library. You can override this function to implement your own error handler.
The three exceptions thrown by this library are the `PhpFatalErrorException` caused by a fatal PHP error, the `WorkerFailedException` that is thrown when the worker process exited unexpectedly and the `TaskTimeoutException` that is thrown when the task takes longer than the timeout.
PHP fatal errors can only be caught if they were caused in a separate process, e.g. when using the `ForkWorker` or the `ProcessWorker`. It's not recommended to rely on this.
If a worker fails and the task gets a `WorkerFailedException`, it is possible that this was not caused by the task itself and therefore a retry of the task might be possible. This should be limited to a few retries to prevent endless loops.
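One way to keep retries bounded is a small counter kept in the main process. The following standalone sketch is not part of the library. `RetryBudget` and the `job-42` key are hypothetical, and the loop only simulates a job whose worker keeps failing.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: counts retries per job key and refuses further retries once the limit is reached.
final class RetryBudget
{
    /** @var array<string, int> */
    private array $attempts = [];

    public function __construct(private int $maxRetries)
    {
    }

    // Returns true and counts the retry if another retry is allowed for this key.
    public function tryConsume(string $key): bool
    {
        $used = $this->attempts[$key] ?? 0;
        if ($used >= $this->maxRetries) {
            return false;
        }
        $this->attempts[$key] = $used + 1;
        return true;
    }
}

$budget = new RetryBudget(3);
$retries = 0;

// Simulate a job that fails every time: retry until the budget is exhausted.
while ($budget->tryConsume('job-42')) {
    $retries++;
}

echo "Gave up after $retries retries" . PHP_EOL; // Gave up after 3 retries
```

In a real setup, a check like this would presumably sit in the error handler of the task and only be applied when the error is a `WorkerFailedException`, with a fresh task instance being added for each retry.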
The `TaskTimeoutException` is thrown when the task takes longer than the timeout defined by `Task::getTimeout()` or the default defined by `Taskmaster::setDefaultTaskTimeout()`.
The default error handler implementation in the `Task` class stores the error in the task object for later access using the `Task::getError()` function and writes the error message to `STDERR`. When overriding this function, you should call the parent function to store the error or store the error yourself.
You can also use the `TaskPromise` returned from the `Taskmaster::runTask()` function or obtainable from the task object using the `Task::getPromise()` function to handle the error. The `TaskPromise` is rejected with the error. You can use the `TaskPromise::catch()` function to handle the error. The first argument is the error, the second argument is the task object.
Example:
```php
$taskmaster->runTask(new SleepTask())->catch(function (Exception $error, TaskInterface $task) {
    echo "The task failed: " . $error->getMessage() . PHP_EOL;
});
```
The `Task::handleUncriticalError()` function is called when the task caused an uncritical error, e.g. a PHP warning. You can override this function to implement your own error handler. The first parameter is a `PhpError` object that contains the error details. The function should return `true` if the error was handled or `false` (default) if the PHP error handler should continue (usually by logging/outputting the error).
The `handleUncriticalError` function is called in the same process as the task itself.
When executing tasks synchronously using the `SyncWorker`, no PHP error handler is defined to avoid conflicts with other error handlers of the main process. Therefore, the `handleUncriticalError` function is not called in this case.
A task object can simply be created by instantiating the task class:
```php
$task = new SleepTask();
```
And then added to the taskmaster:
```php
$taskmaster->runTask($task);
```
You can add all your tasks at the beginning:
```php
for ($i = 0; $i < 100; $i++) {
    $taskmaster->runTask(new SleepTask());
}
```
or wait for the taskmaster to finish some tasks and then add more to avoid holding all tasks in memory:
```php
for ($i = 0; $i < 10; $i++) {
    for ($j = 0; $j < 10; $j++) {
        $taskmaster->runTask(new SleepTask());
    }
    $taskmaster->waitUntilAllTasksAreAssigned();
}
```
The best way to dynamically create tasks when necessary is by creating a task factory, by extending the `TaskFactory` class or implementing the `TaskFactoryInterface`.
```php
// Your own task factory extending the TaskFactory class
class SleepTaskFactory extends \Aternos\Taskmaster\Task\TaskFactory
{
    protected int $count = 0;

    public function createNextTask(?string $group): ?\Aternos\Taskmaster\Task\TaskInterface
    {
        if ($this->count++ < 100) {
            return new SleepTask();
        }
        // Stop creating tasks after 100 tasks
        return null;
    }
}

$taskmaster->addTaskFactory(new SleepTaskFactory());
```
You could use the promise returned from `Task::getPromise()` to handle success and failure of tasks in your task factory as well.
You can also use the existing `IteratorTaskFactory` that creates tasks from an iterator.
```php
// Create an iterator that iterates over all files in the current directory
$iterator = new \RecursiveIteratorIterator(new \RecursiveDirectoryIterator("."));

// Create the task factory using the iterator and a task class that gets the iterator value as constructor argument
// Note that the SplFileInfo object that you get from a DirectoryIterator is not serializable and therefore cannot be
// stored in a task property, but you can use any other values, e.g. the file path
$factory = new \Aternos\Taskmaster\Task\IteratorTaskFactory($iterator, FileTask::class);
$taskmaster->addTaskFactory($factory);
```
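One way to get file paths instead of `SplFileInfo` objects is to let the directory iterator yield strings directly. The following standalone sketch uses only PHP's SPL classes, not Taskmaster. The temporary directory and file names are made up for the demonstration, and how a task class stores the value it receives is left to the reader.

```php
<?php
declare(strict_types=1);

// Create a small temporary directory so the sketch runs anywhere.
$dir = sys_get_temp_dir() . '/taskmaster-demo-' . uniqid();
mkdir($dir);
file_put_contents($dir . '/a.txt', 'a');

// CURRENT_AS_PATHNAME makes the iterator yield plain path strings instead of SplFileInfo objects.
$flags = \FilesystemIterator::CURRENT_AS_PATHNAME | \FilesystemIterator::SKIP_DOTS;
$iterator = new \RecursiveIteratorIterator(new \RecursiveDirectoryIterator($dir, $flags));

$values = [];
foreach ($iterator as $value) {
    $values[] = $value;
}

// Every value is a string, so it can be serialized and kept in a property.
foreach ($values as $value) {
    var_dump(is_string($value)); // bool(true)
}

// Clean up the temporary files.
unlink($dir . '/a.txt');
rmdir($dir);
```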
You can add multiple task factories to a taskmaster. The taskmaster will use the factories in the order they were added. If a factory returns `null`, the next factory is used.
A worker executes tasks. There are different workers available that execute tasks in different environments.
Currently, the following workers are available:
Worker | Requirements | Notes |
---|---|---|
`SyncWorker` | None | The sync worker can be used as a fallback or if the number of tasks doesn't justify async execution. |
`ProcessWorker` | `proc_open()` - This function is part of the PHP core but might be disabled. | The process worker spawns an entirely new process which causes a slight overhead. |
`ForkWorker` | `pcntl` extension - This extension can only be installed in CLI environments. | Forking the current process is more lightweight than spawning a new process. |
`ThreadWorker` | `parallel` extension - Also requires a build of PHP with ZTS (Zend Thread Safety) enabled. | This worker is considered experimental and potentially unstable. It should not be used in production. |
You can also write your own worker by extending the existing workers or implementing the `WorkerInterface`.
A worker object can simply be created by instantiating the worker class:
```php
$worker = new \Aternos\Taskmaster\Environment\Sync\SyncWorker();
```
You can define custom options for the worker by creating a `TaskmasterOptions` object and passing it to the worker. If the options are not set, the default options from your `Taskmaster` instance are used.
```php
$options = new \Aternos\Taskmaster\TaskmasterOptions();
$options->setBootstrap(__DIR__ . '/vendor/autoload.php');
$options->setPhpExecutable('/usr/bin/php');

$worker = new \Aternos\Taskmaster\Environment\Process\ProcessWorker();
$worker->setOptions($options);
```
Currently only the bootstrap file and the PHP executable can be set as options. Those options are only relevant for some workers, especially the `ProcessWorker`.
It's possible to proxy the creation of the worker through a proxy process that uses a different PHP binary or environment, e.g. you can use a PHP CLI proxy process in a webserver environment to use the `ForkWorker`. One proxy can be used for multiple workers.
Currently, the only available proxy is the `ProcessProxy` that uses a new process opened using `proc_open()` to run the worker.
To use a proxy, create a new proxy object and pass it to the worker:
```php
$proxy = new \Aternos\Taskmaster\Proxy\ProcessProxy();
$worker = new \Aternos\Taskmaster\Environment\Fork\ForkWorker();
$worker->setProxy($proxy);
```
You can also define `TaskmasterOptions` for the proxy process. If the options are not set, the default options from your `Taskmaster` instance are used.
```php
$options = new \Aternos\Taskmaster\TaskmasterOptions();
$options->setBootstrap(__DIR__ . '/vendor/autoload.php');
$options->setPhpExecutable('/usr/bin/php');

$proxy = new \Aternos\Taskmaster\Proxy\ProcessProxy();
$proxy->setOptions($options);
```
Before running any tasks, you have to define the workers that should be used.
```php
// Add a single worker
$taskmaster->addWorker(new \Aternos\Taskmaster\Environment\Sync\SyncWorker());

// Add a worker multiple times
$taskmaster->addWorkers(new \Aternos\Taskmaster\Environment\Process\ProcessWorker(), 4);

// Add multiple workers with the same proxy
$worker = new \Aternos\Taskmaster\Environment\Fork\ForkWorker();
$worker->setProxy(new \Aternos\Taskmaster\Proxy\ProcessProxy());
$taskmaster->addWorkers($worker, 8);

// Define/replace all workers at once
$taskmaster->setWorkers([
    new \Aternos\Taskmaster\Environment\Fork\ForkWorker(),
    new \Aternos\Taskmaster\Environment\Fork\ForkWorker(),
    new \Aternos\Taskmaster\Environment\Fork\ForkWorker(),
]);
```
It's possible to detect the available workers and set them up automatically:
```php
// create 4 automatically detected workers
$taskmaster->autoDetectWorkers(4);
```
This will use the `ForkWorker` if the `pcntl` extension is available, or the `ProcessWorker` if the `proc_open()` function is available, and fall back to the `SyncWorker` otherwise.
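To get a rough idea which worker the autodetection is likely to choose on a given machine, the documented fallback order can be checked by hand. The following standalone sketch is not the library's detection code. `guessAutoDetectedWorker()` is a hypothetical helper that only mirrors the order described above together with the fallback to the sync worker on Windows.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: mirrors the documented fallback order, not the library's actual implementation.
function guessAutoDetectedWorker(): string
{
    // The documentation states that autodetection falls back to the sync worker on Windows.
    if (PHP_OS_FAMILY === 'Windows') {
        return 'sync';
    }
    // Forking needs the pcntl extension.
    if (extension_loaded('pcntl')) {
        return 'fork';
    }
    // Spawning a new process needs proc_open().
    if (function_exists('proc_open')) {
        return 'process';
    }
    return 'sync';
}

echo guessAutoDetectedWorker() . PHP_EOL;
```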
The `autoDetectWorkers()` function also supports loading the worker configuration from environment variables. When it is called, it checks the following environment variables to create the worker configuration:
Environment variable | Description |
---|---|
`TASKMASTER_WORKER_COUNT` | The number of workers |
`TASKMASTER_WORKER` | The type of worker, currently available workers are `sync`, `fork`, `process` and `thread`. The requirements for these workers have to be met, they are not checked again. |
`TASKMASTER_WORKER_PROXY` | The type of proxy (if any) to use for the workers, currently only `process` is available. |
`TASKMASTER_WORKER_BIN` | Path to the PHP binary to use for the workers, currently only applies to `process` workers. |
`TASKMASTER_WORKER_PROXY_BIN` | Path to the PHP binary to use for the proxy. |
When using the `autoDetectWorkers()` function, it's possible to disable loading the worker configuration from environment variables by setting the second argument to `false`, or to disable loading only the worker count by setting the third argument to `false`.
```php
// Load count and workers from environment variables
$taskmaster->autoDetectWorkers(4);

// Load nothing from environment variables
$taskmaster->autoDetectWorkers(4, false);

// Load worker types from environment variables, but keep the worker count
$taskmaster->autoDetectWorkers(4, true, false);
```
You can define tasks that are executed on every worker instance before the first task is executed. This is helpful to run some initial setup or (in case of the `ForkWorker`) to clear some variables that are inherited from the parent process, e.g. database connections.
```php
// init tasks are always provided by a task factory
$taskmaster->setDefaultInitTaskFactory(new InitTaskFactory());

// but taskmaster can create task factories automatically by cloning or instancing a task
$taskmaster->setDefaultInitTask(new InitTask());
$taskmaster->setDefaultInitTask(InitTask::class);

// you can also define a task factory for a specific worker
$worker->setInitTaskFactory(new InitTaskFactory());
```
After writing your tasks, creating them and defining the workers, you can start running the tasks. You don't have to explicitly start the taskmaster, just running the update loop through the wait functions or manually is enough. Workers and proxies are started when necessary.
Besides configuring workers and proxies directly, you can also configure the default `TaskmasterOptions` on the taskmaster object. Those options are used for all workers and proxies that don't have their own options.
The bootstrap file is used to autoload classes in the worker process. This isn't used by every worker, e.g. the `SyncWorker` and the `ForkWorker` don't need this, but the `ProcessWorker` does.
```php
$taskmaster->setBootstrap(__DIR__ . '/vendor/autoload.php');
```
If this is not set, Taskmaster tries to find the composer autoloader automatically.
The PHP executable is used to run the worker process. This is currently only used by the `ProcessWorker` and the `ProcessProxy`.
```php
$taskmaster->setPhpExecutable('/usr/bin/php');
```
The default value for the PHP executable is simply `php`.
You can simply wait for all tasks to finish using the `Taskmaster::wait()` function:
```php
$taskmaster->wait();
```
This function blocks until all tasks are finished. If you might want to add further tasks, you can also use the `Taskmaster::waitUntilAllTasksAreAssigned()` function to wait until all tasks are assigned to a worker and then add more tasks.
```php
$taskmaster->waitUntilAllTasksAreAssigned();
```
This doesn't wait for all tasks to finish, but when all tasks are assigned to a worker, it's the best time to add more tasks to avoid any workers being idle.
You should still wait for all tasks to finish using `Taskmaster::wait()` before stopping the taskmaster.
You can also use the `Taskmaster::waitAndHandleTasks()` function to handle tasks when they finish instead of waiting for all tasks to finish.
```php
foreach ($taskmaster->waitAndHandleTasks() as $task) {
    if ($task->getError()) {
        echo "Task failed: " . $task->getError()->getMessage() . PHP_EOL;
    } else {
        echo "Task finished: " . $task->getResult() . PHP_EOL;
    }
}
```
The `Taskmaster::waitAndHandleTasks()` function returns a generator that yields tasks when they finish. You have to iterate over the generator to handle the tasks or the taskmaster will not continue to run.
You can also run the update loop manually and do something else between the updates. The `Taskmaster::update()` function returns an array of all tasks that finished since the last update.
```php
do {
    $finishedTasks = $taskmaster->update();
    // do something else
} while ($taskmaster->isRunning());
```
This is exactly the code of the `Taskmaster::wait()` function, but you can do something else between the updates.
After you've waited for all tasks to finish, you should stop the taskmaster:
```php
$taskmaster->stop();
```
For a more complex setup, you can group several workers together and then define tasks that only run on a certain group.
```php
// create a group A with 4 fork workers
$workerA = new \Aternos\Taskmaster\Environment\Fork\ForkWorker();
$workerA->setGroup('A');
$taskmaster->addWorkers($workerA, 4);

// create a group B with 2 process workers
$workerB = new \Aternos\Taskmaster\Environment\Process\ProcessWorker();
$workerB->setGroup('B');
$taskmaster->addWorkers($workerB, 2);

// create tasks that only run on group A
for ($i = 0; $i < 10; $i++) {
    $taskA = new SleepTask();
    $taskA->setGroup('A');
    $taskmaster->runTask($taskA);
}

// create tasks that only run on group B
for ($i = 0; $i < 5; $i++) {
    $taskB = new FileTask();
    $taskB->setGroup('B');
    $taskmaster->runTask($taskB);
}
```
Task factories also support groups in two ways.
You can directly define for which groups the task factory should be called by returning an array of groups from the `TaskFactory::getGroups()` function. You can return `null` if you want to create tasks for all groups or an array containing `null` (`[null]`) if you want to be called for tasks without a group.
You also get the group as a parameter in the `TaskFactory::createNextTask(?string $group)` function. The group parameter is `null` if the task factory is called for tasks without a group.
```php
class SleepTaskFactory extends \Aternos\Taskmaster\Task\TaskFactory
{
    protected int $count = 0;

    public function getGroups(): ?array
    {
        return ['A', 'B'];
    }

    public function createNextTask(?string $group): ?\Aternos\Taskmaster\Task\TaskInterface
    {
        if ($this->count++ < 100) {
            return new SleepTask();
        }
        return null;
    }
}
```