Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit574097f

Browse files
committed
feature#30757 [Messenger] Adding MessageCountAwareInterface to get transport message count (weaverryan)
This PR was squashed before being merged into the 4.3-dev branch (closes#30757).Discussion----------[Messenger] Adding MessageCountAwareInterface to get transport message count| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | no| Deprecations? | no| Tests pass? | yes| Fixed tickets | none| License | MIT| Doc PR |symfony/symfony-docs#11236This adds a new optional interface that receivers should implement to give an approximate number of the messages "waiting" to be handled. Why? Because, with this, you could design a system that dynamically adds/removes worker processes if a specific transport is getting slammed and needs help. Creating that system could be something we discuss for core later, but this at least makes it possible - and means it could be implemented by the user or in a bundle... which I might do if we don't get it in core ;).Commits-------fc5b0cf [Messenger] Adding MessageCountAwareInterface to get transport message count
2 parents9ed2f2b +fc5b0cf commit574097f

File tree

10 files changed

+164
-16
lines changed

10 files changed

+164
-16
lines changed

‎src/Symfony/Component/Messenger/CHANGELOG.md‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* Added optional`MessageCountAwareInterface` that receivers can implement
8+
to give information about how many messages are waiting to be processed.
79
*[BC BREAK] The`Envelope::__construct()` signature changed:
810
you can no longer pass an unlimited number of stamps as the second,
911
third, fourth, arguments etc: stamps are now an array passed to the

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php‎

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,24 @@ public function testItReceivesSignals()
167167
,$process->getOutput());
168168
}
169169

170+
publicfunctiontestItCountsMessagesInQueue()
171+
{
172+
$serializer =$this->createSerializer();
173+
174+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
175+
$connection->setup();
176+
$connection->queue()->purge();
177+
178+
$sender =newAmqpSender($connection,$serializer);
179+
180+
$sender->send($first =newEnvelope(newDummyMessage('First')));
181+
$sender->send($second =newEnvelope(newDummyMessage('Second')));
182+
$sender->send($second =newEnvelope(newDummyMessage('Third')));
183+
184+
sleep(1);// give amqp a moment to have the messages ready
185+
$this->assertSame(3,$connection->countMessagesInQueue());
186+
}
187+
170188
privatefunctionwaitForOutput(Process$process,string$output,$timeoutInSeconds =10)
171189
{
172190
$timedOutTime =time() +$timeoutInSeconds;

‎src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ public function testGetAMessageWillChangeItsStatus()
3737
$queryBuilder
3838
->method('getSQL')
3939
->willReturn('');
40+
$queryBuilder
41+
->method('getParameters')
42+
->willReturn([]);
4043
$driverConnection
4144
->method('prepare')
4245
->willReturn($stmt);
@@ -54,6 +57,9 @@ public function testGetWithNoPendingMessageWillReturnNull()
5457
$driverConnection =$this->getDBALConnectionMock();
5558
$stmt =$this->getStatementMock(false);
5659

60+
$queryBuilder
61+
->method('getParameters')
62+
->willReturn([]);
5763
$driverConnection->expects($this->once())
5864
->method('createQueryBuilder')
5965
->willReturn($queryBuilder);
@@ -119,6 +125,7 @@ private function getQueryBuilderMock()
119125
$queryBuilder->method('orderBy')->willReturn($queryBuilder);
120126
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
121127
$queryBuilder->method('setParameter')->willReturn($queryBuilder);
128+
$queryBuilder->method('setParameters')->willReturn($queryBuilder);
122129

123130
return$queryBuilder;
124131
}

‎src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php‎

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,46 @@ public function testItRetrieveTheFirstAvailableMessage()
100100
$this->assertEquals('{"message": "Hi available"}',$encoded['body']);
101101
}
102102

103+
publicfunctiontestItCountMessages()
104+
{
105+
// insert messages
106+
// one currently handled
107+
$this->driverConnection->insert('messenger_messages', [
108+
'body' =>'{"message": "Hi handled"}',
109+
'headers' =>json_encode(['type' => DummyMessage::class]),
110+
'queue_name' =>'default',
111+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
112+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
113+
'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u',microtime(true))),
114+
]);
115+
// one available later
116+
$this->driverConnection->insert('messenger_messages', [
117+
'body' =>'{"message": "Hi delayed"}',
118+
'headers' =>json_encode(['type' => DummyMessage::class]),
119+
'queue_name' =>'default',
120+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
121+
'available_at' => Connection::formatDateTime((new \DateTime())->modify('+1 minute')),
122+
]);
123+
// one available
124+
$this->driverConnection->insert('messenger_messages', [
125+
'body' =>'{"message": "Hi available"}',
126+
'headers' =>json_encode(['type' => DummyMessage::class]),
127+
'queue_name' =>'default',
128+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
129+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
130+
]);
131+
// another available
132+
$this->driverConnection->insert('messenger_messages', [
133+
'body' =>'{"message": "Hi available"}',
134+
'headers' =>json_encode(['type' => DummyMessage::class]),
135+
'queue_name' =>'default',
136+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
137+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
138+
]);
139+
140+
$this->assertSame(2,$this->connection->getMessageCount());
141+
}
142+
103143
publicfunctiontestItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
104144
{
105145
$twoHoursAgo =new \DateTime('now');

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php‎

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
useSymfony\Component\Messenger\Exception\LogicException;
1616
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
1717
useSymfony\Component\Messenger\Exception\TransportException;
18+
useSymfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1819
useSymfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1920
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2021
useSymfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -26,7 +27,7 @@
2627
*
2728
* @experimental in 4.2
2829
*/
29-
class AmqpReceiverimplements ReceiverInterface
30+
class AmqpReceiverimplements ReceiverInterface, MessageCountAwareInterface
3031
{
3132
private$serializer;
3233
private$connection;
@@ -87,6 +88,14 @@ public function reject(Envelope $envelope): void
8788
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
8889
}
8990

91+
/**
92+
* {@inheritdoc}
93+
*/
94+
publicfunctiongetMessageCount():int
95+
{
96+
return$this->connection->countMessagesInQueue();
97+
}
98+
9099
privatefunctionrejectAmqpEnvelope(\AMQPEnvelope$amqpEnvelope):void
91100
{
92101
try {

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php‎

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespaceSymfony\Component\Messenger\Transport\AmqpExt;
1313

1414
useSymfony\Component\Messenger\Envelope;
15+
useSymfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1516
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1617
useSymfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1718
useSymfony\Component\Messenger\Transport\SetupableTransportInterface;
@@ -22,7 +23,7 @@
2223
*
2324
* @experimental in 4.2
2425
*/
25-
class AmqpTransportimplements TransportInterface, SetupableTransportInterface
26+
class AmqpTransportimplements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
2627
{
2728
private$serializer;
2829
private$connection;
@@ -75,6 +76,14 @@ public function setup(): void
7576
$this->connection->setup();
7677
}
7778

79+
/**
80+
* {@inheritdoc}
81+
*/
82+
publicfunctiongetMessageCount():int
83+
{
84+
return ($this->receiver ??$this->getReceiver())->getMessageCount();
85+
}
86+
7887
privatefunctiongetReceiver()
7988
{
8089
return$this->receiver =newAmqpReceiver($this->connection,$this->serializer);

‎src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php‎

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,14 @@ public function publish(string $body, array $headers = [], int $delay = 0): void
184184
$this->exchange()->publish($body,$this->queueConfiguration['routing_key'] ??null,$flags,$attributes);
185185
}
186186

187+
/**
188+
* Returns an approximate count of the messages in a queue.
189+
*/
190+
publicfunctioncountMessagesInQueue():int
191+
{
192+
return$this->queue()->declareQueue();
193+
}
194+
187195
/**
188196
* @throws \AMQPException
189197
*/

‎src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php‎

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
useDoctrine\DBAL\ConnectionasDBALConnection;
1515
useDoctrine\DBAL\DBALException;
1616
useDoctrine\DBAL\Exception\TableNotFoundException;
17+
useDoctrine\DBAL\Query\QueryBuilder;
1718
useDoctrine\DBAL\Schema\Schema;
1819
useDoctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
1920
useDoctrine\DBAL\Types\Type;
@@ -128,25 +129,14 @@ public function get(): ?array
128129
{
129130
$this->driverConnection->beginTransaction();
130131
try {
131-
$query =$this->driverConnection->createQueryBuilder()
132-
->select('m.*')
133-
->from($this->configuration['table_name'],'m')
134-
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
135-
->andWhere('m.available_at <= :now')
136-
->andWhere('m.queue_name = :queue_name')
132+
$query =$this->createAvailableMessagesQueryBuilder()
137133
->orderBy('available_at','ASC')
138134
->setMaxResults(1);
139135

140-
$now = \DateTime::createFromFormat('U.u',microtime(true));
141-
$redeliverLimit = (clone$now)->modify(sprintf('-%d seconds',$this->configuration['redeliver_timeout']));
142136
// use SELECT ... FOR UPDATE to lock table
143137
$doctrineEnvelope =$this->executeQuery(
144138
$query->getSQL().''.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
145-
[
146-
':now' =>self::formatDateTime($now),
147-
':queue_name' =>$this->configuration['queue_name'],
148-
':redeliver_limit' =>self::formatDateTime($redeliverLimit),
149-
]
139+
$query->getParameters()
150140
)->fetch();
151141

152142
if (false ===$doctrineEnvelope) {
@@ -161,6 +151,7 @@ public function get(): ?array
161151
->update($this->configuration['table_name'])
162152
->set('delivered_at',':delivered_at')
163153
->where('id = :id');
154+
$now = \DateTime::createFromFormat('U.u',microtime(true));
164155
$this->executeQuery($queryBuilder->getSQL(), [
165156
':id' =>$doctrineEnvelope['id'],
166157
':delivered_at' =>self::formatDateTime($now),
@@ -200,6 +191,33 @@ public function setup(): void
200191
$synchronizer->updateSchema($this->getSchema(),true);
201192
}
202193

194+
publicfunctiongetMessageCount():int
195+
{
196+
$queryBuilder =$this->createAvailableMessagesQueryBuilder()
197+
->select('COUNT(m.id) as message_count')
198+
->setMaxResults(1);
199+
200+
return$this->executeQuery($queryBuilder->getSQL(),$queryBuilder->getParameters())->fetchColumn();
201+
}
202+
203+
privatefunctioncreateAvailableMessagesQueryBuilder():QueryBuilder
204+
{
205+
$now = \DateTime::createFromFormat('U.u',microtime(true));
206+
$redeliverLimit = (clone$now)->modify(sprintf('-%d seconds',$this->configuration['redeliver_timeout']));
207+
208+
return$this->driverConnection->createQueryBuilder()
209+
->select('m.*')
210+
->from($this->configuration['table_name'],'m')
211+
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
212+
->andWhere('m.available_at <= :now')
213+
->andWhere('m.queue_name = :queue_name')
214+
->setParameters([
215+
':now' =>self::formatDateTime($now),
216+
':queue_name' =>$this->configuration['queue_name'],
217+
':redeliver_limit' =>self::formatDateTime($redeliverLimit),
218+
]);
219+
}
220+
203221
privatefunctionexecuteQuery(string$sql,array$parameters = [])
204222
{
205223
$stmt =null;

‎src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php‎

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
useSymfony\Component\Messenger\Exception\LogicException;
1717
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
1818
useSymfony\Component\Messenger\Exception\TransportException;
19+
useSymfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1920
useSymfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2021
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
useSymfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -25,7 +26,7 @@
2526
*
2627
* @experimental in 4.3
2728
*/
28-
class DoctrineReceiverimplements ReceiverInterface
29+
class DoctrineReceiverimplements ReceiverInterface, MessageCountAwareInterface
2930
{
3031
private$connection;
3132
private$serializer;
@@ -81,6 +82,14 @@ public function reject(Envelope $envelope): void
8182
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
8283
}
8384

85+
/**
86+
* {@inheritdoc}
87+
*/
88+
publicfunctiongetMessageCount():int
89+
{
90+
return$this->connection->getMessageCount();
91+
}
92+
8493
privatefunctionfindDoctrineReceivedStamp(Envelope$envelope):DoctrineReceivedStamp
8594
{
8695
/** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Transport\Receiver;
13+
14+
/**
15+
* @author Samuel Roze <samuel.roze@gmail.com>
16+
* @author Ryan Weaver <ryan@symfonycasts.com>
17+
*
18+
* @experimental in 4.3
19+
*/
20+
interface MessageCountAwareInterface
21+
{
22+
/**
23+
* Returns the number of messages waiting to be handled.
24+
*
25+
* In some systems, this may be an approximate number.
26+
*/
27+
publicfunctiongetMessageCount():int;
28+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp