Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Swoole-based worker pool, coroutine pool / 基于 Swoole 的工作池,协程池

NotificationsYou must be signed in to change notification settings

mix-php/worker-pool

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

16 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

OpenMix 出品:https://openmix.org

Mix Worker Pool

Swoole-based worker pool, coroutine pool

基于 Swoole 的工作池,协程池

go 版本:https://github.com/mix-go/xwp

Installation

composer require mix/worker-pool

单次调度

  • 如果不想阻塞执行,可以使用$pool->start() 启动
$jobQueue =newSwoole\Coroutine\Channel(200);$maxWorkers =100;$handler =function ($data) {// do something};$pool =newMix\WorkerPool\WorkerPool($jobQueue,$maxWorkers,$handler);go(function ()use ($jobQueue,$pool) {// 投放任务for ($i =0;$i <1000;$i++) {$jobQueue->push($i);    }// 停止$pool->stop();});$pool->run();// 阻塞等待

上面是采用闭包处理任务,也可以使用对象处理任务

class FooHandlerimplements \Mix\WorkerPool\RunInterface{publicfunctiondo($data):void    {// do something    }}$pool =newMix\WorkerPool\WorkerPool($jobQueue,$maxWorkers,newFooHandler());

常驻调度

适合处理 MQ 队列的异步消费

以 Redis 作为 MQ 为例:

$maxWorkers =20;$maxQueue =10;$jobQueue =newSwoole\Coroutine\Channel($maxQueue);$handler =function ($data) {// do something};$pool =newMix\WorkerPool\WorkerPool($jobQueue,$maxWorkers,$handler);$quit =newSwoole\Coroutine\Channel();foreach ([SIGHUP,SIGINT,SIGTERM]as$signal) {Swoole\Process::signal($signal,function ()use ($quit) {$quit->push(true);    });}go(function ()use ($jobQueue,$pool,$quit) {// 投放任务while (true) {if (!$quit->isEmpty()) {$pool->stop();return;        }try {$data =$redis->brPop(['test'],1);        }catch (\Throwable$ex) {// print log$pool->stop();return;        }if (!$data) {continue;        }$data =array_pop($data);// brPop命令最后一个键才是值$jobQueue->push($data);    }});$pool->run();// 阻塞等待

异常处理

闭包或者对象do 方法中执行的代码,可能会抛出异常,必须要使用try/catch 避免协程退出

class FooHandlerimplements \Mix\WorkerPool\RunInterface{publicfunctiondo($data):void    {try {// do something        }catch (\Throwable$ex){// print log        }    }}

License

Apache License Version 2.0,http://www.apache.org/licenses/

About

Swoole-based worker pool, coroutine pool / 基于 Swoole 的工作池,协程池

Topics

Resources

Stars

Watchers

Forks

Packages

No packages published

[8]ページ先頭

©2009-2025 Movatter.jp