Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitaa04d06

Browse files
committed
feature#26632 [Messenger] Add AMQP adapter (sroze)
This PR was squashed before being merged into the 4.1-dev branch (closes#26632).Discussion----------[Messenger] Add AMQP adapter| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | no| Deprecations? | no| Tests pass? | ø| License | MIT- [x] Depends on the Messenger component#24411- [x] Add tests once we are all happy about the structure---In order to give a great DX for simple needs such as sending messages through an AMQP broker such as RabbitMq, we should ship an AMQP adapter for the Messenger component within Symfony Core. It should be as simple as this proposal. We don't need to handle more specific use-cases nor brokers as other adapters such as the [enqueue adapter](https://github.com/sroze/enqueue-bridge) can also be used.Configuring the adapter is as simple as the following configuration:```yaml# config/packages/messenger_adapters.yamlframework: messenger: adapter: "%env(MESSENGER_DSN)%"```With the given `.env` for example:```MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages```Keep in mind that after having configured the adapter, developers have to route their messages to the given adapter.```yaml# config/packages/messenger_routes.yamlframework: messenger: routing:producer). 'App\Message\Command\CreateNumber': messenger.default_sender```---Additionally, multiple adapters can be created and messages routed to these ones.```yaml# config/packages/messenger_routes.yamlframework: messenger: adapters: commands: "amqp://guest:guest@localhost:5672/%2f/commands" maintenance: "amqp://guest:guest@localhost:5672/%2f/maintenance" routing:producer). 'App\Message\Command\CreateNumber': messenger.commands_sender 'App\Message\Command\MaintenanceSpecificCommand': messenger.maintenance_sender```Commits-------798c230 [Messenger] Add AMQP adapter
2 parents9a99955 +798c230 commitaa04d06

File tree

33 files changed

+1322
-86
lines changed

33 files changed

+1322
-86
lines changed

‎.travis.yml‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ addons:
1212
-language-pack-fr-base
1313
-ldap-utils
1414
-slapd
15+
-librabbitmq-dev
1516

1617
env:
1718
global:
1819
-MIN_PHP=7.1.3
1920
-SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
21+
-MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
2022

2123
matrix:
2224
include:
@@ -38,6 +40,7 @@ services:
3840
-memcached
3941
-mongodb
4042
-redis-server
43+
-rabbitmq
4144

4245
before_install:
4346
-|
@@ -134,6 +137,11 @@ before_install:
134137
-|
135138
# Install extra PHP extensions
136139
if [[ ! $skip ]]; then
140+
# Install librabbitmq
141+
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq-dev_0.5.2-2_amd64.deb
142+
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq1_0.5.2-2_amd64.deb
143+
sudo dpkg -i librabbitmq1_0.5.2-2_amd64.deb librabbitmq-dev_0.5.2-2_amd64.deb
144+
137145
# install libsodium
138146
sudo add-apt-repository ppa:ondrej/php -y
139147
sudo apt-get update -q
@@ -142,6 +150,7 @@ before_install:
142150
tfold ext.apcu tpecl apcu-5.1.6 apcu.so $INI
143151
tfold ext.libsodium tpecl libsodium sodium.so $INI
144152
tfold ext.mongodb tpecl mongodb-1.4.0RC1 mongodb.so $INI
153+
tfold ext.amqp tpecl amqp-1.9.3 amqp.so $INI
145154
fi
146155
147156
-|

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php‎

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,12 +971,17 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
971971
->arrayNode('messenger')
972972
->info('Messenger configuration')
973973
->{!class_exists(FullStack::class) &&class_exists(MessageBusInterface::class) ?'canBeDisabled' :'canBeEnabled'}()
974+
->fixXmlConfig('adapter')
974975
->children()
975976
->arrayNode('routing')
976977
->useAttributeAsKey('message_class')
977978
->beforeNormalization()
978979
->always()
979980
->then(function ($config) {
981+
if (!is_array($config)) {
982+
returnarray();
983+
}
984+
980985
$newConfig =array();
981986
foreach ($configas$k =>$v) {
982987
if (!is_int($k)) {
@@ -1011,6 +1016,28 @@ function ($a) {
10111016
->end()
10121017
->end()
10131018
->end()
1019+
->arrayNode('adapters')
1020+
->useAttributeAsKey('name')
1021+
->arrayPrototype()
1022+
->beforeNormalization()
1023+
->ifString()
1024+
->then(function (string$dsn) {
1025+
returnarray('dsn' =>$dsn);
1026+
})
1027+
->end()
1028+
->fixXmlConfig('option')
1029+
->children()
1030+
->scalarNode('dsn')->end()
1031+
->arrayNode('options')
1032+
->normalizeKeys(false)
1033+
->useAttributeAsKey('name')
1034+
->defaultValue(array())
1035+
->prototype('variable')
1036+
->end()
1037+
->end()
1038+
->end()
1039+
->end()
1040+
->end()
10141041
->end()
10151042
->end()
10161043
->end()

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php‎

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,6 +1468,24 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
14681468
}else {
14691469
$container->removeDefinition('messenger.middleware.validator');
14701470
}
1471+
1472+
foreach ($config['adapters']as$name =>$adapter) {
1473+
$container->setDefinition('messenger.sender.'.$name, (newDefinition(SenderInterface::class))->setFactory(array(
1474+
newReference('messenger.adapter_factory'),
1475+
'createSender',
1476+
))->setArguments(array(
1477+
$adapter['dsn'],
1478+
$adapter['options'],
1479+
))->addTag('messenger.sender'));
1480+
1481+
$container->setDefinition('messenger.receiver.'.$name, (newDefinition(ReceiverInterface::class))->setFactory(array(
1482+
newReference('messenger.adapter_factory'),
1483+
'createReceiver',
1484+
))->setArguments(array(
1485+
$adapter['dsn'],
1486+
$adapter['options'],
1487+
))->addTag('messenger.receiver'));
1488+
}
14711489
}
14721490

14731491
privatefunctionregisterCacheConfiguration(array$config,ContainerBuilder$container)

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,18 @@
7272
<tagname="container.service_locator" />
7373
<argumenttype="collection" />
7474
</service>
75+
76+
<!-- Adapters-->
77+
<serviceid="messenger.adapter_factory"class="Symfony\Component\Messenger\Adapter\Factory\ChainAdapterFactory">
78+
<argumenttype="tagged"tag="messenger.adapter_factory" />
79+
</service>
80+
81+
<serviceid="messenger.adapter.amqp.factory"class="Symfony\Component\Messenger\Adapter\AmqpExt\AmqpAdapterFactory">
82+
<argumenttype="service"id="messenger.transport.default_encoder" />
83+
<argumenttype="service"id="messenger.transport.default_decoder" />
84+
<argument>%kernel.debug%</argument>
85+
86+
<tagname="messenger.adapter_factory" />
87+
</service>
7588
</services>
7689
</container>

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd‎

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@
354354
<xsd:sequence>
355355
<xsd:elementname="routing"type="messenger_routing"minOccurs="0"maxOccurs="unbounded" />
356356
<xsd:elementname="middlewares"type="messenger_middleware"minOccurs="0"maxOccurs="unbounded" />
357+
<xsd:elementname="adapter"type="messenger_adapter"minOccurs="0"maxOccurs="unbounded" />
357358
</xsd:sequence>
358359
</xsd:complexType>
359360

@@ -368,6 +369,19 @@
368369
<xsd:attributename="service"type="xsd:string"use="required"/>
369370
</xsd:complexType>
370371

372+
<xsd:complexTypename="messenger_adapter">
373+
<xsd:sequence>
374+
<xsd:elementname="option"type="messenger_adapter_option"minOccurs="0"maxOccurs="unbounded" />
375+
</xsd:sequence>
376+
<xsd:attributename="name"type="xsd:string" />
377+
<xsd:attributename="dsn"type="xsd:string" />
378+
</xsd:complexType>
379+
380+
<xsd:complexTypename="messenger_adapter_option">
381+
<xsd:attributename="name"type="xsd:string" />
382+
<xsd:attributename="value"type="xsd:string" />
383+
</xsd:complexType>
384+
371385
<xsd:complexTypename="messenger_middleware">
372386
<xsd:sequence>
373387
<xsd:elementname="validation"type="messenger_validation"minOccurs="0"maxOccurs="1" />

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
258258
'enabled' => !class_exists(FullStack::class),
259259
),
260260
),
261+
'adapters' =>array(),
261262
),
262263
);
263264
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework',array(
4+
'messenger' =>array(
5+
'adapters' =>array(
6+
'default' =>'amqp://localhost/%2f/messages',
7+
'customised' =>array(
8+
'dsn' =>'amqp://localhost/%2f/messages?exchange_name=exchange_name',
9+
'options' =>array('queue_name' =>'Queue'),
10+
),
11+
),
12+
),
13+
));
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<containerxmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger>
10+
<framework:adaptername="default"dsn="amqp://localhost/%2f/messages" />
11+
<framework:adaptername="customised"dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name">
12+
<framework:optionname="queue_name"value="Queue" />
13+
</framework:adapter>
14+
</framework:messenger>
15+
</framework:config>
16+
</container>
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
framework:
2+
messenger:
3+
adapters:
4+
default:'amqp://localhost/%2f/messages'
5+
customised:
6+
dsn:'amqp://localhost/%2f/messages?exchange_name=exchange_name'
7+
options:
8+
queue_name:Queue

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php‎

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ public function testWebLink()
523523
publicfunctiontestMessenger()
524524
{
525525
$container =$this->createContainerFromFile('messenger');
526+
$this->assertTrue($container->hasDefinition('message_bus'));
526527
$this->assertFalse($container->hasDefinition('messenger.middleware.doctrine_transaction'));
527528
}
528529

@@ -538,6 +539,33 @@ public function testMessengerValidationDisabled()
538539
$this->assertFalse($container->hasDefinition('messenger.middleware.validator'));
539540
}
540541

542+
publicfunctiontestMessengerAdapter()
543+
{
544+
$container =$this->createContainerFromFile('messenger_adapter');
545+
$this->assertTrue($container->hasDefinition('messenger.sender.default'));
546+
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
547+
$this->assertTrue($container->hasDefinition('messenger.receiver.default'));
548+
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));
549+
550+
$this->assertTrue($container->hasDefinition('messenger.sender.customised'));
551+
$senderFactory =$container->getDefinition('messenger.sender.customised')->getFactory();
552+
$senderArguments =$container->getDefinition('messenger.sender.customised')->getArguments();
553+
554+
$this->assertEquals(array(newReference('messenger.adapter_factory'),'createSender'),$senderFactory);
555+
$this->assertCount(2,$senderArguments);
556+
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name',$senderArguments[0]);
557+
$this->assertEquals(array('queue_name' =>'Queue'),$senderArguments[1]);
558+
559+
$this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
560+
$receiverFactory =$container->getDefinition('messenger.receiver.customised')->getFactory();
561+
$receiverArguments =$container->getDefinition('messenger.receiver.customised')->getArguments();
562+
563+
$this->assertEquals(array(newReference('messenger.adapter_factory'),'createReceiver'),$receiverFactory);
564+
$this->assertCount(2,$receiverArguments);
565+
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name',$receiverArguments[0]);
566+
$this->assertEquals(array('queue_name' =>'Queue'),$receiverArguments[1]);
567+
}
568+
541569
publicfunctiontestTranslator()
542570
{
543571
$container =$this->createContainerFromFile('full');

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp