Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork9.7k
[Messenger] Add a Doctrine transport#29007
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Conversation
ef46d12 to536ed49CompareUh oh!
There was an error while loading.Please reload this page.
Taluu left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Looks good, but overall missing typehints / return hints as it's a feature for 4.3+, so php 7.1+. I'm guessing the missing tests are because it's a wip :}
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
536ed49 to4fae182Comparesrc/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
f0b0993 tofc59186Comparefc59186 tof998853Comparesrc/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
d174c1b to4023eaaComparesrc/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
4023eaa to9014d1bComparecdcd762 to2a31b47Compare
sroze left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Can you add@experimental in 4.3 on all these classes please? Then, a few remaining comments. After this, it should be ready to go!
| 'body' => '{"message": "Hi handled"}', | ||
| 'headers' => json_encode(['type' => DummyMessage::class]), | ||
| 'queue' => 'default', | ||
| 'created_at' => '2019-03-15 12:00:00', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Let's do this then. It's going to beYYYY-MM-DDTHH:mm:ss.sssZ, with theDateTime as UTC. I suggest to use a function rather than having the->format(...) call copy/pasted.
src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
sroze commentedMar 31, 2019
@vincenttouzet mind the conflict with |
b2bccd7 tof4f4bf4Comparevincenttouzet commentedMar 31, 2019
@sroze I've rebase and squash my commits :) |
sroze commentedMar 31, 2019
@vincenttouzet but you kept the |
f4f4bf4 to3991088Comparevincenttouzet commentedMar 31, 2019
That was just the doc in the configuration (part of another commit). |
| returnnull; | ||
| } | ||
| $doctrineEnvelope['headers'] = Type::getType(Type::JSON)->convertToPHPValue($doctrineEnvelope['headers'],$this->driverConnection->getDatabasePlatform()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Type::JSON does not exists on early Doctrine versionsas thelow test suite is highlighting. Therefore, I suggest to move it back toSTRING (like thebody) and usejson_encode. Whoever wants to have the table with aJSON field can create it manually. (FYI@weaverryan)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Yes i saw that. I was trying to require a minimum version for Doctrine but the Json type is available since v2.6.0 (tagged 2017-07-23 01:02) ... I've put back the json_encode / json_decode with the string type then
3991088 to88d008cComparesroze commentedMar 31, 2019
Thank you@vincenttouzet. |
This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Add a Doctrine transport| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | no| Deprecations? | no| Tests pass? | yes| Fixed tickets || License | MIT| Doc PR |symfony/symfony-docs#10616| DoctrineBundle PR |doctrine/DoctrineBundle#868As discussed with@sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).# How it worksThe code is splitted betwwen this PR and the one on the DoctrineBundle :doctrine/DoctrineBundle#868## ConfigurationTo configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`)```yml # config/packages/messenger.yaml framework: messenger: transports: my_transport: "doctrine://default?queue=important"```## Table schemaDispatched messages are stored into a database table with the following schema:| Column | Type | Options | Description ||--------------|----------|--------------------------|-------------------------------------------------------------------|| id | bigint | AUTO_INCREMENT, NOT NULL | Primary key || body | text | NOT NULL | Body of the message || headers | text | NOT NULL | Headers of the message || queue | varchar(32) | NOT NULL | Headers of the message || created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) || available_at | datetime | NOT NULL | When the message is available to be handled || delivered_at | datetime | NULL | When the message was delivered to a worker |## Message dispatchingWhen dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish`## Message consumingThe message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle.### Getting the next message* Start a transaction* Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query)* Update the message in database to update the delivered_at columns* Commit the transaction### Handling the messageThe retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table.If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`.## Message requeueingIt may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds)# TODO- [x] Add tests- [x] Create DOC PR- [x] PR on doctrine-bundle for transport factory- [x] Add a `available_at` column- [x] Add a `queue` column- [x] Implement the retry functionnality : See#30557- [x] Rebase after#29476Commits-------88d008c [Messenger] Add a Doctrine transport
vincenttouzet commentedMar 31, 2019
@sroze while updating the Doc PR I realized that I've keep the |
sroze commentedMar 31, 2019
@vincenttouzet sure. |
vincenttouzet commentedMar 31, 2019 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
@sroze Hum there is also a problem with the DoctrineTransportFactory: Fatal error: Declaration of Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory::createTransport(string $dsn, array $options): Symfony\Component\Messenger\Transport\TransportInterface must be compatible with Symfony\Component\Messenger\Transport\TransportFactoryInterface::createTransport(string $dsn, array $options, Symfony\Component\Messenger\Transport\Serialization\SerializerInterface $serializer): Symfony\Component\Messenger\Transport\TransportInterface in /var/www/sources/symfony/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php on line 26 it worked before merging 🤔 . I can fix this in the same MR if you want |
sroze commentedMar 31, 2019
I fixed it in#30799. |
…terface (sroze)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Fix the Doctrine transport to use the new interface| Q | A| ------------- | ---| Branch? | master| Bug fix? | yes| New feature? | no| BC breaks? | no| Deprecations? | no| Tests pass? | yes| Fixed tickets |#29007| License | MIT| Doc PR | øCommits-------75e3355 Fix the Doctrine transport to use the new interface
vincenttouzet commentedMar 31, 2019
…uzet)This PR was submitted for the master branch but it was merged into the 4.3 branch instead (closes#10616).Discussion----------[Messenger] Describe the doctrine transportAdd documentation relative to the PRsymfony/symfony#29007TODO- [ ] Document the max granularity in seconds for delay (doctrine/dbal#2873)Commits-------27c0f9d [Messenger] Describe the doctrine tranport
Uh oh!
There was an error while loading.Please reload this page.
As discussed with@sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.
Actually
AMQPis the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).How it works
The code is splitted betwwen this PR and the one on the DoctrineBundle :doctrine/DoctrineBundle#868
Configuration
To configure a Doctrine transport the dsn MUST have the format
doctrine://<entity_manager_name>where<entity_manager_name>is the name of the entity manager (usuallydefault)Table schema
Dispatched messages are stored into a database table with the following schema:
Message dispatching
When dispatching a message a new row is inserted into the table. See
Symfony\Component\Messenger\Transport\Doctrine::publishMessage consuming
The message is retrieved by the
Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver. It calls theSymfony\Component\Messenger\Transport\Doctrine::getmethod to get the next message to handle.Getting the next message
SELECT ... FOR UPDATEquery)Handling the message
The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the
Symfony\Component\Messenger\Transport\Doctrine::ackwhich delete the message from the table.If an error occured the receiver call the
Symfony\Component\Messenger\Transport\Doctrine::nackmethod which update the message to set the delivered_at column tonull.Message requeueing
It may happen that a message is stuck in
deliveredstate but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages theDoctrineReceivercall theSymfony\Component\Messenger\Transport\Doctrine::requeueMessages. This method update all the message with adelivered_atnot null since more than the "redeliver timeout" (default to 3600 seconds)TODO
available_atcolumnqueuecolumn