Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[HttpFoundation] Streamlining server event streaming#58743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
fabpot merged 1 commit intosymfony:7.3fromyceruto:event_stream_response
Feb 7, 2025

Conversation

yceruto
Copy link
Member

@ycerutoyceruto commentedNov 4, 2024
edited
Loading

QA
Branch?7.3
Bug fix?no
New feature?yes
Deprecations?no
Issues-
LicenseMIT

WhileMercure is the recommended option for most cases, implementing a custom Server-sent Event (SSE) backend relying on Symfony components is still a good alternative (considering you're building an app with a few users only)

Example using today’s code:

publicfunction__invoke():StreamedResponse{$response =newStreamedResponse(function () {foreach ($this->watchJobsInProgress()as$job) {echo"type: jobs\n";echo"data:".$job->toJson()."\n\n";            StreamedResponse::closeOutputBuffers(0,true);flush();if (connection_aborted()) {break;            }sleep(1);        }    });$response->headers->set('Content-Type','text/event-stream');$response->headers->set('Cache-Control','no-cache');$response->headers->set('Connection','keep-alive');$response->headers->set('X-Accel-Buffering','no');return$response;}

So, this PR simplifies that implementation by adding two new artifacts:

✨ NewServerEvent DTO class

Spec:https://html.spec.whatwg.org/multipage/server-sent-events.html

TheServerEvent class represents individual events streamed from the server to clients. It includes fields likedata,type,id,retry, andcomment, so it can cover a variety of use cases.

One key feature is thegetIterator() method, which automatically concat all these fields into the proper SSE format. This saves us from having to manually format the events, making the process a lot smoother. Plus, it supports iterabledata, meaning it can handle multi-line messages or more complex data structures all in one event.

✨ NewEventStreamResponse class

  • Simplified Headers Setup

    returnnewEventStreamResponse(/* ... */);

    This class automatically sets the required headers (Content-Type,Cache-Control, andConnection), ensuring that every response is properly configured for event streaming without additional code.

  • Generator-based Event Streaming with Auto Flush

    returnnewEventStreamResponse(function ():\Generator {yieldnewServerEvent(time(), type:'ping');});

    The callback function can nowyield events directly, automatically handling serialization and output buffering as each new event occurs.

  • Event Streaming with Sending Control

    returnnewEventStreamResponse(function (EventStreamResponse$response) {$response->sendEvent(newServerEvent('...'));// do something in between ...$response->sendEvent(newServerEvent('...'));});

    The callback function receives a new argument—the response instance—allowing you to manuallysendEvent().

    This method is especially useful whenyield is not possible due to scope limitations. In the example below, the callback handles the event inside an event stream response, listening for Redis messages and directly triggering thesendEvent() method:

    returnnewEventStreamResponse(function (EventStreamResponse$response) {$redis =new \Redis();$redis->connect('127.0.0.1');$redis->subscribe(['message'],function (/* ... */,string$message)use ($response) {$response->sendEvent(newServerEvent($message));      });  });
  • Retry Event Fallback

    returnnewEventStreamResponse(/* ... */, retry:1000);

    ServerEvent allows setting individualretry intervals, which take precedence over the default retry configured inEventStreamResponse. If not specified at the event level, the global retry setting applies.

This is the final, optimized version after the proposed improvements:

publicfunction__invoke():EventStreamResponse{returnnewEventStreamResponse(function () {foreach ($this->watchJobsInProgress()as$job) {yieldnewServerEvent($job->toJson(), type:'jobs');sleep(1);        }    });}

Cheers!

tucksaun, welcoMattic, mtarld, and lyrixx reacted with thumbs up emojiwelcoMattic, GromNaN, lyrixx, and yceruto reacted with heart emoji
@carsonbotcarsonbot added Status: Needs Review DXDX = Developer eXperience (anything that improves the experience of using Symfony) Feature HttpFoundation labelsNov 4, 2024
@carsonbotcarsonbot added this to the7.2 milestoneNov 4, 2024
@carsonbotcarsonbot changed the title[HttpFoundation][DX] Streamlining server event streaming[HttpFoundation] Streamlining server event streamingNov 4, 2024
@nicolas-grekas
Copy link
Member

From an infrastructure PoV, this doesn't make sense to me: doing it like that, each stream connection consumes way to much memory to make anything useful. This can scale up to maybe hundreds of connections (maybe tenth) and then you hit a dead-end.

@yceruto
Copy link
MemberAuthor

yceruto commentedNov 5, 2024
edited
Loading

@nicolas-grekas If we do it with the Symfony stack for sure, I agree with you, but we can still use the HttpFoundation component with ReactPHP and the proper infrastructure, right?

@nicolas-grekas
Copy link
Member

Sure, but is that a real use case? How would such an app look like?

@yceruto
Copy link
MemberAuthor

I don't have a real use case with ReactPHP, just with Symfony itself and an app within a private network. My infra is very small and I don't have the connections/memory problems in this case, just a few users.

My intention here is to simplify things for these straightforward cases; otherwise I'd recommend using Mercure of course.

@fabpotfabpot modified the milestones:7.2,7.3Nov 20, 2024
@yceruto
Copy link
MemberAuthor

I still think these improvements can significantly enhance the developer experience for small applications with a limited number of connections or where memory consumption is manageable.

Friendly ping @symfony/mergers is this something unconventional?

@faizanakram99
Copy link
Contributor

faizanakram99 commentedDec 20, 2024
edited
Loading

Fwiw, I think it's useful too. We also have something similar in house mainly for live feedback, for interoperability with cli (symfony commands), it extends Output class (not ideal I know but fulfills the needs).

So yes, we would benefit from this change.

<?phpdeclare(strict_types=1);namespaceZzz\Common;usePsr\Log\LoggerInterface;useSymfony\Component\Console\Output\Output;useSymfony\Contracts\Service\Attribute\Required;class EventSourceOutputextends Output{    #[Required]publicLoggerInterface$logger;privatebool$streamStarted =false;publicfunctionwriteln(string|iterable$messages,int$options =self::OUTPUT_RAW):void    {parent::writeln($messages,$options);    }publicfunctionwrite(string|iterable$messages,bool$newline =false,int$options =self::OUTPUT_RAW):void    {parent::write($messages,$newline,$options);    }protectedfunctiondoWrite(string$message,bool$newline):void    {echo"{$this->stream($message)}\n\n";    }privatefunctionstream(string$message):string    {if (!$this->streamStarted) {$this->startStream();        }if ((bool)\preg_match('|^<(\w+)>(.+)</(\w+)>$|',$message,$matches)) {return'close' ===$matches[1]                ?"data:{$matches[2]}"                :$this->prepareStream($matches[1],$matches[2]);        }return$this->prepareStream('info', (string)\preg_replace('/\s+/','',\strip_tags($message)));    }privatefunctionprepareStream(string$type,string$text):string    {if (!(bool)\preg_match('!!u',$text)) {// Not UTF-8$text ='...';        }try {return\sprintf('data: %s',\json_encode(['type' =>$type,'text' =>$text], \JSON_THROW_ON_ERROR));        }catch (\JsonException$e) {$this->logger->critical($e->getMessage(), ['exception' =>$e]);return'data: END_SSE';        }    }privatefunctionstartStream():void    {\ini_set('output_buffering','off');\ini_set('zlib.output_compression','0');\ob_implicit_flush();while (\ob_get_level() >0) {\ob_end_clean();        }if (\function_exists('apache_setenv')) {apache_setenv('no-gzip','1');apache_setenv('dont-vary','1');        }\header('Content-Type: text/event-stream');\header('Cache-Control: no-cache');\header('Connection: keep-alive');\header('X-Accel-Buffering: no');echo\str_pad('',10000);echo"\n";$this->streamStarted =true;    }publicfunctionhasStreamStarted():bool    {return$this->streamStarted;    }publicfunctionendStream():void    {$this->writeln('<close>END_SSE</close>');    }}

Do note thatX-Accel-Buffering: no header is needed for ngjnx, otherwise it buffers output

yceruto and fabpot reacted with thumbs up emoji

@welcoMattic
Copy link
Member

I like this, as it allows developers of small/private projects with small traffic to leverage SSE without supporting a Mercure instance.

But, I also agree with Nicolas, this can quickly leads to high memory consumption and this should be well documented as a warning.

yceruto and chapterjason reacted with heart emoji

@dunglas
Copy link
Member

I'm in favor of this feature too.

Mercure focus is event broadcasting, which cannot be (easily) achieved using this patch. But SSE is also useful for other use cases than broadcasting.

For instance, this feature could be useful for hinting the client about the progress of a synchronous job.

Regarding the memory/performance concerns, mentioning in the documentation that for broadcasting but also for high-traffic sites using Mercure would be more straightforward should be good enough.

welcoMattic, jvasseur, yceruto, and chapterjason reacted with thumbs up emojiyceruto and chapterjason reacted with heart emoji

@yceruto
Copy link
MemberAuthor

Happy to read your positive comments! I'll be addressing them soon, thanks!

dunglas and welcoMattic reacted with thumbs up emoji

@ycerutoycerutoforce-pushed theevent_stream_response branch 5 times, most recently fromb692c6c to3abc5b1CompareFebruary 6, 2025 05:17
@ycerutoycerutoforce-pushed theevent_stream_response branch 2 times, most recently fromdb27a24 to2498bd8CompareFebruary 6, 2025 05:31
@yceruto
Copy link
MemberAuthor

I’ve addressed all the comments. hopefully, I understood them all correctly.
I also updated the PR description with a more specific use case.

it's ready for review again 🤞

@fabpot
Copy link
Member

Thank you@yceruto.

@fabpotfabpot merged commitb27f2ec intosymfony:7.3Feb 7, 2025
9 of 11 checks passed
@welcoMattic
Copy link
Member

Side note to mention that this feature will ease the integration of Symfony apps asMCP servers, which as 2 transports modes:stdio andSSE. 🤖 cc@lyrixx

yceruto and lyrixx reacted with thumbs up emojiyceruto and lyrixx reacted with rocket emoji

@ycerutoyceruto deleted the event_stream_response branchFebruary 7, 2025 14:13
@fabpotfabpot mentioned this pull requestMay 2, 2025
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment
Reviewers

@jvasseurjvasseurjvasseur left review comments

@faizanakram99faizanakram99faizanakram99 left review comments

@fabpotfabpotfabpot approved these changes

@dunglasdunglasdunglas approved these changes

Assignees
No one assigned
Labels
DXDX = Developer eXperience (anything that improves the experience of using Symfony)FeatureHttpFoundationStatus: Reviewed
Projects
None yet
Milestone
7.3
Development

Successfully merging this pull request may close these issues.

8 participants
@yceruto@nicolas-grekas@faizanakram99@welcoMattic@dunglas@fabpot@jvasseur@carsonbot

[8]ページ先頭

©2009-2025 Movatter.jp