Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Added AMQP component#27140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed
lyrixx wants to merge1 commit intosymfony:masterfromlyrixx:amqp
Closed

Added AMQP component#27140

lyrixx wants to merge1 commit intosymfony:masterfromlyrixx:amqp

Conversation

@lyrixx
Copy link
Member

QA
Branch?master
Bug fix?no
New feature?yes
BC breaks?no
Deprecations?no
Tests pass?yes
Fixed tickets
LicenseMIT
Doc PR

pyrech, lyrixx, and andreybolonin reacted with hooray emoji
@sroze
Copy link
Contributor

💃 ! Can you, in the PR description, show me how to use it with Messenger so I give it a try? 😉

andreybolonin reacted with thumbs up emoji

@lyrixx
Copy link
MemberAuthor

Can you, in the PR description, show me how to use it with Messenger so I give it a try?

Right now, it's not wired with Messenger yet.
I thinks it's better to merge this as it (after review / fix etc. obviously), then to add the bridge.

Here a doc draft

Symfony AMQP

Fed up of writing the same boiler-plate code over and over again whenever you
need to use your favorite AMQP broker? Have you a hard time remembering how to
publish a message or how to wire exchanges and queues? I had the exact same
feeling. There are many AMQP libraries providing a very good low-level access to
the AMQP protocol, but what about providing a simple API for abstracting the
most common use cases? This library gives you an opinionated way of using any
AMQP brokers and it also provides a nice and consistent API for low-level
interaction with any AMQP brokers.

Dependencies

This library depends on theamqp PECL extensions (version 1.4.0-beta2 or
later)::

sudo apt-get install php-amqp

Using the Conventions

The simplest usage of an AMQP broker is sending a message that is consumed by
another script::

use Symfony\Component\Amqp\Broker;// connects to a local AMQP broker by default$broker = new Broker();// publish a message on the 'log' queue$broker->publish('log', 'some message');// in another script (non-blocking)// $message is false if no messages are in the queue$message = $broker->get('log');// blocking (waits for a message to be available in the queue)$message = $broker->consume('log');

The example above is based on some "conventions" and as such makes the
following assumptions:

  • A default exchange is used to publish the message (named
    symfony.default);

  • The routing is done via the routing key (log in this example);

  • Queues and exchanges are created implicitly when first accessed;

  • The connection to the broker is done lazily whenever a message must be sent
    or received.

Retrying a Message

Retrying processing a message when an error occurs is as easy as defining a
retry strategy on a queue::

use Symfony\Component\Amqp\RetryStrategy\ConstantRetryStrategy;// configure the queue explicitly$broker->createQueue('log', array(    // retry every 5 seconds    'retry_strategy' => new ConstantRetryStrategy(5),));

Whenever you$broker->retry() a message, it is going to be automatically re-
enqueued after a5 seconds wait for a retry.

You can also drop the message after a limited number of retries (2 in the
following example)::

$broker->createQueue('log', array(    // retry 2 times    'retry_strategy' => new ConstantRetryStrategy(5, 2),));

Instead of trying everyn seconds, you can also use a retry mechanism based
on a truncated exponential backoff algorithm::

use Symfony\Component\Amqp\RetryStrategy\ExponentialRetryStrategy;$broker->createQueue('log', array(    // retry 5 times    'retry_strategy' => new ExponentialRetryStrategy(5),));

The message will be re-enqueued after 1 second the first time you call
retry(),2^1 seconds the second time,2^2 seconds the third time,
and2^n seconds the nth time. If you want to wait more than 1 second the
first time, you can pass an offset::

$broker->createQueue('log', array(    // starts at 2^3    'retry_strategy' => new ExponentialRetryStrategy(5, 3),));

.. note::

The retry strategies are implemented by using the dead-lettering feature ofAMQP. Behind the scene, a special exchange is bound to queues configuredbased on the retry strategy you set.

.. note::

Don't forget to ``ack`` or ``nack`` your message if you retry it. Andobviously you should not use the AMQP_Requeue flag.

Configuring a Broker

By default, a broker tries to connect to a local AMQP broker with the default
port, username, and password. If you have a different setting, pass a URI to
theBroker constructor::

$broker = new Broker('amqp://user:pass@10.1.2.3:345/some-vhost');

Configuring an Exchange

The default exchange used by the library is of typedirect. You can also
create your own exchange::

// define a new fanout exchange$broker->createExchange('sensiolabs.fanout', array('type' => \AMQP_EX_TYPE_FANOUT));

You can then binding a queue to this named exchange easily::

$broker->createQueue('logs', array('exchange' => 'sensiolabs.fanout', 'routing_keys' => null));$broker->createQueue('logs.again', array('exchange' => 'sensiolabs.fanout', 'routing_keys' => null));

The second argument ofcreateExchange() takes an array of arguments passed
to the exchange. The following keys are used to further configure the exchange:

  • flags: sets the exchange flags;

  • type: sets the type of the queue (see\AMQP_EX_TYPE_* constants).

.. note::

Note that ``createExchange()`` automatically declares the exchange.

Configuring a Queue

As demonstrated in some examples, you can create your own queue. As for the
exchange, the second argument of thecreateQueue() method is a list of
queue arguments; the following keys are used to further configure the queue:

  • exchange: The exchange name to bind the queue to (the default exchange is
    used if not set);

  • flags: Sets the exchange flags;

  • bind_arguments: An array of arguments to pass when binding the queue with
    an exchange;

  • retry_strategy: The retry strategy to use (an instance of
    :class:Symfony\\Amqp\\RetryStrategy\\RetryStrategyInterface).

.. note::

Note that ``createQueue()`` automatically declares and binds the queue.

Implementation details

The retry strategy
..................

The retry strategy is implemented with two custom and private exchanges:
symfony.dead_letter andsymfony.retry.

CallingBroker::retry will publish the same message in the
symfony.dead_letter exchange.

This exchange will route the message to a queue named like
%exchange%.%time%.wait, for examplesensiolabs.default.000005.wait. This
queue has a TTL of 5 seconds. It means that if nothing consumes this message, it
will be dropped after 5 seconds. But this queue has also a Dead Letter (DL). It
means that instead of dropping the message, the AMQP server will re-publish
automatically the message to the Exchange configured as DL.

After 5 seconds the message will be re-published tosymfony.retry Exchange.
This exchange is bound with every single queue. Finally, the message will land
in the original queue.

jeremyb and andreybolonin reacted with thumbs up emoji

@lyrixxlyrixxforce-pushed theamqp branch 5 times, most recently from8a5a74d to15e05b1CompareMay 3, 2018 16:50
@nicolas-grekasnicolas-grekas added this to thenext milestoneMay 3, 2018
Copy link
Member

@nicolas-grekasnicolas-grekas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Here is a first round of random comments :)

private$logger;

/**
* @param ContainerInterface $container A PSR11 container from which to load the Broker service
Copy link
Member

@nicolas-grekasnicolas-grekasMay 3, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

can't the docblock be removed? feels like so :)
the broker should be injected directly instead of the container

protectedfunctionconfigure()
{
$this
->setName('amqp:move')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

to make the command lazy, you should defineprotected static $defaultName = 'amqp:move'; instead

-if [[ ! $skip ]]; then $COMPOSER_UP; fi
-if [[ ! $skip ]]; then ./phpunit install; fi
-php -i

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

should be kept, ths makes it a separate section in travis output

useSymfony\Bundle\FrameworkBundle\Routing\AnnotatedRouteControllerLoader;
useSymfony\Bundle\FullStack;
useSymfony\Component\Cache\Adapter\AbstractAdapter;
useSymfony\Component\Amqp\Broker;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

alpha order

->addArgument($connection['dsn'])
->addArgument($connection['queues'])
->addArgument($connection['exchanges'])
->setPublic(false)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

already the default

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

https://github.com/symfony/symfony/blob/master/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php => there are 14 times->setPublic(false). Looks like there is an "easy first contribution" here

/**
* This class should not be instantiated.
*/
privatefunction__construct()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

no need for this, the class is internal already

* creation.
*
* The following arguments are "special":
*

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

extra line

* Create a new Exchange.
*
* Special arguments:
*

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

extra line (there might be more)

"php":"^7.1.3",
"ext-amqp":">=1.5",
"psr/log":"~1.0",
"symfony/event-dispatcher":"^2.3|^3.0|^4.0"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

should be^3.4|^4.0 as in other components (we bump these at each major versions)

"minimum-stability":"dev",
"extra": {
"branch-alias": {
"dev-master":"4.1-dev"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

4.2

->ifTrue(function ($v) {
return !is_array($v);
})
->thenInvalid('Arguments should be an array (got %s).')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

should ->must ? Same for all the other occurrences of "should".


publicfunctionpublish($message,$routingKey =null,$flags =null,array$attributes =null)
{
if (null ===$flags) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Maybe we can remove thisif() and move this logic to this other line:

returnparent::publish($message,$routingKey,$flags ?? \AMQP_MANDATORY,$attributes);

Copy link
MemberAuthor

@lyrixxlyrixxMay 4, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

No because if someone passAMQP_NOPARAM (===0), we don't want to change the value.
And I can not set$flag = AMQP_NOPARAM as default value, because I need to keep the same signature as the parent :/

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Oh, with?? it works. I will update the code. 👍

*
* @author Grégoire Pineau <lyrixx@lyrixx.info>
*/
class MessageExporter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I'm not a heavy user of queues, so I can be totally wrong ... but this command looks strange to me. Why not allowing to export all queue messages as JSON files in some dir. If the user wants compression too, they can use any command tool. Compressing messages (and picking the tgz format specifically) looks a bit "too much" for Symfony core.

sroze reacted with thumbs up emoji
Copy link
MemberAuthor

@lyrixxlyrixxMay 4, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

AMQP Messages could be something else than JSON, It could be an image for example
So exporting that in a json file is too opinionated.

But indeed, the compression could be an option, but it comes for almost free ;)

But you are right about "too much" for Symfony core. It was from our internal sensiolabs/amqp package. but it's really useful to debug ;)

tucksaun reacted with thumbs up emoji
$routingKeys =array($name);
}

if (isset($arguments['flags'])) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Maybe we can refactor this into PHP 7 code?

$this->setFlags($arguments['flags'] ?? \AMQP_DURABLE);unset($arguments['flags']);

Same for the other occurrences.

*/
publicfunction__construct(int$time,int$max =0)
{
$time = (int)$time;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Not needed because of theint typehint in the argument.

$time = (int)$time;

if ($time <1) {
thrownewInvalidArgumentException('"time" should be at least 1.');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This looks like an arbitrary decision. I expected0 to be allowed and mean "no wait".

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

If you don't want to wait, there is no need to use a retry strategy. A simple nack + requeue is enough (I guess)

@lyrixxlyrixxforce-pushed theamqp branch 5 times, most recently from89c8d34 to49fcf13CompareMay 4, 2018 16:59
@fabpot
Copy link
Member

@lyrixx We have a month to finish this one. I would really like to merge it for 4.2 as the Messenger component will have to become stable in terms of API. Do you think you will have time to work on this?

@lyrixx
Copy link
MemberAuthor

lyrixx commentedAug 23, 2018
edited
Loading

@fabpot Hello, Except some tiny comments / conflict I have to address, I think this PR is ready for review.

@sroze
Copy link
Contributor

Right now, it's not wired with Messenger yet. I thinks it's better to merge this as it (after review / fix etc. obviously), then to add the bridge.

@lyrixx I actually think that we need to have the bridge straight away. It will helps us to see if the two fit well and if we need to change any of the two as soon as possible :)

->end()
->validate()
->ifTrue(function ($config) {
return'constant' ===$config['retry_strategy'] && !array_key_exists('max',$config['retry_strategy_options']);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Instead of these, can't we explicitly set the retry strategy options inretry_strategy.constant andretry_strategy.exponential? The only thing to validate after is that we only have one retry strategy.

;
}

privatefunctionfixXmlArguments($v)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

What's exactly fixed by this method? Can't you simply have an XSD that allows any kind of data like the rest of the "free options"? 🤔

}

if (!$match) {
thrownew \InvalidArgumentException(sprintf('The "framework.amqp.default_connection" option "%s" does not exist.',$defaultConnectionName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

"The default connection configured ("%s") does not exist." ?


$container
->setAlias('amqp.broker',"amqp.broker.$defaultConnectionName")
->setPublic(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

What should I alias ? the Broker::class to the default connection ?

Yep 👍

<defaultspublic="false" />

<serviceid="amqp.command.move"class="Symfony\Component\Amqp\Command\AmqpMoveCommand">
<argumenttype="service"id="amqp.broker" />
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

You should use a connection locator here I guess, in case there are multiple connections defined.


parent::setName($name);

if (Broker::DEAD_LETTER_EXCHANGE ===$name) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Outch. That's super hardcoding: can you move this logic (i.e. name to type mapping) to wherever this is created?

parent::setFlags($arguments['flags'] ?? \AMQP_DURABLE);
unset($arguments['flags']);

parent::declareExchange();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Why would we do so at construct time?

publicfunctionpublish($message,$routingKey =null,$flags =null,array$attributes =null)
{
$attributes =array_merge(array(
'delivery_mode' =>2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

You can use theAMQP_DURABLE constant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Actually, why is it forced here?

{
constDEFAULT_EXCHANGE ='symfony.default';
constDEAD_LETTER_EXCHANGE ='symfony.dead_letter';
constRETRY_EXCHANGE ='symfony.retry';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Same than forBroker: should be configurable.

publicfunction__construct(\AMQPConnection$connection,array$queues =array(),array$exchanges =array())
{
$this->connection =$connection;
$this->connection->setReadTimeout(4 *60 *60);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

As a configuration?

@fabpot
Copy link
Member

I agree with@sroze, we want the integration in the same PR.

private$queuesBindings =array();

/**
* Create a new Broker instance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Creates (I won't do the same for other similar changes)

* Create a new Broker instance.
*
* Example of $queuesConfiguration
* array(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

code example should be indented (and with a blank line before)

/**
* Creates a new Exchange.
*
* Special arguments: See the Exchange constructor.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I would use a regular@param tag here for that info

/**
* Creates a new Queue.
*
* Special arguments: See the Queue constructor.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

same here


// WIP

interface BrokerInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Do we even need an interface?

@fabpotfabpot closed thisMar 4, 2019
@nicolas-grekasnicolas-grekas modified the milestones:next,4.3Apr 30, 2019
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@fabpotfabpotfabpot left review comments

@javiereguiluzjaviereguiluzjaviereguiluz left review comments

@nicolas-grekasnicolas-grekasnicolas-grekas left review comments

+1 more reviewer

@srozesrozesroze requested changes

Reviewers whose approvals may not affect merge requirements

Assignees

No one assigned

Projects

None yet

Milestone

4.3

Development

Successfully merging this pull request may close these issues.

6 participants

@lyrixx@sroze@fabpot@javiereguiluz@nicolas-grekas@carsonbot

[8]ページ先頭

©2009-2025 Movatter.jp