You signed in with another tab or window.Reload to refresh your session.You signed out in another tab or window.Reload to refresh your session.You switched accounts on another tab or window.Reload to refresh your session.Dismiss alert
Add batch delivery to Messenger Doctrine transport
Currently, the doctrine transport only delivers one message at a time,resulting in low performance and a very high amount of hits to the DB.(one hit per message)With batch delivery, the transport will retrieve several messagesin a single query.Co-authored-by: Alexander Malyk <shu.rick.ifmo@gmail.com>
$this->expectExceptionMessage('Unknown option found in DSN: [new_option]. Allowed options are [table_name, queue_name, redeliver_timeout, auto_setup]');
303
+
$this->expectExceptionMessage('Unknown option found in DSN: [new_option]. Allowed options are [table_name, queue_name,batch_size,redeliver_timeout, auto_setup]');
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 50 FOR UPDATE',
437
+
];
438
+
439
+
if (class_exists(PostgreSQLPlatform::class)) {
440
+
yield'Postgres|batch_size=1' => [
441
+
newPostgreSQLPlatform(),
442
+
1,
443
+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
444
+
];
445
+
446
+
yield'Postgres|batch_size=50' => [
447
+
newPostgreSQLPlatform(),
448
+
50,
449
+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 50 FOR UPDATE',
450
+
];
451
+
}
452
+
403
453
if (class_exists(MariaDBPlatform::class)) {
404
-
yield'MariaDB' => [
454
+
yield'MariaDB|batch_size=1' => [
405
455
newMariaDBPlatform(),
456
+
1,
406
457
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
407
458
];
459
+
yield'MariaDB|batch_size=50' => [
460
+
newMariaDBPlatform(),
461
+
50,
462
+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 50 FOR UPDATE',
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY',
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 50 ROWS ONLY',
476
+
];
477
+
415
478
if (!class_exists(MySQL57Platform::class)) {
416
479
// DBAL >= 4
417
-
yield'Oracle' => [
480
+
481
+
yield'Oracle|batch_size=1' => [
418
482
newOraclePlatform(),
483
+
1,
419
484
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE',
420
485
];
486
+
487
+
yield'Oracle|batch_size=50' => [
488
+
newOraclePlatform(),
489
+
50,
490
+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 50 ROWS ONLY) FOR UPDATE',
491
+
];
421
492
}else {
422
493
// DBAL < 4
423
-
yield'Oracle' => [
494
+
yield'Oracle|batch_size=1' => [
424
495
newOraclePlatform(),
496
+
1,
425
497
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE',
426
498
];
499
+
500
+
yield'Oracle|batch_size=50' => [
501
+
newOraclePlatform(),
502
+
50,
503
+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 50) FOR UPDATE',