Movatterモバイル変換


[0]ホーム

URL:


Skip to content
Version:

The Messenger Component

Edit this page

The Messenger component helps applications send and receive messages to/fromother applications or via message queues.

The component is greatly inspired by Matthias Noback's series ofblog posts about command buses and theSimpleBus project.

See also

This article explains how to use the Messenger features as an independentcomponent in any PHP application. Read theMessenger: Sync & Queued Message Handling article tolearn about how to use it in Symfony applications.

Installation

1
$composer require symfony/messenger

Note

If you install this component outside of a Symfony application, you mustrequire thevendor/autoload.php file in your code to enable the classautoloading mechanism provided by Composer. Readthis article for more details.

Concepts

Sender:
Responsible for serializing and sending messages tosomething. Thissomething can be a message broker or a third party API for example.
Receiver:
Responsible for retrieving, deserializing and forwarding messages to handler(s).This can be a message queue puller or an API endpoint for example.
Handler:
Responsible for handling messages using the business logic applicable to the messages.Handlers are called by theHandleMessageMiddleware middleware.
Middleware:
Middleware can access the message and its wrapper (the envelope) while it isdispatched through the bus.Literally"the software in the middle", those are not about core concerns(business logic) of an application. Instead, they are cross cutting concernsapplicable throughout the application and affecting the entire message bus.For instance: logging, validating a message, starting a transaction, ...They are also responsible for calling the next middleware in the chain,which means they can tweak the envelope, by adding stamps to it or evenreplacing it, as well as interrupt the middleware chain. Middleware are calledboth when a message is originally dispatched and again later when a messageis received from a transport.
Envelope:
Messenger specific concept, it gives full flexibility inside the message bus,by wrapping the messages into it, allowing to add useful information insidethroughenvelope stamps.
Envelope Stamps:
Piece of information you need to attach to your message: serializer contextto use for transport, markers identifying a received message or any sort ofmetadata your middleware or transport layer may use.

Bus

The bus is used to dispatch messages. The behavior of the bus is in its orderedmiddleware stack. The component comes with a set of middleware that you can use.

When using the message bus with Symfony's FrameworkBundle, the following middlewareare configured for you:

  1. SendMessageMiddleware (enables asynchronous processing, logs the processing of your messages if you provide a logger)
  2. HandleMessageMiddleware (calls the registered handler(s))

Example:

123456789101112131415
useApp\Message\MyMessage;useApp\MessageHandler\MyMessageHandler;useSymfony\Component\Messenger\Handler\HandlersLocator;useSymfony\Component\Messenger\MessageBus;useSymfony\Component\Messenger\Middleware\HandleMessageMiddleware;$handler =newMyMessageHandler();$bus =newMessageBus([newHandleMessageMiddleware(newHandlersLocator([        MyMessage::class => [$handler],    ])),]);$bus->dispatch(newMyMessage(/* ... */));

Note

Every middleware needs to implement theMiddlewareInterface.

Handlers

Once dispatched to the bus, messages will be handled by a "message handler". Amessage handler is a PHP callable (i.e. a function or an instance of a class)that will do the required processing for your message:

1234567891011
namespaceApp\MessageHandler;useApp\Message\MyMessage;classMyMessageHandler{publicfunction__invoke(MyMessage$message):void{// Message processing...    }}

Adding Metadata to Messages (Envelopes)

If you need to add metadata or some configuration to a message, wrap it with theEnvelope class and add stamps.For example, to set the serialization groups used when the message goesthrough the transport layer, use theSerializerStamp stamp:

12345678910
useSymfony\Component\Messenger\Envelope;useSymfony\Component\Messenger\Stamp\SerializerStamp;$bus->dispatch(    (newEnvelope($message))->with(newSerializerStamp([// groups are applied to the whole message, so make sure// to define the group for every embedded object'groups' => ['my_serialization_groups'],    ])));

Here are some important envelope stamps that are shipped with the Symfony Messenger:

Note

TheErrorDetailsStamp stampcontains aFlattenException,which is a representation of the exception that made the message fail. You canget this exception with thegetFlattenException()method. This exception is normalized thanks to theFlattenExceptionNormalizerwhich helps error reporting in the Messenger context.

Instead of dealing directly with the messages in the middleware you receive the envelope.Hence you can inspect the envelope content and its stamps, or add any:

12345678910111213141516171819202122
useApp\Message\Stamp\AnotherStamp;useSymfony\Component\Messenger\Envelope;useSymfony\Component\Messenger\Middleware\MiddlewareInterface;useSymfony\Component\Messenger\Middleware\StackInterface;useSymfony\Component\Messenger\Stamp\ReceivedStamp;classMyOwnMiddlewareimplementsMiddlewareInterface{publicfunctionhandle(Envelope$envelope, StackInterface$stack):Envelope{if (null !==$envelope->last(ReceivedStamp::class)) {// Message just has been received...// You could for example add another stamp.$envelope =$envelope->with(newAnotherStamp(/* ... */));        }else {// Message was just originally dispatched        }return$stack->next()->handle($envelope,$stack);    }}

The above example will forward the message to the next middleware with anadditional stampif the message has just been received (i.e. has at least oneReceivedStamp stamp). You can create your own stamps by implementingStampInterface.

If you want to examine all stamps on an envelope, use the$envelope->all()method, which returns all stamps grouped by type (FQCN). Alternatively, you caniterate through all stamps of a specific type by using the FQCN as firstparameter of this method (e.g.$envelope->all(ReceivedStamp::class)).

Note

Any stamp must be serializable using the Symfony Serializer componentif going through transport using theSerializerbase serializer.

Transports

In order to send and receive messages, you will have to configure a transport. Atransport will be responsible for communicating with your message broker or 3rd parties.

Your own Sender

Imagine that you already have anImportantAction message going through themessage bus and being handled by a handler. Now, you also want to send thismessage as an email (using theMime andMailer components).

Using theSenderInterface,you can create your own message sender:

12345678910111213141516171819202122232425262728293031323334
namespaceApp\MessageSender;useApp\Message\ImportantAction;useSymfony\Component\Mailer\MailerInterface;useSymfony\Component\Messenger\Envelope;useSymfony\Component\Messenger\Transport\Sender\SenderInterface;useSymfony\Component\Mime\Email;classImportantActionToEmailSenderimplementsSenderInterface{publicfunction__construct(private MailerInterface$mailer,privatestring$toEmail,    ){    }publicfunctionsend(Envelope$envelope):Envelope{$message =$envelope->getMessage();if (!$messageinstanceof ImportantAction) {thrownew \InvalidArgumentException(sprintf('This transport only supports "%s" messages.', ImportantAction::class));        }$this->mailer->send(            (newEmail())                ->to($this->toEmail)                ->subject('Important action made')                ->html('<h1>Important action</h1><p>Made by '.$message->getUsername().'</p>')        );return$envelope;    }}

Your own Receiver

A receiver is responsible for getting messages from a source and dispatchingthem to the application.

Imagine you already processed some "orders" in your application using aNewOrder message. Now you want to integrate with a 3rd party or a legacyapplication but you can't use an API and need to use a shared CSV file with neworders.

You will read this CSV file and dispatch aNewOrder message. All you need todo is to write your own CSV receiver:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
namespaceApp\MessageReceiver;useApp\Message\NewOrder;useSymfony\Component\Messenger\Envelope;useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;useSymfony\Component\Messenger\Transport\Receiver\ReceiverInterface;useSymfony\Component\Serializer\SerializerInterface;classNewOrdersFromCsvFileReceiverimplementsReceiverInterface{private$connection;publicfunction__construct(private SerializerInterface$serializer,privatestring$filePath,    ){// Available connection bundled with the Messenger component// can be found in "Symfony\Component\Messenger\Bridge\*\Transport\Connection".$this->connection =/* create your connection */;    }publicfunctionget():iterable{// Receive the envelope according to your transport ($yourEnvelope here),// in most cases, using a connection is the easiest solution.$yourEnvelope =$this->connection->get();if (null ===$yourEnvelope) {return [];        }try {$envelope =$this->serializer->decode(['body' =>$yourEnvelope['body'],'headers' =>$yourEnvelope['headers'],            ]);        }catch (MessageDecodingFailedException$exception) {$this->connection->reject($yourEnvelope['id']);throw$exception;        }return [$envelope->with(newCustomStamp($yourEnvelope['id']))];    }publicfunctionack(Envelope$envelope):void{// Add information about the handled message    }publicfunctionreject(Envelope$envelope):void{// In the case of a custom connection$id =/* get the message id thanks to information or stamps present in the envelope */;$this->connection->reject($id);    }}

Receiver and Sender on the same Bus

To allow sending and receiving messages on the same bus and prevent an infiniteloop, the message bus will add aReceivedStampstamp to the message envelopes and theSendMessageMiddlewaremiddleware will know it should not route these messages again to a transport.

This work, including the code samples, is licensed under aCreative Commons BY-SA 3.0 license.
TOC
    Version

    Symfony 7.3backers


    [8]ページ先頭

    ©2009-2025 Movatter.jp