Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit99881e6

Browse files
feature#60018 [Messenger] Reset peak memory usage for each message (TimWolla)
This PR was squashed before being merged into the 7.3 branch.Discussion----------[Messenger] Reset peak memory usage for each message| Q | A| ------------- | ---| Branch? | 7.3| Bug fix? | no| New feature? | yes| Deprecations? | no| Issues | -| License | MITPHP’s peak memory usage is a bit of global state that is not useful to keep in a long-running process handling individual self-contained messages, since a single high-memory message (handler) running early in a worker’s lifecycle will skew the numbers for all remaining messages processed by that worker.By resetting the peak memory usage for each message it becomes possible to measure a given message type’s maximum memory usage more accurately, allowing to optimize hardware resources, for example by placing individual messages with handlers requiring a high memory-usage into their own transport that is executed on a larger worker instance.As part of this change the cycle collection is also moved out of the Worker into an event-listener, since the cycle collection is not a core task of the Worker, since cycle collection would happen implicitly as well.Commits-------896ab90 [Messenger] Reset peak memory usage for each message
2 parentsa8d6a36 +896ab90 commit99881e6

File tree

5 files changed

+107
-5
lines changed

5 files changed

+107
-5
lines changed

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
useSymfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
1919
useSymfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
2020
useSymfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
21+
useSymfony\Component\Messenger\EventListener\ResetMemoryUsageListener;
2122
useSymfony\Component\Messenger\EventListener\ResetServicesListener;
2223
useSymfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
2324
useSymfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
@@ -218,6 +219,9 @@
218219
service('services_resetter'),
219220
])
220221

222+
->set('messenger.listener.reset_memory_usage', ResetMemoryUsageListener::class)
223+
->tag('kernel.event_subscriber')
224+
221225
->set('messenger.routable_message_bus', RoutableMessageBus::class)
222226
->args([
223227
abstract_arg('message bus locator'),

‎src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CHANGELOG
99
* Add`Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and`Symfony\Component\Messenger\Stamp\DeduplicateStamp`
1010
* Add`--class-filter` option to the`messenger:failed:remove` command
1111
* Add`$stamps` parameter to`HandleTrait::handle`
12+
* Add`Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener` to reset PHP's peak memory usage for each processed message
1213

1314
7.2
1415
---
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\EventListener;
13+
14+
useSymfony\Component\EventDispatcher\EventSubscriberInterface;
15+
useSymfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
16+
useSymfony\Component\Messenger\Event\WorkerRunningEvent;
17+
18+
/**
19+
* @author Tim Düsterhus <tim@tideways-gmbh.com>
20+
*/
21+
finalclass ResetMemoryUsageListenerimplements EventSubscriberInterface
22+
{
23+
privatebool$collect =false;
24+
25+
publicfunctionresetBefore(WorkerMessageReceivedEvent$event):void
26+
{
27+
// Reset the peak memory usage for accurate measurement of the
28+
// memory usage on a per-message basis.
29+
memory_reset_peak_usage();
30+
$this->collect =true;
31+
}
32+
33+
publicfunctioncollectAfter(WorkerRunningEvent$event):void
34+
{
35+
if ($event->isWorkerIdle() &&$this->collect) {
36+
gc_collect_cycles();
37+
$this->collect =false;
38+
}
39+
}
40+
41+
publicstaticfunctiongetSubscribedEvents():array
42+
{
43+
return [
44+
WorkerMessageReceivedEvent::class => ['resetBefore', -1024],
45+
WorkerRunningEvent::class => ['collectAfter', -1024],
46+
];
47+
}
48+
}

‎src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
useSymfony\Component\Messenger\Event\WorkerRunningEvent;
2727
useSymfony\Component\Messenger\Event\WorkerStartedEvent;
2828
useSymfony\Component\Messenger\Event\WorkerStoppedEvent;
29+
useSymfony\Component\Messenger\EventListener\ResetMemoryUsageListener;
2930
useSymfony\Component\Messenger\EventListener\ResetServicesListener;
3031
useSymfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
3132
useSymfony\Component\Messenger\Exception\RuntimeException;
@@ -586,7 +587,7 @@ public function testFlushBatchOnStop()
586587
$this->assertSame($expectedMessages,$handler->processedMessages);
587588
}
588589

589-
publicfunctiontestGcCollectCyclesIsCalledOnMessageHandle()
590+
publicfunctiontestGcCollectCyclesIsCalledOnIdleWorker()
590591
{
591592
$apiMessage =newDummyMessage('API');
592593

@@ -595,14 +596,64 @@ public function testGcCollectCyclesIsCalledOnMessageHandle()
595596
$bus =$this->createMock(MessageBusInterface::class);
596597

597598
$dispatcher =newEventDispatcher();
599+
$dispatcher->addSubscriber(newResetMemoryUsageListener());
600+
$before =0;
601+
$dispatcher->addListener(WorkerRunningEvent::class,function (WorkerRunningEvent$event)use (&$before) {
602+
static$i =0;
603+
604+
$after =gc_status()['runs'];
605+
if (0 ===$i) {
606+
$this->assertFalse($event->isWorkerIdle());
607+
$this->assertSame(0,$after -$before);
608+
}elseif (1 ===$i) {
609+
$this->assertTrue($event->isWorkerIdle());
610+
$this->assertSame(1,$after -$before);
611+
}elseif (3 ===$i) {
612+
// Wait a few idle phases before stopping.
613+
$this->assertSame(1,$after -$before);
614+
$event->getWorker()->stop();
615+
}
616+
617+
$i++;
618+
},PHP_INT_MIN);
619+
620+
621+
$worker =newWorker(['transport' =>$receiver],$bus,$dispatcher);
622+
623+
gc_collect_cycles();
624+
$before =gc_status()['runs'];
625+
626+
$worker->run([
627+
'sleep' =>0,
628+
]);
629+
}
630+
631+
publicfunctiontestMemoryUsageIsResetOnMessageHandle()
632+
{
633+
$apiMessage =newDummyMessage('API');
634+
635+
$receiver =newDummyReceiver([[newEnvelope($apiMessage)]]);
636+
637+
$bus =$this->createMock(MessageBusInterface::class);
638+
639+
$dispatcher =newEventDispatcher();
640+
$dispatcher->addSubscriber(newResetMemoryUsageListener());
598641
$dispatcher->addSubscriber(newStopWorkerOnMessageLimitListener(1));
599642

643+
// Allocate and deallocate 4 MB. The use of random_int() is to
644+
// prevent compile-time optimization.
645+
$memory =str_repeat(random_int(0,1),4 *1024 *1024);
646+
unset($memory);
647+
648+
$before =memory_get_peak_usage();
649+
600650
$worker =newWorker(['transport' =>$receiver],$bus,$dispatcher);
601651
$worker->run();
602652

603-
$gcStatus =gc_status();
653+
// This should be roughly 4 MB smaller than $before.
654+
$after =memory_get_peak_usage();
604655

605-
$this->assertGreaterThan(0,$gcStatus['runs']);
656+
$this->assertTrue($after <$before);
606657
}
607658

608659
/**

‎src/Symfony/Component/Messenger/Worker.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,6 @@ public function run(array $options = []): void
128128
// this should prevent multiple lower priority receivers from
129129
// blocking too long before the higher priority are checked
130130
if ($envelopeHandled) {
131-
gc_collect_cycles();
132-
133131
break;
134132
}
135133
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp