Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbe723f3

Browse files
hgracaqshurick
andcommitted
Add batch delivery to Messenger Doctrine transport
Currently, the doctrine transport only delivers one message at a time,resulting in low performance and a very high amount of hits to the DB.(one hit per message)With batch delivery, the transport will retrieve several messagesin a single query.Co-authored-by: Alexander Malyk <shu.rick.ifmo@gmail.com>
1 parentc86f390 commitbe723f3

File tree

10 files changed

+253
-60
lines changed

10 files changed

+253
-60
lines changed

‎src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
6.4
5+
---
6+
7+
* Add support to pull a batch of messages in one go (`batch_size` option)
8+
49
5.1.0
510
-----
611

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php

Lines changed: 104 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
useDoctrine\DBAL\Platforms\MySQL57Platform;
2020
useDoctrine\DBAL\Platforms\MySQLPlatform;
2121
useDoctrine\DBAL\Platforms\OraclePlatform;
22+
useDoctrine\DBAL\Platforms\PostgreSQLPlatform;
2223
useDoctrine\DBAL\Platforms\SQLServer2012Platform;
2324
useDoctrine\DBAL\Platforms\SQLServerPlatform;
25+
useDoctrine\DBAL\Query\Expression\ExpressionBuilder;
2426
useDoctrine\DBAL\Query\QueryBuilder;
2527
useDoctrine\DBAL\Result;
2628
useDoctrine\DBAL\Schema\AbstractSchemaManager;
@@ -38,11 +40,19 @@ public function testGetAMessageWillChangeItsStatus()
3840
{
3941
$queryBuilder =$this->getQueryBuilderMock();
4042
$driverConnection =$this->getDBALConnectionMock();
41-
$stmt =$this->getResultMock([
42-
'id' =>1,
43-
'body' =>'{"message":"Hi"}',
44-
'headers' =>json_encode(['type' => DummyMessage::class]),
45-
]);
43+
$resultMock = [
44+
[
45+
'id' =>1,
46+
'body' =>'{"message":"Hi"}',
47+
'headers' =>json_encode(['type' => DummyMessage::class]),
48+
],
49+
[
50+
'id' =>2,
51+
'body' =>'{"message":"There"}',
52+
'headers' =>json_encode(['type' => DummyMessage::class]),
53+
],
54+
];
55+
$stmt =$this->getResultMockForGet($resultMock);
4656

4757
$driverConnection
4858
->method('createQueryBuilder')
@@ -64,17 +74,19 @@ public function testGetAMessageWillChangeItsStatus()
6474
->willReturn(1);
6575

6676
$connection =newConnection([],$driverConnection);
67-
$doctrineEnvelope =$connection->get();
68-
$this->assertEquals(1,$doctrineEnvelope['id']);
69-
$this->assertEquals('{"message":"Hi"}',$doctrineEnvelope['body']);
70-
$this->assertEquals(['type' => DummyMessage::class],$doctrineEnvelope['headers']);
77+
$doctrineEnvelopeList =$connection->get();
78+
foreach ($doctrineEnvelopeListas$key =>$doctrineEnvelope) {
79+
$this->assertEquals($resultMock[$key]['id'],$doctrineEnvelope['id']);
80+
$this->assertEquals($resultMock[$key]['body'],$doctrineEnvelope['body']);
81+
$this->assertEquals(json_decode($resultMock[$key]['headers'],true),$doctrineEnvelope['headers']);
82+
}
7183
}
7284

73-
publicfunctiontestGetWithNoPendingMessageWillReturnNull()
85+
publicfunctiontestGetWithNoPendingMessageWillReturnEmptyList()
7486
{
7587
$queryBuilder =$this->getQueryBuilderMock();
7688
$driverConnection =$this->getDBALConnectionMock();
77-
$stmt =$this->getResultMock(false);
89+
$stmt =$this->getResultMockForGet([]);
7890

7991
$queryBuilder
8092
->method('getParameters')
@@ -95,8 +107,9 @@ public function testGetWithNoPendingMessageWillReturnNull()
95107
->willReturn($stmt);
96108

97109
$connection =newConnection([],$driverConnection);
98-
$doctrineEnvelope =$connection->get();
99-
$this->assertNull($doctrineEnvelope);
110+
$doctrineEnvelopeList =$connection->get();
111+
$this->assertIsArray($doctrineEnvelopeList);
112+
$this->assertEmpty($doctrineEnvelopeList);
100113
}
101114

102115
publicfunctiontestItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
@@ -160,11 +173,27 @@ private function getQueryBuilderMock()
160173
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
161174
$queryBuilder->method('setParameter')->willReturn($queryBuilder);
162175
$queryBuilder->method('setParameters')->willReturn($queryBuilder);
176+
$queryBuilder->method('addOrderBy')->willReturn($queryBuilder);
177+
178+
$expressionBuilderMock =$this->createMock(ExpressionBuilder::class);
179+
$expressionBuilderMock->method('in')->willReturn('');
180+
$queryBuilder->method('expr')->willReturn($expressionBuilderMock);
163181

164182
return$queryBuilder;
165183
}
166184

167-
privatefunctiongetResultMock($expectedResult)
185+
privatefunctiongetResultMockForGet($expectedResult)
186+
{
187+
$stmt =$this->createMock(class_exists(Result::class) ? Result::class : ResultStatement::class);
188+
189+
$stmt->expects($this->once())
190+
->method(class_exists(Result::class) ?'fetchAllAssociative' :'fetchAll')
191+
->willReturn($expectedResult);
192+
193+
return$stmt;
194+
}
195+
196+
privatefunctiongetResultMockForFind($expectedResult)
168197
{
169198
$stmt =$this->createMock(class_exists(Result::class) ? Result::class : ResultStatement::class);
170199

@@ -264,14 +293,14 @@ public static function buildConfigurationProvider(): iterable
264293
publicfunctiontestItThrowsAnExceptionIfAnExtraOptionsInDefined()
265294
{
266295
$this->expectException(InvalidArgumentException::class);
267-
$this->expectExceptionMessage('Unknown option found: [new_option]. Allowed options are [table_name, queue_name, redeliver_timeout, auto_setup]');
296+
$this->expectExceptionMessage('Unknown option found: [new_option]. Allowed options are [table_name, queue_name,batch_size,redeliver_timeout, auto_setup]');
268297
Connection::buildConfiguration('doctrine://default', ['new_option' =>'woops']);
269298
}
270299

271300
publicfunctiontestItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
272301
{
273302
$this->expectException(InvalidArgumentException::class);
274-
$this->expectExceptionMessage('Unknown option found in DSN: [new_option]. Allowed options are [table_name, queue_name, redeliver_timeout, auto_setup]');
303+
$this->expectExceptionMessage('Unknown option found in DSN: [new_option]. Allowed options are [table_name, queue_name,batch_size,redeliver_timeout, auto_setup]');
275304
Connection::buildConfiguration('doctrine://default?new_option=woops');
276305
}
277306

@@ -280,7 +309,7 @@ public function testFind()
280309
$queryBuilder =$this->getQueryBuilderMock();
281310
$driverConnection =$this->getDBALConnectionMock();
282311
$id =1;
283-
$stmt =$this->getResultMock([
312+
$stmt =$this->getResultMockForFind([
284313
'id' =>$id,
285314
'body' =>'{"message":"Hi"}',
286315
'headers' =>json_encode(['type' => DummyMessage::class]),
@@ -364,18 +393,18 @@ public function testFindAll()
364393
/**
365394
* @dataProvider providePlatformSql
366395
*/
367-
publicfunctiontestGeneratedSql(AbstractPlatform$platform,string$expectedSql)
396+
publicfunctiontestGeneratedSql(AbstractPlatform$platform,int$batchSize,string$expectedSql)
368397
{
369398
$driverConnection =$this->createMock(DBALConnection::class);
370399
$driverConnection->method('getDatabasePlatform')->willReturn($platform);
371400
$driverConnection->method('createQueryBuilder')->willReturnCallback(fn () =>newQueryBuilder($driverConnection));
372401

373402
if (class_exists(Result::class)) {
374403
$result =$this->createMock(Result::class);
375-
$result->method('fetchAssociative')->willReturn(false);
404+
$result->method('fetchAllAssociative')->willReturn([]);
376405
}else {
377406
$result =$this->createMock(ResultStatement::class);
378-
$result->method('fetch')->willReturn(false);
407+
$result->method('fetchAll')->willReturn([]);
379408
}
380409

381410
$driverConnection->expects($this->once())->method('beginTransaction');
@@ -389,41 +418,90 @@ public function testGeneratedSql(AbstractPlatform $platform, string $expectedSql
389418
;
390419
$driverConnection->expects($this->once())->method('commit');
391420

392-
$connection =newConnection([],$driverConnection);
421+
$connection =newConnection(['batch_size' =>$batchSize],$driverConnection);
393422
$connection->get();
394423
}
395424

396425
publicstaticfunctionprovidePlatformSql():iterable
397426
{
398-
yield'MySQL' => [
427+
yield'MySQL|batch_size=1' => [
399428
class_exists(MySQLPlatform::class) ?newMySQLPlatform() :newMySQL57Platform(),
429+
1,
400430
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
401431
];
402432

433+
yield'MySQL|batch_size=50' => [
434+
class_exists(MySQLPlatform::class) ?newMySQLPlatform() :newMySQL57Platform(),
435+
50,
436+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 50 FOR UPDATE',
437+
];
438+
439+
if (class_exists(PostgreSQLPlatform::class)) {
440+
yield'Postgres|batch_size=1' => [
441+
newPostgreSQLPlatform(),
442+
1,
443+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
444+
];
445+
446+
yield'Postgres|batch_size=50' => [
447+
newPostgreSQLPlatform(),
448+
50,
449+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 50 FOR UPDATE',
450+
];
451+
}
452+
403453
if (class_exists(MariaDBPlatform::class)) {
404-
yield'MariaDB' => [
454+
yield'MariaDB|batch_size=1' => [
405455
newMariaDBPlatform(),
456+
1,
406457
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
407458
];
459+
yield'MariaDB|batch_size=50' => [
460+
newMariaDBPlatform(),
461+
50,
462+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 50 FOR UPDATE',
463+
];
408464
}
409465

410-
yield'SQL Server' => [
466+
yield'SQL Server|batch_size=1' => [
411467
class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ?newSQLServerPlatform() :newSQLServer2012Platform(),
468+
1,
412469
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY',
413470
];
414471

472+
yield'SQL Server|batch_size=50' => [
473+
class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ?newSQLServerPlatform() :newSQLServer2012Platform(),
474+
50,
475+
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 50 ROWS ONLY',
476+
];
477+
415478
if (!class_exists(MySQL57Platform::class)) {
416479
// DBAL >= 4
417-
yield'Oracle' => [
480+
481+
yield'Oracle|batch_size=1' => [
418482
newOraclePlatform(),
483+
1,
419484
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE',
420485
];
486+
487+
yield'Oracle|batch_size=50' => [
488+
newOraclePlatform(),
489+
50,
490+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 50 ROWS ONLY) FOR UPDATE',
491+
];
421492
}else {
422493
// DBAL < 4
423-
yield'Oracle' => [
494+
yield'Oracle|batch_size=1' => [
424495
newOraclePlatform(),
496+
1,
425497
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE',
426498
];
499+
500+
yield'Oracle|batch_size=50' => [
501+
newOraclePlatform(),
502+
50,
503+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 50) FOR UPDATE',
504+
];
427505
}
428506
}
429507

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineIntegrationTest.php

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ protected function tearDown(): void
4949
publicfunctiontestConnectionSendAndGet()
5050
{
5151
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
52-
$encoded =$this->connection->get();
53-
$this->assertEquals('{"message": "Hi"}',$encoded['body']);
54-
$this->assertEquals(['type' => DummyMessage::class],$encoded['headers']);
52+
$encodedList =$this->connection->get()[0];
53+
$this->assertEquals('{"message": "Hi"}',$encodedList['body']);
54+
$this->assertEquals(['type' => DummyMessage::class],$encodedList['headers']);
5555
}
5656

5757
publicfunctiontestSendWithDelay()
@@ -103,7 +103,7 @@ public function testItRetrieveTheFirstAvailableMessage()
103103
'available_at' =>$this->formatDateTime(new \DateTimeImmutable('2019-03-15 12:30:00',new \DateTimeZone('UTC'))),
104104
]);
105105

106-
$encoded =$this->connection->get();
106+
$encoded =$this->connection->get()[0];
107107
$this->assertEquals('{"message": "Hi available"}',$encoded['body']);
108108
}
109109

@@ -168,18 +168,20 @@ public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
168168
'available_at' =>$this->formatDateTime(new \DateTimeImmutable('2019-03-15 12:30:00',new \DateTimeZone('UTC'))),
169169
]);
170170

171-
$next =$this->connection->get();
171+
$next =$this->connection->get()[0];
172172
$this->assertEquals('{"message": "Hi requeued"}',$next['body']);
173173
$this->connection->reject($next['id']);
174174
}
175175

176176
publicfunctiontestTheTransportIsSetupOnGet()
177177
{
178178
$this->assertFalse($this->createSchemaManager()->tablesExist(['messenger_messages']));
179-
$this->assertNull($this->connection->get());
179+
$result =$this->connection->get();
180+
$this->assertIsArray($result);
181+
$this->assertEmpty($result);
180182

181183
$this->connection->send('the body', ['my' =>'header']);
182-
$envelope =$this->connection->get();
184+
$envelope =$this->connection->get()[0];
183185
$this->assertEquals('the body',$envelope['body']);
184186
}
185187

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ public function testPostgreSqlConnectionSendAndGet()
5959
{
6060
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
6161

62-
$encoded =$this->connection->get();
62+
$encoded =$this->connection->get()[0];
6363
$this->assertEquals('{"message": "Hi"}',$encoded['body']);
6464
$this->assertEquals(['type' => DummyMessage::class],$encoded['headers']);
6565

66-
$this->assertNull($this->connection->get());
66+
$this->assertEmpty($this->connection->get());
6767
}
6868

6969
privatefunctioncreateSchemaManager():AbstractSchemaManager

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public function testItReturnsTheDecodedMessageToTheHandler()
3838

3939
$doctrineEnvelope =$this->createDoctrineEnvelope();
4040
$connection =$this->createMock(Connection::class);
41-
$connection->method('get')->willReturn($doctrineEnvelope);
41+
$connection->method('get')->willReturn([$doctrineEnvelope]);
4242

4343
$receiver =newDoctrineReceiver($connection,$serializer);
4444
$actualEnvelopes =$receiver->get();
@@ -66,7 +66,7 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
6666

6767
$doctrineEnvelop =$this->createDoctrineEnvelope();
6868
$connection =$this->createMock(Connection::class);
69-
$connection->method('get')->willReturn($doctrineEnvelop);
69+
$connection->method('get')->willReturn([$doctrineEnvelop]);
7070
$connection->expects($this->once())->method('reject');
7171

7272
$receiver =newDoctrineReceiver($connection,$serializer);

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public function testReceivesMessages()
4646
];
4747

4848
$serializer->method('decode')->with(['body' =>'body','headers' => ['my' =>'header']])->willReturn(newEnvelope($decodedMessage));
49-
$connection->method('get')->willReturn($doctrineEnvelope);
49+
$connection->method('get')->willReturn([$doctrineEnvelope]);
5050

5151
$envelopes =$transport->get();
5252
$this->assertSame($decodedMessage,$envelopes[0]->getMessage());

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp