2727 */
2828class Connection
2929{
30+ private const DEFAULT_OPTIONS = [
31+ 'stream ' =>'messages ' ,
32+ 'group ' =>'symfony ' ,
33+ 'consumer ' =>'consumer ' ,
34+ 'auto_setup ' =>true ,
35+ ];
36+
3037private $ connection ;
3138private $ stream ;
3239private $ group ;
@@ -38,9 +45,10 @@ public function __construct(array $configuration, array $connectionCredentials =
3845$ this ->connection =$ redis ?:new \Redis ();
3946$ this ->connection ->connect ($ connectionCredentials ['host ' ] ??'127.0.0.1 ' ,$ connectionCredentials ['port ' ] ??6379 );
4047$ this ->connection ->setOption (\Redis::OPT_SERIALIZER ,$ redisOptions ['serializer ' ] ?? \Redis::SERIALIZER_PHP );
41- $ this ->stream =$ configuration ['stream ' ] ??'' ?:'messages ' ;
42- $ this ->group =$ configuration ['group ' ] ??'' ?:'symfony ' ;
43- $ this ->consumer =$ configuration ['consumer ' ] ??'' ?:'consumer ' ;
48+ $ this ->stream =$ configuration ['stream ' ] ??self ::DEFAULT_OPTIONS ['stream ' ];
49+ $ this ->group =$ configuration ['group ' ] ??self ::DEFAULT_OPTIONS ['group ' ];
50+ $ this ->consumer =$ configuration ['consumer ' ] ??self ::DEFAULT_OPTIONS ['consumer ' ];
51+ $ this ->autoSetup =$ configuration ['auto_setup ' ] ??self ::DEFAULT_OPTIONS ['auto_setup ' ];
4452 }
4553
4654public static function fromDsn (string $ dsn ,array $ redisOptions = [],\Redis $ redis =null ):self
@@ -51,9 +59,9 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
5159
5260$ pathParts =explode ('/ ' ,$ parsedUrl ['path ' ] ??'' );
5361
54- $ stream =$ pathParts [1 ] ??'' ;
55- $ group =$ pathParts [2 ] ??'' ;
56- $ consumer =$ pathParts [3 ] ??'' ;
62+ $ stream =$ pathParts [1 ] ??null ;
63+ $ group =$ pathParts [2 ] ??null ;
64+ $ consumer =$ pathParts [3 ] ??null ;
5765
5866$ connectionCredentials = [
5967'host ' =>$ parsedUrl ['host ' ] ??'127.0.0.1 ' ,
@@ -64,11 +72,21 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
6472parse_str ($ parsedUrl ['query ' ],$ redisOptions );
6573 }
6674
67- return new self (['stream ' =>$ stream ,'group ' =>$ group ,'consumer ' =>$ consumer ],$ connectionCredentials ,$ redisOptions ,$ redis );
75+ $ autoSetup =null ;
76+ if (\array_key_exists ('auto_setup ' ,$ redisOptions )) {
77+ $ autoSetup =filter_var ($ redisOptions ['auto_setup ' ],FILTER_VALIDATE_BOOLEAN );
78+ unset($ redisOptions ['auto_setup ' ]);
79+ }
80+
81+ return new self (['stream ' =>$ stream ,'group ' =>$ group ,'consumer ' =>$ consumer ,'auto_setup ' =>$ autoSetup ],$ connectionCredentials ,$ redisOptions ,$ redis );
6882 }
6983
7084public function get (): ?array
7185 {
86+ if ($ this ->autoSetup ) {
87+ $ this ->setup ();
88+ }
89+
7290$ messageId ='> ' ;// will receive new messages
7391
7492if ($ this ->couldHavePendingMessages ) {
@@ -141,6 +159,10 @@ public function reject(string $id): void
141159
142160public function add (string $ body ,array $ headers ):void
143161 {
162+ if ($ this ->autoSetup ) {
163+ $ this ->setup ();
164+ }
165+
144166$ e =null ;
145167try {
146168$ added =$ this ->connection ->xadd ($ this ->stream ,'* ' , ['message ' =>json_encode (
@@ -161,5 +183,7 @@ public function setup(): void
161183 }catch (\RedisException $ e ) {
162184throw new TransportException ($ e ->getMessage (),0 ,$ e );
163185 }
186+
187+ $ this ->autoSetup =false ;
164188 }
165189}