- Notifications
You must be signed in to change notification settings - Fork3k
Description
Bug Report
Current Behavior
When you reuse a pipe with publish or publishReplay all uses of the pipe share the same instance of ConnectableObservable.
Which causes the connection to connect when the other one connected and gets data from the other source.
const pipeLine = pipe( map((x: number) => x * x), publish(),);const obs1 = src1.pipe(pipeLine);const obs2 = src2.pipe(pipeLine);obs1.subscribe(d => console.log('sub 1', d));obs2.subscribe(d => console.log('I should not be running', d));obs1.connect();This is what you get on the console
sub 1 1I should not be running 1sub 1 4I should not be running 4sub 1 9I should not be running 9sub 1 16I should not be running 16sub 1 25I should not be running 25obs2 which was never connected has been connected and it's also connected to the wrong source.
Reproduction
https://stackblitz.com/edit/rxjs-nrcyfu
Expected behavior
reusing pipes should not effect which source connects to which subscription.
Only the 1st subscription should run as the 2nd one hasn't been connected yet.
Also the 2nd subscription should not be subscribed to the first observable.
Workaround
Use a factory method instead.
const pipeLine = () => pipe( map((x: number) => x * x), publish(), );Possible Solution
for publishReplay
https://github.com/ReactiveX/rxjs/blob/259e5cd2e8149bc5db14cc0216afec7131e8aa11/src/internal/operators/publishReplay.ts
Instead of creating the replay subject on line 22 instead it can be created in the call back on line 24. That way each source observable would have it's own replay subject.
for publish
On line 61 we can return a factory that takes the source instead of calling multicast directly.