Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitff0b855

Browse files
Refractor redis transport using redis streams
1 parent7162d2e commitff0b855

File tree

21 files changed

+392
-447
lines changed

21 files changed

+392
-447
lines changed

‎.travis.yml‎

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env:
1919
-MIN_PHP=7.1.3
2020
-SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
2121
-MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
22+
-MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages
2223

2324
matrix:
2425
include:
@@ -55,8 +56,8 @@ before_install:
5556
5657
-|
5758
# Start Redis cluster
58-
docker pull grokzen/redis-cluster:4.0.8
59-
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
59+
docker pull grokzen/redis-cluster:5.0.4
60+
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
6061
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
6162
6263
-|
@@ -116,6 +117,7 @@ before_install:
116117
local ext_name=$1
117118
local ext_so=$2
118119
local INI=$3
120+
local input=${4:-yes}
119121
local ext_dir=$(php -r "echo ini_get('extension_dir');")
120122
local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name
121123
@@ -124,7 +126,7 @@ before_install:
124126
else
125127
rm ~/.pearrc /tmp/pear 2>/dev/null ||true
126128
mkdir -p $ext_cache
127-
echoyes | pecl install -f $ext_name &&
129+
echo$input | pecl install -f $ext_name &&
128130
cp $ext_dir/$ext_so $ext_cache
129131
fi
130132
}
@@ -147,7 +149,6 @@ before_install:
147149
echo session.gc_probability = 0 >> $INI
148150
echo opcache.enable_cli = 1 >> $INI
149151
echo apc.enable_cli = 1 >> $INI
150-
echo extension = redis.so >> $INI
151152
echo extension = memcached.so >> $INI
152153
done
153154
@@ -166,7 +167,11 @@ before_install:
166167
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
167168
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
168169
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
170+
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
169171
done
172+
-|
173+
# List all php extensions with versions
174+
-php -r 'foreach (get_loaded_extensions() as $extension) echo $extension . " " . phpversion($extension) . PHP_EOL;'
170175

171176
-|
172177
# Load fixtures

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,6 +1700,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17001700
if (empty($config['transports'])) {
17011701
$container->removeDefinition('messenger.transport.symfony_serializer');
17021702
$container->removeDefinition('messenger.transport.amqp.factory');
1703+
$container->removeDefinition('messenger.transport.redis.factory');
17031704
}else {
17041705
$container->getDefinition('messenger.transport.symfony_serializer')
17051706
->replaceArgument(1,$config['serializer']['symfony_serializer']['format'])

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@
6666
<tagname="messenger.transport_factory" />
6767
</service>
6868

69+
<serviceid="messenger.transport.redis.factory"class="Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory">
70+
<tagname="messenger.transport_factory" />
71+
</service>
72+
6973
<serviceid="messenger.transport.sync.factory"class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
7074
<tagname="messenger.transport_factory" />
7175
</service>

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
'options' => ['queue' => ['name' =>'Queue']],
1414
'serializer' =>'messenger.transport.native_php_serializer',
1515
],
16+
'redis' =>'redis://127.0.0.1:6379/messages',
1617
],
1718
],
1819
]);

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
</framework:queue>
1818
</framework:options>
1919
</framework:transport>
20+
<framework:transportname="redis"dsn="redis://127.0.0.1:6379/messages" />
2021
</framework:messenger>
2122
</framework:config>
2223
</container>

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ framework:
1111
queue:
1212
name:Queue
1313
serializer:'messenger.transport.native_php_serializer'
14+
redis:'redis://127.0.0.1:6379/messages'

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php‎

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,7 @@ public function testMessenger()
673673
$this->assertTrue($container->hasAlias('messenger.default_bus'));
674674
$this->assertTrue($container->getAlias('messenger.default_bus')->isPublic());
675675
$this->assertFalse($container->hasDefinition('messenger.transport.amqp.factory'));
676+
$this->assertFalse($container->hasDefinition('messenger.transport.redis.factory'));
676677
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
677678
$this->assertSame(TransportFactory::class,$container->getDefinition('messenger.transport_factory')->getClass());
678679
}
@@ -697,6 +698,16 @@ public function testMessengerTransports()
697698
$this->assertEquals(newReference('messenger.transport.native_php_serializer'),$transportArguments[2]);
698699

699700
$this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory'));
701+
702+
$this->assertTrue($container->hasDefinition('messenger.transport.redis'));
703+
$transportFactory =$container->getDefinition('messenger.transport.redis')->getFactory();
704+
$transportArguments =$container->getDefinition('messenger.transport.redis')->getArguments();
705+
706+
$this->assertEquals([newReference('messenger.transport_factory'),'createTransport'],$transportFactory);
707+
$this->assertCount(3,$transportArguments);
708+
$this->assertSame('redis://127.0.0.1:6379/messages',$transportArguments[0]);
709+
710+
$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
700711
}
701712

702713
publicfunctiontestMessengerRouting()

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php‎

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,43 +12,104 @@
1212
namespaceSymfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
usePHPUnit\Framework\TestCase;
15+
useSymfony\Component\Messenger\Exception\LogicException;
1516
useSymfony\Component\Messenger\Transport\RedisExt\Connection;
1617

1718
/**
1819
* @requires extension redis
1920
*/
2021
class ConnectionTestextends TestCase
2122
{
22-
/**
23-
* @expectedException \InvalidArgumentException
24-
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
25-
*/
26-
publicfunctiontestItCannotBeConstructedWithAWrongDsn()
23+
publicfunctiontestFromInvalidDsn()
2724
{
25+
$this->expectException(\InvalidArgumentException::class);
26+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
27+
2828
Connection::fromDsn('redis://');
2929
}
3030

31-
publicfunctiontestItGetsParametersFromTheDsn()
31+
publicfunctiontestFromDsn()
3232
{
3333
$this->assertEquals(
34-
newConnection('queue',array(
34+
newConnection(['stream' =>'queue'], [
3535
'host' =>'localhost',
3636
'port' =>6379,
37-
)),
37+
]),
3838
Connection::fromDsn('redis://localhost/queue')
3939
);
4040
}
4141

42-
publicfunctiontestOverrideOptionsViaQueryParameters()
42+
publicfunctiontestFromDsnWithOptions()
4343
{
4444
$this->assertEquals(
45-
newConnection('queue',array(
46-
'host' =>'127.0.0.1',
45+
newConnection(['stream' =>'queue','group' =>'group1','consumer' =>'consumer1'], [
46+
'host' =>'localhost',
4747
'port' =>6379,
48-
),array(
49-
'processing_ttl' =>'8000',
50-
)),
51-
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
48+
], [
49+
'blocking_timeout' =>30,
50+
]),
51+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' =>30])
5252
);
5353
}
54+
55+
publicfunctiontestFromDsnWithQueryOptions()
56+
{
57+
$this->assertEquals(
58+
newConnection(['stream' =>'queue','group' =>'group1','consumer' =>'consumer1'], [
59+
'host' =>'localhost',
60+
'port' =>6379,
61+
], [
62+
'blocking_timeout' =>30,
63+
]),
64+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
65+
);
66+
}
67+
68+
publicfunctiontestKeepGettingPendingMessages()
69+
{
70+
$redis =$this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
71+
72+
$redis->expects($this->exactly(3))->method('xreadgroup')
73+
->with('symfony','consumer', ['queue' =>0],1,null)
74+
->willReturn(['queue' => [['message' =>json_encode(['body' =>'Test','headers' => []])]]]);
75+
76+
$connection = Connection::fromDsn('redis://localhost/queue', [],$redis);
77+
$this->assertNotNull($connection->get());
78+
$this->assertNotNull($connection->get());
79+
$this->assertNotNull($connection->get());
80+
}
81+
82+
publicfunctiontestFirstGetPendingMessagesThenNewMessages()
83+
{
84+
$redis =$this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
85+
86+
$count =0;
87+
88+
$redis->expects($this->exactly(2))->method('xreadgroup')
89+
->with('symfony','consumer',$this->callback(function ($arr_streams)use (&$count) {
90+
++$count;
91+
92+
if (1 ===$count) {
93+
return'0' ===$arr_streams['queue'];
94+
}
95+
96+
return'>' ===$arr_streams['queue'];
97+
}),1,null)
98+
->willReturn(['queue' => []]);
99+
100+
$connection = Connection::fromDsn('redis://localhost/queue', [],$redis);
101+
$connection->get();
102+
}
103+
104+
publicfunctiontestUnexpectedRedisError()
105+
{
106+
$this->expectException(LogicException::class);
107+
$this->expectExceptionMessage('Redis error happens');
108+
$redis =$this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
109+
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
110+
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
111+
112+
$connection = Connection::fromDsn('redis://localhost/queue', [],$redis);
113+
$connection->get();
114+
}
54115
}

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php‎

Lines changed: 0 additions & 43 deletions
This file was deleted.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp