Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit89bae9c

Browse files
committed
feature#59862 [Messenger] Allow to close the transport connection (andrew-demb)
This PR was squashed before being merged into the 7.3 branch.Discussion----------[Messenger] Allow to close the transport connection| Q | A| ------------- | ---| Branch? | 7.3| Bug fix? | no| New feature? | yes| Deprecations? | no| Issues |Fix#53543| License | MIT<!--Replace this notice by a description of your feature/bugfix.This will help reviewers and should be a good start for the documentation.Additionally (seehttps://symfony.com/releases): - Always add tests and ensure they pass. - Bug fixes must be submitted against the lowest maintained branch where they apply (lowest branches are regularly merged to upper ones so they get the fixes too). - Features and deprecations must be submitted against the latest branch. - For new features, provide some code snippets to help understand usage. - Changelog entry should followhttps://symfony.com/doc/current/contributing/code/conventions.html#writing-a-changelog-entry - Never break backward compatibility (seehttps://symfony.com/bc).-->~~1. Implemented the possibility to make messenger transports resettable~~~~2. Implemented reset Redis connection for Redis messenger transport~~~~This feature may lead to decreased performance for messenger consumers (because connection will be resetted between processing messages).~~~~One way to resolve it that I found - add configuration for resettable transports - aka "reset connection on kernel reset: true/false". For consumers it can be configured via env var to be "false", but for web "true".~~----**UPD 2025-02-27**: According to the feedback, I changed the implementation to another one to help with our use case.Implemented a way to close messenger transport from the application, to allow free resources for long-running processes (like a long-running webserver)Commits-------9788aee [Messenger] Allow to close the transport connection
2 parents6477041 +9788aee commit89bae9c

File tree

13 files changed

+57
-12
lines changed

13 files changed

+57
-12
lines changed

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.3
55
---
66

7+
* Implement the`CloseableTransportInterface` to allow closing the transport
78
* Add new`queue_attributes` and`queue_tags` options for SQS queue creation
89

910
7.2

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
useAsyncAws\Core\Exception\Http\HttpException;
1515
useSymfony\Component\Messenger\Envelope;
1616
useSymfony\Component\Messenger\Exception\TransportException;
17+
useSymfony\Component\Messenger\Transport\CloseableTransportInterface;
1718
useSymfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1819
useSymfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1920
useSymfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
@@ -27,7 +28,7 @@
2728
/**
2829
* @author Jérémy Derussé <jeremy@derusse.com>
2930
*/
30-
class AmazonSqsTransportimplements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface
31+
class AmazonSqsTransportimplements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface,CloseableTransportInterface,MessageCountAwareInterface, ResetInterface
3132
{
3233
privateSerializerInterface$serializer;
3334

@@ -91,6 +92,11 @@ public function reset(): void
9192
}
9293
}
9394

95+
publicfunctionclose():void
96+
{
97+
$this->reset();
98+
}
99+
94100
privatefunctiongetReceiver():MessageCountAwareInterface&ReceiverInterface
95101
{
96102
return$this->receiver ??=newAmazonSqsReceiver($this->connection,$this->serializer);

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
"php":">=8.2",
2020
"async-aws/core":"^1.7",
2121
"async-aws/sqs":"^1.0|^2.0",
22-
"symfony/messenger":"^7.2",
22+
"symfony/messenger":"^7.3",
2323
"symfony/service-contracts":"^2.5|^3",
2424
"psr/log":"^1|^2|^3"
2525
},

‎src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.1
55
---
66

7+
* Implement the`CloseableTransportInterface` to allow closing the AMQP connection
78
* Add option`delay[arguments]` in the transport definition
89

910
6.0

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespaceSymfony\Component\Messenger\Bridge\Amqp\Transport;
1313

1414
useSymfony\Component\Messenger\Envelope;
15+
useSymfony\Component\Messenger\Transport\CloseableTransportInterface;
1516
useSymfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1617
useSymfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
1718
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -22,7 +23,7 @@
2223
/**
2324
* @author Nicolas Grekas <p@tchwork.com>
2425
*/
25-
class AmqpTransportimplements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
26+
class AmqpTransportimplements QueueReceiverInterface, TransportInterface, SetupableTransportInterface,CloseableTransportInterface,MessageCountAwareInterface
2627
{
2728
privateSerializerInterface$serializer;
2829
privateAmqpReceiver$receiver;
@@ -70,6 +71,11 @@ public function getMessageCount(): int
7071
return$this->getReceiver()->getMessageCount();
7172
}
7273

74+
publicfunctionclose():void
75+
{
76+
$this->connection->clear();
77+
}
78+
7379
privatefunctiongetReceiver():AmqpReceiver
7480
{
7581
return$this->receiver ??=newAmqpReceiver($this->connection,$this->serializer);

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ private function clearWhenDisconnected(): void
551551
}
552552
}
553553

554-
privatefunctionclear():void
554+
publicfunctionclear():void
555555
{
556556
unset($this->amqpChannel,$this->amqpExchange,$this->amqpDelayExchange);
557557
$this->amqpQueues = [];

‎src/Symfony/Component/Messenger/Bridge/Amqp/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"require": {
1919
"php":">=8.2",
2020
"ext-amqp":"*",
21-
"symfony/messenger":"^6.4|^7.0"
21+
"symfony/messenger":"^7.3"
2222
},
2323
"require-dev": {
2424
"symfony/event-dispatcher":"^6.4|^7.0",

‎src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.3
55
---
66

7+
* Implement the`CloseableTransportInterface` to allow closing the Redis connection
78
* Implement the`KeepaliveReceiverInterface` to enable asynchronously notifying Redis that the job is still being processed, in order to avoid timeouts
89

910
6.3

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ class Connection
5555
'ssl' =>null,// see https://php.net/context.ssl
5656
];
5757

58-
private\Redis|Relay|\RedisCluster|\Closure$redis;
58+
private\Redis|Relay|\RedisCluster|null$redis =null;
59+
private\Closure$redisInitializer;
5960
privatestring$stream;
6061
privatestring$queue;
6162
privatestring$group;
@@ -112,9 +113,9 @@ public function __construct(array $options, \Redis|Relay|\RedisCluster|null $red
112113

113114
if ((\is_array($host) &&null ===$sentinelMaster) ||$redisinstanceof \RedisCluster) {
114115
$hosts =\is_string($host) ? [$host.':'.$port] :$host;// Always ensure we have an array
115-
$this->redis =staticfn () =>self::initializeRedisCluster($redis,$hosts,$auth,$options);
116+
$this->redisInitializer =staticfn () =>self::initializeRedisCluster($redis,$hosts,$auth,$options);
116117
}else {
117-
$this->redis =staticfunction ()use ($redis,$sentinelMaster,$host,$port,$options,$auth) {
118+
$this->redisInitializer =staticfunction ()use ($redis,$sentinelMaster,$host,$port,$options,$auth) {
118119
if (null !==$sentinelMaster) {
119120
$sentinelClass =\extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
120121
$hostIndex =0;
@@ -737,10 +738,15 @@ private function rawCommand(string $command, ...$arguments): mixed
737738

738739
privatefunctiongetRedis():\Redis|Relay|\RedisCluster
739740
{
740-
if ($this->redisinstanceof \Closure) {
741-
$this->redis = ($this->redis)();
741+
if (!$this->redis) {
742+
$this->redis = ($this->redisInitializer)();
742743
}
743744

744745
return$this->redis;
745746
}
747+
748+
publicfunctionclose():void
749+
{
750+
$this->redis =null;
751+
}
746752
}

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespaceSymfony\Component\Messenger\Bridge\Redis\Transport;
1313

1414
useSymfony\Component\Messenger\Envelope;
15+
useSymfony\Component\Messenger\Transport\CloseableTransportInterface;
1516
useSymfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1617
useSymfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1718
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -23,7 +24,7 @@
2324
* @author Alexander Schranz <alexander@sulu.io>
2425
* @author Antoine Bluchet <soyuka@gmail.com>
2526
*/
26-
class RedisTransportimplements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface
27+
class RedisTransportimplements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface,CloseableTransportInterface,MessageCountAwareInterface
2728
{
2829
privateSerializerInterface$serializer;
2930
privateRedisReceiver$receiver;
@@ -71,6 +72,11 @@ public function getMessageCount(): int
7172
return$this->getReceiver()->getMessageCount();
7273
}
7374

75+
publicfunctionclose():void
76+
{
77+
$this->connection->close();
78+
}
79+
7480
privatefunctiongetReceiver():RedisReceiver
7581
{
7682
return$this->receiver ??=newRedisReceiver($this->connection,$this->serializer);

‎src/Symfony/Component/Messenger/Bridge/Redis/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"require": {
1919
"php":">=8.2",
2020
"ext-redis":"*",
21-
"symfony/messenger":"^7.2"
21+
"symfony/messenger":"^7.3"
2222
},
2323
"require-dev": {
2424
"symfony/property-access":"^6.4|^7.0",

‎src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.3
55
---
66

7+
* Add`CloseableTransportInterface` to allow closing the transport
78
* Add`SentForRetryStamp` that identifies whether a failed message was sent for retry
89
* Add`Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and`Symfony\Component\Messenger\Stamp\DeduplicateStamp`
910

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Transport;
13+
14+
interface CloseableTransportInterface
15+
{
16+
publicfunctionclose():void;
17+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp