Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit97e4cd7

Browse files
committed
Implement BatchAsyncHandlerTrait for parallel message processing
1 parent522a316 commit97e4cd7

File tree

2 files changed

+391
-0
lines changed

2 files changed

+391
-0
lines changed
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Handler;
13+
14+
useAmp\Future;
15+
useSymfony\Component\Messenger\Stamp\FutureStamp;
16+
useSymfony\Component\Messenger\ParallelMessageBus;
17+
useSymfony\Component\Messenger\Envelope;
18+
19+
/**
20+
* A batch handler trait designed for parallel execution using ParallelMessageBus.
21+
*
22+
* This trait collects jobs in worker-specific batches and processes them
23+
* in parallel by dispatching each job individually through ParallelMessageBus.
24+
*/
25+
trait BatchAsyncHandlerTrait
26+
{
27+
/** @var array<string,array> Map of worker IDs to their job batches */
28+
privatearray$workerJobs = [];
29+
30+
/** @var ParallelMessageBus|null */
31+
private ?ParallelMessageBus$parallelBus =null;
32+
33+
/**
34+
* Set the parallel message bus to use for dispatching jobs.
35+
*/
36+
publicfunctionsetParallelMessageBus(ParallelMessageBus$bus):void
37+
{
38+
$this->parallelBus =$bus;
39+
}
40+
41+
publicfunctionflush(bool$force):void
42+
{
43+
$workerId =$this->getCurrentWorkerId();
44+
45+
if (isset($this->workerJobs[$workerId]) &&$jobs =$this->workerJobs[$workerId]) {
46+
$this->workerJobs[$workerId] = [];
47+
48+
if ($this->parallelBus) {
49+
// Process each job in parallel using ParallelMessageBus
50+
$futures = [];
51+
52+
foreach ($jobsas [$message,$ack]) {
53+
// Dispatch each message individually
54+
$envelope =$this->parallelBus->dispatch($message);
55+
56+
$futureStamp =$envelope->last(FutureStamp::class);
57+
if ($futureStamp) {
58+
/** @var Future $future */
59+
$future =$futureStamp->getFuture();
60+
$futures[] = [$future,$ack];
61+
}
62+
}
63+
64+
// If force is true, wait for all results
65+
if ($force &&$futures) {
66+
foreach ($futuresas [$future,$ack]) {
67+
try {
68+
$result =$future->await();
69+
$ack->ack($result);
70+
}catch (\Throwable$e) {
71+
$ack->nack($e);
72+
}
73+
}
74+
}
75+
}else {
76+
// Fallback to synchronous processing
77+
$this->process($jobs);
78+
}
79+
}
80+
}
81+
82+
/**
83+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
84+
*
85+
* @return mixed The number of pending messages in the batch if $ack is not null,
86+
* the result from handling the message otherwise
87+
*/
88+
privatefunctionhandle(object$message, ?Acknowledger$ack):mixed
89+
{
90+
$workerId =$this->getCurrentWorkerId();
91+
92+
if (!isset($this->workerJobs[$workerId])) {
93+
$this->workerJobs[$workerId] = [];
94+
}
95+
96+
if (null ===$ack) {
97+
$ack =newAcknowledger(get_debug_type($this));
98+
$this->workerJobs[$workerId][] = [$message,$ack];
99+
$this->flush(true);
100+
101+
return$ack->getResult();
102+
}
103+
104+
$this->workerJobs[$workerId][] = [$message,$ack];
105+
if (!$this->shouldFlush()) {
106+
return\count($this->workerJobs[$workerId]);
107+
}
108+
109+
$this->flush(true);
110+
111+
return0;
112+
}
113+
114+
privatefunctionshouldFlush():bool
115+
{
116+
$workerId =$this->getCurrentWorkerId();
117+
return$this->getBatchSize() <=\count($this->workerJobs[$workerId] ?? []);
118+
}
119+
120+
/**
121+
* Generates a unique identifier for the current worker context.
122+
*/
123+
privatefunctiongetCurrentWorkerId():string
124+
{
125+
// In a worker pool, each worker has a unique ID
126+
returngetmypid() ?:'default-worker';
127+
}
128+
129+
/**
130+
* Cleans up worker-specific resources when a worker completes its job.
131+
*/
132+
publicfunctioncleanupWorker():void
133+
{
134+
$workerId =$this->getCurrentWorkerId();
135+
136+
// Flush any remaining jobs before cleaning up
137+
if (isset($this->workerJobs[$workerId]) && !empty($this->workerJobs[$workerId])) {
138+
$this->flush(true);
139+
}
140+
141+
unset($this->workerJobs[$workerId]);
142+
}
143+
144+
/**
145+
* Completes the jobs in the list.
146+
* This is used as a fallback when ParallelMessageBus is not available.
147+
*
148+
* @param list<array{0: object, 1: Acknowledger}> $jobs A list of pairs of messages and their corresponding acknowledgers
149+
*/
150+
abstractprivatefunctionprocess(array$jobs):void;
151+
152+
privatefunctiongetBatchSize():int
153+
{
154+
return10;
155+
}
156+
}
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Tests\Handler;
13+
14+
useAmp\Future;
15+
usePHPUnit\Framework\TestCase;
16+
useSymfony\Component\Messenger\Envelope;
17+
useSymfony\Component\Messenger\Handler\Acknowledger;
18+
useSymfony\Component\Messenger\Handler\BatchAsyncHandlerTrait;
19+
useSymfony\Component\Messenger\ParallelMessageBus;
20+
useSymfony\Component\Messenger\Stamp\FutureStamp;
21+
22+
class BatchAsyncHandlerTraitTestextends TestCase
23+
{
24+
publicfunctiontestHandleWillProcessMessagesAfterReachingBatchSize()
25+
{
26+
$handler =newTestBatchAsyncHandler();
27+
$handler->setBatchSize(2);
28+
29+
$message1 =new \stdClass();
30+
$message2 =new \stdClass();
31+
$message3 =new \stdClass();
32+
33+
$result1 =$handler->handle($message1,newAcknowledger('TestBatchAsyncHandler'));
34+
$this->assertSame(1,$result1);
35+
$this->assertEmpty($handler->getProcessedJobs());
36+
37+
$result2 =$handler->handle($message2,newAcknowledger('TestBatchAsyncHandler'));
38+
$this->assertSame(0,$result2);
39+
$this->assertCount(2,$handler->getProcessedJobs()[0]);
40+
41+
$result3 =$handler->handle($message3,newAcknowledger('TestBatchAsyncHandler'));
42+
$this->assertSame(1,$result3);
43+
$this->assertCount(2,$handler->getProcessedJobs()[0]);
44+
}
45+
46+
publicfunctiontestHandleWithNullAcknowledgerProcessesImmediately()
47+
{
48+
$handler =newTestBatchAsyncHandler();
49+
$handler->setBatchSize(5);
50+
51+
$message =new \stdClass();
52+
$message->payload ='test';
53+
54+
$result =$handler->handle($message,null);
55+
$this->assertSame('processed:test',$result);
56+
$this->assertCount(1,$handler->getProcessedJobs()[0]);
57+
}
58+
59+
publicfunctiontestFlushWithForceTrueProcessesRegardlessOfBatchSize()
60+
{
61+
$handler =newTestBatchAsyncHandler();
62+
$handler->setBatchSize(5);
63+
64+
$message1 =new \stdClass();
65+
$message2 =new \stdClass();
66+
67+
$handler->handle($message1,newAcknowledger('TestBatchAsyncHandler'));
68+
$handler->handle($message2,newAcknowledger('TestBatchAsyncHandler'));
69+
70+
$this->assertCount(0,$handler->getProcessedJobs());
71+
72+
$handler->flush(true);
73+
$this->assertCount(2,$handler->getProcessedJobs()[0]);
74+
}
75+
76+
publicfunctiontestParallelDispatch()
77+
{
78+
$message1 =new \stdClass();
79+
$message1->payload ='test1';
80+
81+
$message2 =new \stdClass();
82+
$message2->payload ='test2';
83+
84+
$future1 =$this->createMock(Future::class);
85+
$future1->expects($this->once())
86+
->method('await')
87+
->willReturn('future_result1');
88+
89+
$future2 =$this->createMock(Future::class);
90+
$future2->expects($this->once())
91+
->method('await')
92+
->willReturn('future_result2');
93+
94+
$futureStamp1 =newFutureStamp($future1);
95+
$futureStamp2 =newFutureStamp($future2);
96+
97+
$envelope1 =newEnvelope($message1, [$futureStamp1]);
98+
$envelope2 =newEnvelope($message2, [$futureStamp2]);
99+
100+
$parallelBus =$this->createMock(ParallelMessageBus::class);
101+
$parallelBus->expects($this->exactly(2))
102+
->method('dispatch')
103+
->willReturnOnConsecutiveCalls($envelope1,$envelope2);
104+
105+
$handler =newTestBatchAsyncHandler();
106+
$handler->setBatchSize(2);
107+
$handler->setParallelMessageBus($parallelBus);
108+
109+
$ack1 =$this->createMock(Acknowledger::class);
110+
$ack1->expects($this->once())
111+
->method('ack')
112+
->with('future_result1');
113+
114+
$ack2 =$this->createMock(Acknowledger::class);
115+
$ack2->expects($this->once())
116+
->method('ack')
117+
->with('future_result2');
118+
119+
$handler->handle($message1,$ack1);
120+
$handler->handle($message2,$ack2);
121+
122+
// Jobs should have been dispatched via the parallel bus
123+
$this->assertEmpty($handler->getProcessedJobs());
124+
}
125+
126+
publicfunctiontestFallbackToSyncProcessingWhenNoBusAvailable()
127+
{
128+
$handler =newTestBatchAsyncHandler();
129+
$handler->setBatchSize(2);
130+
131+
$message1 =new \stdClass();
132+
$message2 =new \stdClass();
133+
134+
$handler->handle($message1,newAcknowledger('TestBatchAsyncHandler'));
135+
$handler->handle($message2,newAcknowledger('TestBatchAsyncHandler'));
136+
137+
$this->assertCount(2,$handler->getProcessedJobs()[0]);
138+
}
139+
140+
publicfunctiontestParallelDispatchWithException()
141+
{
142+
$message =new \stdClass();
143+
$message->payload ='test_exception';
144+
145+
$future =$this->createMock(Future::class);
146+
$future->expects($this->once())
147+
->method('await')
148+
->willThrowException(new \RuntimeException('Test exception'));
149+
150+
$futureStamp =newFutureStamp($future);
151+
$envelope =newEnvelope($message, [$futureStamp]);
152+
153+
$parallelBus =$this->createMock(ParallelMessageBus::class);
154+
$parallelBus->expects($this->once())
155+
->method('dispatch')
156+
->willReturn($envelope);
157+
158+
$handler =newTestBatchAsyncHandler();
159+
$handler->setParallelMessageBus($parallelBus);
160+
161+
$ack =$this->createMock(Acknowledger::class);
162+
$ack->expects($this->once())
163+
->method('nack')
164+
->with($this->isInstanceOf(\RuntimeException::class));
165+
166+
$handler->handle($message,$ack);
167+
$handler->flush(true);
168+
}
169+
170+
publicfunctiontestCleanupWorker()
171+
{
172+
$handler =newTestBatchAsyncHandler();
173+
174+
$message =new \stdClass();
175+
$handler->handle($message,newAcknowledger('TestBatchAsyncHandler'));
176+
177+
$this->assertEmpty($handler->getProcessedJobs());
178+
179+
$handler->cleanupWorker();
180+
181+
$this->assertCount(1,$handler->getProcessedJobs()[0]);
182+
}
183+
}
184+
185+
/**
186+
* Test implementation of BatchAsyncHandlerTrait.
187+
*/
188+
class TestBatchAsyncHandler
189+
{
190+
use BatchAsyncHandlerTrait {
191+
BatchAsyncHandlerTrait::getCurrentWorkerIdasprivate traitGetCurrentWorkerId;
192+
}
193+
194+
privateint$batchSize =10;
195+
privatearray$processedJobs = [];
196+
197+
publicfunctionhandle(object$message, ?Acknowledger$ack):mixed
198+
{
199+
return$this->{'handle'}($message,$ack);
200+
}
201+
202+
privatefunctionprocess(array$jobs):void
203+
{
204+
$this->processedJobs[] =$jobs;
205+
206+
foreach ($jobsas [$message,$ack]) {
207+
if (property_exists($message,'payload')) {
208+
$result ='processed:' .$message->payload;
209+
$ack->ack($result);
210+
}else {
211+
$ack->ack(true);
212+
}
213+
}
214+
}
215+
216+
publicfunctionsetBatchSize(int$size):void
217+
{
218+
$this->batchSize =$size;
219+
}
220+
221+
publicfunctiongetProcessedJobs():array
222+
{
223+
return$this->processedJobs;
224+
}
225+
226+
privatefunctiongetBatchSize():int
227+
{
228+
return$this->batchSize;
229+
}
230+
231+
privatefunctiongetCurrentWorkerId():string
232+
{
233+
return'test-worker-id';
234+
}
235+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp