Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb27f2ec

Browse files
committed
feature#58743 [HttpFoundation] Streamlining server event streaming (yceruto)
This PR was merged into the 7.3 branch.Discussion----------[HttpFoundation] Streamlining server event streaming| Q | A| ------------- | ---| Branch? | 7.3| Bug fix? | no| New feature? | yes| Deprecations? | no| Issues | -| License | MITWhile [Mercure](https://mercure.rocks/) is the recommended option for most cases, implementing a custom Server-sent Event (SSE) backend relying on Symfony components is still a good alternative.Example using today’s code:```phppublic function __invoke(): StreamedResponse{ $response = new StreamedResponse(function () { foreach ($this->watchJobsInProgress() as $job) { echo "type: jobs\n"; echo "data: ".$job->toJson()."\n\n"; StreamedResponse::closeOutputBuffers(0, true); flush(); if (connection_aborted()) { break; } sleep(1); } }); $response->headers->set('Content-Type', 'text/event-stream'); $response->headers->set('Cache-Control', 'no-cache'); $response->headers->set('Connection', 'keep-alive'); $response->headers->set('X-Accel-Buffering', 'no'); return $response;}```So, this PR simplifies that implementation by adding two new artifacts:### ✨ New `ServerEvent` DTO classSpec:https://html.spec.whatwg.org/multipage/server-sent-events.htmlThe `ServerEvent` class represents individual events streamed from the server to clients. It includes fields like `data`, `type`, `id`, `retry`, and `comment`, so it can cover a variety of use cases.One key feature is the `__toString()` method, which automatically serializes all these fields into the proper SSE format. This saves us from having to manually format the events, making the process a lot smoother. Plus, it supports iterable `data`, meaning it can handle multi-line messages or more complex data structures all in one event.### ✨ New `EventStreamResponse` class* **Simplified Headers Setup** ```php return new EventStreamResponse(/* ... */); ``` This class automatically sets the required headers (`Content-Type`, `Cache-Control`, and `Connection`), ensuring that every response is properly configured for event streaming without additional code.* **Generator-based Event Streaming with Auto Flush** ```php return new EventStreamResponse(function (): \Generator { yield new ServerEvent(time(), type: 'ping'); }); ``` The callback function can now `yield` events directly, automatically handling serialization and output buffering as each new event occurs.* **Event Streaming with Sending Control** ```php return new EventStreamResponse(function (EventStreamResponse $response) { $response->sendEvent(new ServerEvent('...')); // do something in between ... $response->sendEvent(new ServerEvent('...')); }); ``` The callback function receives a new argument—the response instance—allowing you to manually `sendEvent()`. This method is especially useful when `yield` is not possible due to scope limitations. In the example below, the callback handles the event inside an event stream response, listening for Redis messages and directly triggering the `sendEvent()` method: ```php return new EventStreamResponse(function (EventStreamResponse $response) { $redis = new \Redis(); $redis->connect('127.0.0.1'); $redis->subscribe(['message'], function (/* ... */, string $message) use ($response) { $response->sendEvent(new ServerEvent($message)); }); }); ```* **Retry Event Fallback** ```php return new EventStreamResponse(/* ... */, retry: 1000); ``` `ServerEvent` allows setting individual `retry` intervals, which take precedence over the default retry configured in `EventStreamResponse`. If not specified at the event level, the global retry setting applies.This is the final, optimized version after the proposed improvements:```phppublic function __invoke(): EventStreamResponse{ return new EventStreamResponse(function () { foreach ($this->watchJobsInProgress() as $job) { yield new ServerEvent($job->toJson(), type: 'jobs'); sleep(1); } });}```Cheers!Commits-------0bfd3d3 Streamlining server event streaming
2 parents3974926 +0bfd3d3 commitb27f2ec

File tree

4 files changed

+385
-0
lines changed

4 files changed

+385
-0
lines changed

‎src/Symfony/Component/HttpFoundation/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add support for iterable of string in`StreamedResponse`
8+
* Add`EventStreamResponse` and`ServerEvent` classes to streamline server event streaming
89

910
7.2
1011
---
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\HttpFoundation;
13+
14+
/**
15+
* Represents a streaming HTTP response for sending server events
16+
* as part of the Server-Sent Events (SSE) streaming technique.
17+
*
18+
* To broadcast events to multiple users at once, for long-running
19+
* connections and for high-traffic websites, prefer using the Mercure
20+
* Symfony Component, which relies on Software designed for these use
21+
* cases: https://symfony.com/doc/current/mercure.html
22+
*
23+
* @see ServerEvent
24+
*
25+
* @author Yonel Ceruto <open@yceruto.dev>
26+
*
27+
* Example usage:
28+
*
29+
* return new EventStreamResponse(function () {
30+
* yield new ServerEvent(time());
31+
*
32+
* sleep(1);
33+
*
34+
* yield new ServerEvent(time());
35+
* });
36+
*/
37+
class EventStreamResponseextends StreamedResponse
38+
{
39+
/**
40+
* @param int|null $retry The number of milliseconds the client should wait
41+
* before reconnecting in case of network failure
42+
*/
43+
publicfunction__construct(?callable$callback =null,int$status =200,array$headers = [],private ?int$retry =null)
44+
{
45+
$headers += [
46+
'Connection' =>'keep-alive',
47+
'Content-Type' =>'text/event-stream',
48+
'Cache-Control' =>'private, no-cache, no-store, must-revalidate, max-age=0',
49+
'X-Accel-Buffering' =>'no',
50+
'Pragma' =>'no-cache',
51+
'Expire' =>'0',
52+
];
53+
54+
parent::__construct($callback,$status,$headers);
55+
}
56+
57+
publicfunctionsetCallback(callable$callback):static
58+
{
59+
if ($this->callback) {
60+
returnparent::setCallback($callback);
61+
}
62+
63+
$this->callback =function ()use ($callback) {
64+
if (is_iterable($events =$callback($this))) {
65+
foreach ($eventsas$event) {
66+
$this->sendEvent($event);
67+
68+
if (connection_aborted()) {
69+
break;
70+
}
71+
}
72+
}
73+
};
74+
75+
return$this;
76+
}
77+
78+
/**
79+
* Sends a server event to the client.
80+
*
81+
* @return $this
82+
*/
83+
publicfunctionsendEvent(ServerEvent$event):static
84+
{
85+
if ($this->retry >0 && !$event->getRetry()) {
86+
$event->setRetry($this->retry);
87+
}
88+
89+
foreach ($eventas$part) {
90+
echo$part;
91+
92+
if (!\in_array(\PHP_SAPI, ['cli','phpdbg','embed'],true)) {
93+
static::closeOutputBuffers(0,true);
94+
flush();
95+
}
96+
}
97+
98+
return$this;
99+
}
100+
101+
publicfunctiongetRetry(): ?int
102+
{
103+
return$this->retry;
104+
}
105+
106+
publicfunctionsetRetry(int$retry):void
107+
{
108+
$this->retry =$retry;
109+
}
110+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\HttpFoundation;
13+
14+
/**
15+
* An event generated on the server intended for streaming to the client
16+
* as part of the SSE streaming technique.
17+
*
18+
* @implements \IteratorAggregate<string>
19+
*
20+
* @author Yonel Ceruto <open@yceruto.dev>
21+
*/
22+
class ServerEventimplements \IteratorAggregate
23+
{
24+
/**
25+
* @param string|iterable<string> $data The event data field for the message
26+
* @param string|null $type The event type
27+
* @param int|null $retry The number of milliseconds the client should wait
28+
* before reconnecting in case of network failure
29+
* @param string|null $id The event ID to set the EventSource object's last event ID value
30+
* @param string|null $comment The event comment
31+
*/
32+
publicfunction__construct(
33+
privatestring|iterable$data,
34+
private ?string$type =null,
35+
private ?int$retry =null,
36+
private ?string$id =null,
37+
private ?string$comment =null,
38+
) {
39+
}
40+
41+
publicfunctiongetData():iterable|string
42+
{
43+
return$this->data;
44+
}
45+
46+
/**
47+
* @return $this
48+
*/
49+
publicfunctionsetData(iterable|string$data):static
50+
{
51+
$this->data =$data;
52+
53+
return$this;
54+
}
55+
56+
publicfunctiongetType(): ?string
57+
{
58+
return$this->type;
59+
}
60+
61+
/**
62+
* @return $this
63+
*/
64+
publicfunctionsetType(string$type):static
65+
{
66+
$this->type =$type;
67+
68+
return$this;
69+
}
70+
71+
publicfunctiongetRetry(): ?int
72+
{
73+
return$this->retry;
74+
}
75+
76+
/**
77+
* @return $this
78+
*/
79+
publicfunctionsetRetry(?int$retry):static
80+
{
81+
$this->retry =$retry;
82+
83+
return$this;
84+
}
85+
86+
publicfunctiongetId(): ?string
87+
{
88+
return$this->id;
89+
}
90+
91+
/**
92+
* @return $this
93+
*/
94+
publicfunctionsetId(string$id):static
95+
{
96+
$this->id =$id;
97+
98+
return$this;
99+
}
100+
101+
publicfunctiongetComment(): ?string
102+
{
103+
return$this->comment;
104+
}
105+
106+
publicfunctionsetComment(string$comment):static
107+
{
108+
$this->comment =$comment;
109+
110+
return$this;
111+
}
112+
113+
/**
114+
* @return \Traversable<string>
115+
*/
116+
publicfunctiongetIterator():\Traversable
117+
{
118+
static$lastRetry =null;
119+
120+
$head ='';
121+
if ($this->comment) {
122+
$head .=\sprintf(': %s',$this->comment)."\n";
123+
}
124+
if ($this->id) {
125+
$head .=\sprintf('id: %s',$this->id)."\n";
126+
}
127+
if ($this->retry >0 &&$this->retry !==$lastRetry) {
128+
$head .=\sprintf('retry: %s',$lastRetry =$this->retry)."\n";
129+
}
130+
if ($this->type) {
131+
$head .=\sprintf('event: %s',$this->type)."\n";
132+
}
133+
yield$head;
134+
135+
if ($this->data) {
136+
if (is_iterable($this->data)) {
137+
foreach ($this->dataas$data) {
138+
yield\sprintf('data: %s',$data)."\n";
139+
}
140+
}else {
141+
yield\sprintf('data: %s',$this->data)."\n";
142+
}
143+
}
144+
145+
yield"\n";
146+
}
147+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\HttpFoundation\Tests;
13+
14+
usePHPUnit\Framework\TestCase;
15+
useSymfony\Component\HttpFoundation\EventStreamResponse;
16+
useSymfony\Component\HttpFoundation\ServerEvent;
17+
18+
class EventStreamResponseTestextends TestCase
19+
{
20+
publicfunctiontestInitializationWithDefaultValues()
21+
{
22+
$response =newEventStreamResponse();
23+
24+
$this->assertSame('text/event-stream',$response->headers->get('content-type'));
25+
$this->assertSame('max-age=0, must-revalidate, no-cache, no-store, private',$response->headers->get('cache-control'));
26+
$this->assertSame('keep-alive',$response->headers->get('connection'));
27+
28+
$this->assertSame(200,$response->getStatusCode());
29+
$this->assertNull($response->getRetry());
30+
}
31+
32+
publicfunctiontestStreamSingleEvent()
33+
{
34+
$response =newEventStreamResponse(function () {
35+
yieldnewServerEvent(
36+
data:'foo',
37+
type:'bar',
38+
retry:100,
39+
id:'1',
40+
comment:'bla bla',
41+
);
42+
});
43+
44+
$expected =<<<STR
45+
: bla bla
46+
id: 1
47+
retry: 100
48+
event: bar
49+
data: foo
50+
51+
52+
STR;
53+
54+
$this->assertSameResponseContent($expected,$response);
55+
}
56+
57+
publicfunctiontestStreamEventsAndData()
58+
{
59+
$data =staticfunction ():iterable {
60+
yield'first line';
61+
yield'second line';
62+
yield'third line';
63+
};
64+
65+
$response =newEventStreamResponse(function ()use ($data) {
66+
yieldnewServerEvent('single line');
67+
yieldnewServerEvent(['first line','second line']);
68+
yieldnewServerEvent($data());
69+
});
70+
71+
$expected =<<<STR
72+
data: single line
73+
74+
data: first line
75+
data: second line
76+
77+
data: first line
78+
data: second line
79+
data: third line
80+
81+
82+
STR;
83+
84+
$this->assertSameResponseContent($expected,$response);
85+
}
86+
87+
publicfunctiontestStreamEventsWithRetryFallback()
88+
{
89+
$response =newEventStreamResponse(function () {
90+
yieldnewServerEvent('foo');
91+
yieldnewServerEvent('bar');
92+
yieldnewServerEvent('baz', retry:1000);
93+
}, retry:1500);
94+
95+
$expected =<<<STR
96+
retry: 1500
97+
data: foo
98+
99+
data: bar
100+
101+
retry: 1000
102+
data: baz
103+
104+
105+
STR;
106+
107+
$this->assertSameResponseContent($expected,$response);
108+
}
109+
110+
publicfunctiontestStreamEventWithSendMethod()
111+
{
112+
$response =newEventStreamResponse(function (EventStreamResponse$response) {
113+
$response->sendEvent(newServerEvent('foo'));
114+
});
115+
116+
$this->assertSameResponseContent("data: foo\n\n",$response);
117+
}
118+
119+
privatefunctionassertSameResponseContent(string$expected,EventStreamResponse$response):void
120+
{
121+
ob_start();
122+
$response->send();
123+
$actual =ob_get_clean();
124+
125+
$this->assertSame($expected,$actual);
126+
}
127+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp