Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitead643a

Browse files
committed
fix:redis-subscriber:redis-subscribe > redis-subscriber
0 parents  commitead643a

File tree

11 files changed

+634
-0
lines changed

11 files changed

+634
-0
lines changed

‎.gitignore‎

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# phpstorm project files
2+
.idea
3+
4+
# netbeans project files
5+
nbproject
6+
7+
# zend studio for eclipse project files
8+
.buildpath
9+
.project
10+
.settings
11+
12+
# windows thumbnail cache
13+
Thumbs.db
14+
15+
# composer itself is not needed
16+
composer.phar
17+
vendor
18+
19+
# Mac DS_Store Files
20+
.DS_Store
21+
22+
# phpunit itself is not needed
23+
phpunit.phar
24+
# local phpunit config
25+
/phpunit.xml

‎README.md‎

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
##Mix Redis Subscriber
2+
3+
Redis native protocol Subscriber based on Swoole coroutine
4+
5+
基于 Swoole 协程的 Redis 原生协议订阅库
6+
7+
使用 Socket 直接连接 Redis 服务器,不依赖 phpredis 扩展,该订阅器有如下优点:
8+
9+
- 平滑修改:可随时增加、取消订阅通道,实现无缝切换通道的需求。
10+
- 跨协程安全关闭:可在任意时刻关闭订阅。
11+
- 通道获取消息:该库封装风格参考 golang 语言[go-redis](https://github.com/go-redis/redis) 库封装,通过 channel 获取订阅的消息。
12+
13+
##Installation
14+
15+
- Swoole >= 4.4
16+
17+
```
18+
composer require mix/redis-subscriber
19+
```
20+
21+
##订阅频道
22+
23+
- 连接、订阅失败会抛出异常
24+
25+
```php
26+
$sub = new \Mix\Redis\Subscriberr\Subscriber('127.0.0.1', 6379, '', 5); // 连接失败将抛出异常
27+
$sub->subscribe('foo', 'bar'); // 订阅失败将抛出异常
28+
29+
$chan = $sub->channel();
30+
while (true) {
31+
$data = $chan->pop();
32+
if (empty($data)) { // 手动close与redis异常断开都会导致返回false
33+
if (!$sub->closed) {
34+
// redis异常断开处理
35+
var_dump('Redis connection is disconnected abnormally');
36+
}
37+
break;
38+
}
39+
var_dump($data);
40+
}
41+
```
42+
43+
接收到订阅消息:
44+
45+
```
46+
object(Mix\Redis\Subscriberr\Message)#8 (2) {
47+
["channel"]=>
48+
string(2) "foo"
49+
["payload"]=>
50+
string(4) "test"
51+
}
52+
```
53+
54+
##全部方法
55+
56+
| 方法| 描述|
57+
| ---| ---|
58+
| subscribe(string ...$channels) : void| 增加订阅|
59+
| unsubscribe(string ...$channels) : void| 取消订阅|
60+
| channel() : Swoole\Coroutine\Channel| 获取消息通道|
61+
| close() : void| 关闭订阅|
62+
63+
##License
64+
65+
Apache License Version 2.0,http://www.apache.org/licenses/

‎composer.json‎

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{
2+
"name":"mix/redis-subscriber",
3+
"description":"Redis native protocol Subscriber based on Swoole coroutine",
4+
"type":"library",
5+
"keywords": [
6+
"mix",
7+
"swoole",
8+
"redis",
9+
"subscribe",
10+
"subscriber"
11+
],
12+
"homepage":"https://openmix.org/mix-php",
13+
"license":"Apache-2.0",
14+
"authors": [
15+
{
16+
"name":"liu,jian",
17+
"email":"coder.keda@gmail.com"
18+
}
19+
],
20+
"require": {
21+
"php":">=7.0.0",
22+
"ext-swoole":">=4.4.4"
23+
},
24+
"autoload": {
25+
"psr-4": {
26+
"Mix\\Redis\\Subscriber\\":"src/"
27+
}
28+
},
29+
"require-dev": {
30+
"phpunit/phpunit":"^7.0.0"
31+
}
32+
}

‎src/CommandInvoker.php‎

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
<?php
2+
3+
namespaceMix\Redis\Subscriber;
4+
5+
useSwoole\Timer;
6+
useSwoole\Coroutine;
7+
useSwoole\Coroutine\Channel;
8+
9+
/**
10+
* Class CommandInvoker
11+
* @package Mix\Redis\Subscriber
12+
*/
13+
class CommandInvoker
14+
{
15+
16+
/**
17+
* @var Connection
18+
*/
19+
protected$connection;
20+
21+
/**
22+
* EOF
23+
*/
24+
constEOF ="\r\n";
25+
26+
/**
27+
* @var Channel
28+
*/
29+
protected$resultChannel;
30+
31+
/**
32+
* @var Channel
33+
*/
34+
protected$messageChannel;
35+
36+
/**
37+
* CommandInvoker constructor.
38+
* @param Connection $connection
39+
*/
40+
publicfunction__construct(Connection$connection)
41+
{
42+
$this->connection =$connection;
43+
$this->resultChannel =newChannel();
44+
$this->messageChannel =newChannel(100);
45+
Coroutine::create(function ()use ($connection) {
46+
$this->receive($connection);
47+
});
48+
}
49+
50+
/**
51+
* Receive
52+
* @param Connection $connection
53+
* @throws \Swoole\Exception
54+
*/
55+
publicfunctionreceive(Connection$connection)
56+
{
57+
$buffer =null;
58+
while (true) {
59+
$line =$connection->recv();
60+
if ($line ===false ||$line ==="") {
61+
$this->interrupt();
62+
break;
63+
}
64+
$line =substr($line,0, -(strlen(static::EOF)));
65+
66+
if ($line =='+OK') {
67+
$this->resultChannel->push($line);
68+
continue;
69+
}
70+
71+
if ($line =='*3') {
72+
if (!empty($buffer)) {
73+
$this->resultChannel->push($buffer);
74+
$buffer =null;
75+
}
76+
$buffer[] =$line;
77+
continue;
78+
}
79+
80+
$buffer[] =$line;
81+
82+
$type =$buffer[2] ??false;
83+
84+
if ($type =='subscribe' &&count($buffer) ==6) {
85+
$this->resultChannel->push($buffer);
86+
$buffer =null;
87+
continue;
88+
}
89+
90+
if ($type =='unsubscribe' &&count($buffer) ==6) {
91+
$this->resultChannel->push($buffer);
92+
$buffer =null;
93+
continue;
94+
}
95+
96+
if ($type =='message' &&count($buffer) ==7) {
97+
$message =newMessage();
98+
$message->channel =$buffer[4];
99+
$message->payload =$buffer[6];
100+
$timerID = Timer::after(30 *1000,function ()use ($message) {
101+
static::error(sprintf('Message channel (%s) is 30 seconds full, disconnected',$message->channel));
102+
$this->interrupt();
103+
});
104+
$this->messageChannel->push($message);
105+
Timer::clear($timerID);
106+
$buffer =null;
107+
continue;
108+
}
109+
}
110+
}
111+
112+
/**
113+
* Invoke
114+
* @param string $command
115+
* @param int $number
116+
* @return array
117+
* @throws \Throwable
118+
*/
119+
publicfunctioninvoke(string$command,int$number)
120+
{
121+
try {
122+
$this->connection->send($command .static::EOF);
123+
}catch (\Throwable$e) {
124+
$this->interrupt();
125+
throw$e;
126+
}
127+
$result = [];
128+
for ($i =0;$i <$number;$i++) {
129+
$result[] =$this->resultChannel->pop();
130+
}
131+
return$result;
132+
}
133+
134+
/**
135+
* Channel
136+
* @return Channel
137+
*/
138+
publicfunctionchannel()
139+
{
140+
return$this->messageChannel;
141+
}
142+
143+
/**
144+
* Interrupt
145+
* @return bool
146+
* @throws \Swoole\Exception
147+
*/
148+
publicfunctioninterrupt()
149+
{
150+
$this->connection->close();
151+
$this->resultChannel->close();
152+
$this->messageChannel->close();
153+
returntrue;
154+
}
155+
156+
/**
157+
* Print error
158+
* @param \Throwable $ex
159+
*/
160+
protectedstaticfunctionerror(string$message)
161+
{
162+
$time =date('Y-m-d H:i:s');
163+
echo"[error]$time$message\n";
164+
}
165+
166+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp