Complete Guide to Learn RxJS.

manthanank/learn-rxjs

This repository contains the code examples for the RxJS library. RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.

RxJS (Reactive Extensions Library for JavaScript)

RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.

Installation

npm install rxjs

Importing

  • 'rxjs' - for example: import { of } from 'rxjs';
  • 'rxjs/operators' - for example: import { map } from 'rxjs/operators';
  • 'rxjs/ajax' - for example: import { ajax } from 'rxjs/ajax';
  • 'rxjs/fetch' - for example: import { fromFetch } from 'rxjs/fetch';
  • 'rxjs/webSocket' - for example: import { webSocket } from 'rxjs/webSocket';
  • 'rxjs/testing' - for example: import { TestScheduler } from 'rxjs/testing';

Observables

Observables provide support for passing messages between publishers and subscribers. They are declarative: the function that publishes values is not executed until a consumer subscribes to it.

```typescript
// pipe
// subscribe
```

```typescript
import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next(x) {
    console.log('got value ' + x);
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
    console.log('done');
  },
});
console.log('just after subscribe');
```

Observer

An Observer is a consumer of values delivered by an Observable. Observers are simply a set of callbacks, one for each type of notification delivered by the Observable: next, error, and complete.

```typescript
// next
// error
// complete
```

```typescript
const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

observable.subscribe(observer);
```

Operators

Operators are functions. There are two kinds of operators:

Pipeable Operators are the kind that can be piped to Observables using the syntax observableInstance.pipe(operator()).

Creation Operators are the other kind of operator, which can be called as standalone functions to create a new Observable.
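As a small sketch of the two kinds side by side (the values and variable names are illustrative, not taken from the repository): `of` is a creation operator called on its own, while `filter` and `map` are pipeable operators passed to `pipe()`.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Creation operator: called as a standalone function, returns a new Observable.
const numbers$ = of(1, 2, 3, 4);

// Pipeable operators: passed to pipe(). The result is a new Observable;
// numbers$ itself is left unchanged.
const scaledEvens$ = numbers$.pipe(
  filter((n) => n % 2 === 0),
  map((n) => n * 10),
);

const results: number[] = [];
scaledEvens$.subscribe((value) => results.push(value));

console.log(results); // [20, 40]
```

Because `of` emits synchronously, `results` is already filled when `console.log` runs.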

Categories of operators

Creation Operators

These operators are used to create an observable. Some of the creation operators are:

  • ajax
  • bindCallback
  • bindNodeCallback
  • defer
  • empty
  • from
  • fromEvent
  • fromEventPattern
  • generate
  • interval
  • of
  • range
  • throwError
  • timer
  • iif

ajax - Creates an observable for an Ajax request from either a request object (with url, headers, etc.) or a URL string.

Example with Angular:

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { ajax } from 'rxjs/ajax';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>ajax Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const githubUsers = ajax('https://api.github.com/users?per_page=2');
    githubUsers.subscribe((res) => console.log(res.status, res.response));
  }
}

bootstrapApplication(App);
```

Stackblitz Angular Example Link

Example with TypeScript:

```typescript
import { ajax } from 'rxjs/ajax';

const githubUsers = ajax('https://api.github.com/users?per_page=2');
githubUsers.subscribe((res) => console.log(res.status, res.response));
```

Stackblitz TypeScript Example Link

bindCallback - Converts a callback-style function into a function that returns an Observable.

It lets you work with asynchronous APIs whose last argument is a callback that is invoked with the result. For Node.js-style callbacks, where the callback receives an error as its first argument, use bindNodeCallback instead.
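A minimal sketch of bindCallback, assuming a hypothetical `add` function whose last parameter is a plain (not error-first) callback. The function and its arguments are made up for illustration.

```typescript
import { bindCallback } from 'rxjs';

// Hypothetical callback-style API: the callback is the last parameter
// and receives only the result.
function add(a: number, b: number, callback: (sum: number) => void): void {
  callback(a + b);
}

// bindCallback returns a wrapper that takes the same arguments minus the
// callback and returns an Observable of the callback's argument.
const add$ = bindCallback(add);

const sums: number[] = [];
add$(2, 3).subscribe({
  next: (sum) => sums.push(sum), // receives 5
  complete: () => console.log('completed with', sums),
});
```

The wrapped function is only called when the returned Observable is subscribed to, and the Observable completes after the callback has fired once.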

bindNodeCallback - Converts a Node.js-style callback function (one whose callback receives an error as its first argument) into a function that returns an Observable.

Example with Angular:

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
// bindNodeCallback is exported from 'rxjs', not 'rxjs/ajax'
import { bindNodeCallback } from 'rxjs';

// Assume we have a Node.js-style callback function for file reading
function readFileAsync(
  filePath: string,
  callback: (error: Error | null, data: string | null) => void
) {
  // Some asynchronous file reading logic
  // Call the callback with the error (if any) and the file data
  setTimeout(() => {
    if (filePath === '/path/to/file.txt') {
      callback(null, 'File content goes here');
    } else {
      callback(new Error('File not found'), null);
    }
  }, 2000);
}

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>bindNodeCallback Example</h1>
    <div>{{ fileContent }}</div>
  `,
})
export class App implements OnInit {
  fileContent = '';

  ngOnInit() {
    const readFile = bindNodeCallback(readFileAsync);
    const filePath = '/path/to/file.txt';
    const readFile$ = readFile(filePath);

    readFile$.subscribe({
      next: (data) => {
        this.fileContent = data ?? '';
        console.log('File content:', data);
      },
      error: (error: Error) => {
        console.error('Error reading file:', error);
      },
    });
  }
}

bootstrapApplication(App);
```

Stackblitz Angular Example Link

defer - Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { switchMap, defer, of, timer, merge } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `<h1>defer operator</h1>`,
})
export class App implements OnInit {
  ngOnInit() {
    const s1 = of(new Date()); // will capture current date time
    const s2 = defer(() => of(new Date())); // will capture date time at the moment of subscription

    console.log(new Date());

    timer(2000)
      .pipe(switchMap(_ => merge(s1, s2)))
      .subscribe(console.log);
  }
}

bootstrapApplication(App);
```

empty - Replaced with the EMPTY constant or scheduled (e.g. scheduled([], scheduler)). Will be removed in v8.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { empty } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>empty operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const subscribe = empty().subscribe({
      next: () => console.log('Next'),
      complete: () => console.log('Complete!'),
    });
  }
}

bootstrapApplication(App);
```
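Since `empty()` is deprecated, the same behavior can be sketched with the `EMPTY` constant mentioned above. This is a framework-free illustration; the `log` array is only there to make the result visible.

```typescript
import { EMPTY } from 'rxjs';

const log: string[] = [];

// EMPTY emits no values and completes immediately on subscribe.
EMPTY.subscribe({
  next: () => log.push('next'),
  complete: () => log.push('complete'),
});

console.log(log); // ['complete']
```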

from - Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { from } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>from operator</h1>
  `,
})
export class App implements OnInit {
  data: any;

  ngOnInit() {
    const obj = from(['a', 'b', 'c', 'd']);
    obj.subscribe((res) => {
      console.log(res);
      this.data = res;
    });
  }
}

bootstrapApplication(App);
```

Stackblitz Example Link

fromEvent - Creates an Observable that emits events of a specific type coming from the given event target.

Example:

```typescript
import 'zone.js/dist/zone';
import { AfterViewInit, Component, ElementRef, ViewChild } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>fromEvent Example</h1>
    <button #add>Add</button>
    {{countVal}}
    <table>
      <tr *ngFor="let value of values">
        <td>{{value}}</td>
      </tr>
    </table>
  `,
})
export class App implements AfterViewInit {
  count = 0;
  values: number[] = [];
  countVal: any;

  // Set by Angular before ngAfterViewInit runs
  @ViewChild('add') add!: ElementRef<HTMLButtonElement>;

  ngAfterViewInit() {
    // Emit once per click on the "Add" button
    fromEvent(this.add.nativeElement, 'click').subscribe(() => {
      this.count++;
      this.countVal = this.count;
      console.log(this.countVal);
      this.values.push(this.count);
    });
  }
}

bootstrapApplication(App);
```

Stackblitz Example Link

fromEventPattern - Creates an Observable from an arbitrary API for registering event handlers.

```typescript
import 'zone.js/dist/zone';
import { Component, OnDestroy, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEventPattern, Subject, Subscription } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <button (click)="startListening()">Start Listening</button>
    <button (click)="stopListening()">Stop Listening</button>
  `,
})
export class App implements OnInit, OnDestroy {
  private eventListener!: EventListenerOrEventListenerObject;
  private eventSubject!: Subject<Event>;
  // Holds the active subscriptions so stopListening() can release them
  private subscriptions = new Subscription();

  ngOnInit() {
    this.eventSubject = new Subject<Event>();
    this.eventListener = (event: Event) => this.eventSubject.next(event);
  }

  ngOnDestroy() {
    this.stopListening();
    this.eventSubject.complete();
  }

  startListening() {
    const observable = fromEventPattern<Event>(
      // Function to add the event listener
      (handler: EventListenerOrEventListenerObject) => {
        document.addEventListener('customEvent', handler);
      },
      // Function to remove the event listener (called on unsubscribe)
      (handler: EventListenerOrEventListenerObject) => {
        document.removeEventListener('customEvent', handler);
      }
    );

    this.subscriptions.add(
      observable.subscribe((event: Event) => {
        console.log('Event received:', event);
        // Handle the event as needed
      })
    );

    this.subscriptions.add(
      this.eventSubject.subscribe((event: Event) => {
        document.dispatchEvent(event);
      })
    );
  }

  stopListening() {
    // Unsubscribing runs the remove-handler function passed to fromEventPattern
    this.subscriptions.unsubscribe();
    this.subscriptions = new Subscription();
  }
}

bootstrapApplication(App);
```

generate - Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { generate } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>generate example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const result = generate(0, x => x < 3, x => x + 1, x => x);
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

interval - Creates an Observable that emits sequential numbers every specified interval of time, on a specified SchedulerLike.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { interval, take } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>interval operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const numbers = interval(1000);
    const takeFourNumbers = numbers.pipe(take(4));
    takeFourNumbers.subscribe(x => console.log('Next: ', x));
  }
}

bootstrapApplication(App);
```

of - Converts the arguments to an observable sequence.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>of operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    // Each argument is emitted as a separate value
    const obj = of('a', 'b', 'c', 'd');
    obj.subscribe((res) => {
      console.log(res);
    });
  }
}

bootstrapApplication(App);
```

range - Creates an Observable that emits a sequence of numbers within a specified range.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { range } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>range operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    // emit 1-10 in sequence
    const source = range(1, 10);
    // output: 1,2,3,4,5,6,7,8,9,10
    const example = source.subscribe(val => console.log(val));
  }
}

bootstrapApplication(App);
```

Stackblitz Example Link

throwError - Creates an observable that will create an error instance and push it to the consumer as an error immediately upon subscription.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { throwError } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>throwError operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    let errorCount = 0;

    const errorWithTimestamp$ = throwError(() => {
      const error: any = new Error(`This is error number ${++errorCount}`);
      error.timestamp = Date.now();
      return error;
    });

    errorWithTimestamp$.subscribe({
      error: err => console.log(err.timestamp, err.message),
    });

    errorWithTimestamp$.subscribe({
      error: err => console.log(err.timestamp, err.message),
    });
  }
}

bootstrapApplication(App);
```

Stackblitz Example Link

timer - Creates an observable that waits for a specified time before emitting 0. With a single argument, as in the example below, it emits once and completes; if a second interval argument is given, it keeps emitting increasing numbers at that interval.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { timer } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>timer operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const source = timer(1000);
    // output: 0
    const subscribe = source.subscribe(val => console.log(val));
  }
}

bootstrapApplication(App);
```

Stackblitz Example Link

iif - Checks a boolean at subscription time, and chooses between one of two observable sources.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { iif, of, interval } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>iif operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const r$ = of('R');
    const x$ = of('X');

    interval(1000)
      .pipe(mergeMap(v => iif(() => v % 4 === 0, r$, x$)))
      .subscribe(console.log);
  }
}

bootstrapApplication(App);
```

Stackblitz Example Link

Join Creation Operators

These operators are used to create an observable by combining multiple observables. Some of the join creation operators are:

  • combineLatest
  • concat
  • forkJoin
  • merge
  • partition
  • race
  • zip

combineLatest - Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { timer, combineLatest } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>combineLatest operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const firstTimer = timer(0, 1000); // emit 0, 1, 2... after every second, starting from now
    const secondTimer = timer(500, 1000); // emit 0, 1, 2... after every second, starting 0.5s from now
    const combinedTimers = combineLatest([firstTimer, secondTimer]);
    combinedTimers.subscribe(value => console.log(value));
  }
}

bootstrapApplication(App);
```

Stackblitz RxJS Example Link

Stackblitz Example Link

concat - Creates an output Observable which sequentially emits all values from the first given Observable and then moves on to the next.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { interval, take, range, concat } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>concat operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const timer = interval(1000).pipe(take(4));
    const sequence = range(1, 10);
    // sequence is subscribed to only after timer completes
    const result = concat(timer, sequence);
    result.subscribe((x) => console.log(x));
  }
}

bootstrapApplication(App);
```

Stackblitz RxJS Example Link

Stackblitz Example Link

forkJoin - Accepts an Array of ObservableInput or a dictionary Object of ObservableInput and returns an Observable that emits either an array of values in the exact same order as the passed array, or a dictionary of values in the same shape as the passed dictionary.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { forkJoin, of, timer } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>forkJoin operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const observable = forkJoin({
      foo: of(1, 2, 3, 4),
      bar: Promise.resolve(8),
      baz: timer(4000),
    });

    observable.subscribe({
      next: (value) => console.log(value),
      complete: () => console.log('This is how it ends!'),
    });
  }
}

bootstrapApplication(App);
```

Stackblitz RxJS Example Link

Stackblitz Example Link
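The description above also mentions the array form of forkJoin. A minimal framework-free sketch, with made-up input values, could look like this:

```typescript
import { forkJoin, of } from 'rxjs';

const collected: Array<[number, string]> = [];

// forkJoin waits for every input to complete, then emits one array holding
// the last value of each input, in the same order as the inputs.
forkJoin([of(1, 2, 3), of('a', 'b')]).subscribe((lastValues) => {
  collected.push(lastValues);
});

console.log(collected); // [[3, 'b']]
```

Only the final value of each source is kept, so the earlier emissions (1, 2 and 'a') never reach the subscriber.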

merge - Creates an output Observable which concurrently emits all values from every given input Observable.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { merge, fromEvent, interval } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>merge operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const timer = interval(1000);
    const clicksOrTimer = merge(clicks, timer);
    clicksOrTimer.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

Stackblitz RxJS Example Link

Stackblitz Example Link

partition - Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don't satisfy the predicate.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, partition } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>partition operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const observableValues = of(1, 2, 3, 4, 5, 6);
    const [evens$, odds$] = partition(observableValues, value => value % 2 === 0);

    odds$.subscribe(x => console.log('odds', x));
    evens$.subscribe(x => console.log('evens', x));
  }
}

bootstrapApplication(App);
```

Stackblitz RxJS Example Link

Stackblitz Example Link

race - Returns an observable that mirrors the first source observable to emit an item.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { interval, map, race } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>race operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const obs1 = interval(7000).pipe(map(() => 'slow one'));
    const obs2 = interval(3000).pipe(map(() => 'fast one'));
    const obs3 = interval(5000).pipe(map(() => 'medium one'));

    // obs2 emits first, so race mirrors it and unsubscribes from the others:
    // the output is a series of 'fast one'
    race(obs1, obs2, obs3).subscribe(winner => console.log(winner));
  }
}

bootstrapApplication(App);
```

zip - Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, zip, map } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>zip operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const age$ = of(27, 25, 29);
    const name$ = of('Foo', 'Bar', 'Beer');
    const isDev$ = of(true, true, false);

    zip(age$, name$, isDev$)
      .pipe(map(([age, name, isDev]) => ({ age, name, isDev })))
      .subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

Transformation Operators

These operators are used to transform the values emitted by an observable. Some of the transformation operators are:

  • buffer
  • bufferCount
  • bufferTime
  • bufferToggle
  • bufferWhen
  • concatMap
  • concatMapTo
  • exhaust
  • exhaustMap
  • expand
  • groupBy
  • map
  • mapTo
  • mergeMap
  • mergeMapTo
  • mergeScan
  • pairwise
  • partition
  • pluck
  • scan
  • switchScan
  • switchMap
  • switchMapTo
  • window
  • windowCount
  • windowTime
  • windowToggle
  • windowWhen

buffer - Buffers the source Observable values until closingNotifier emits.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, interval, buffer } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>buffer operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const intervalEvents = interval(1000);
    const buffered = intervalEvents.pipe(buffer(clicks));
    buffered.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

bufferCount - Buffers the source Observable values until the size hits the maximum bufferSize given.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, bufferCount } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>bufferCount operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const buffered = clicks.pipe(bufferCount(2));
    buffered.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

bufferTime - Buffers the source Observable values for a specific time period.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, bufferTime } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>bufferTime operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const buffered = clicks.pipe(bufferTime(1000));
    buffered.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

bufferToggle - Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, interval, bufferToggle, EMPTY } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>bufferToggle operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const openings = interval(1000);
    const buffered = clicks.pipe(
      bufferToggle(openings, i => i % 2 ? interval(500) : EMPTY)
    );
    buffered.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

bufferWhen - Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, bufferWhen, interval } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>bufferWhen operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const buffered = clicks.pipe(
      bufferWhen(() => interval(1000 + Math.random() * 4000))
    );
    buffered.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

concatMap - Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, concatMap, interval, take } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>concatMap operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      concatMap(ev => interval(1000).pipe(take(4)))
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

concatMapTo - Projects each source value to the same Observable which is merged multiple times in a serialized fashion on the output Observable.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, concatMapTo, interval, take } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>concatMapTo operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      concatMapTo(interval(1000).pipe(take(4)))
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

exhaust - Renamed to exhaustAll.
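The entry above has no example, so here is a minimal sketch of exhaustAll, the renamed operator. It uses Subjects as hand-driven inner Observables; all names and values are illustrative. While one inner Observable is still active, any new inner Observable arriving from the source is dropped.

```typescript
import { Observable, Subject } from 'rxjs';
import { exhaustAll } from 'rxjs/operators';

// Source that emits Observables (a higher-order Observable).
const outer$ = new Subject<Observable<string>>();
const first$ = new Subject<string>();
const second$ = new Subject<string>();
const third$ = new Subject<string>();

const seen: string[] = [];
outer$.pipe(exhaustAll()).subscribe((value) => seen.push(value));

outer$.next(first$);      // nothing active yet: first$ is subscribed to
first$.next('first-1');
outer$.next(second$);     // first$ is still active: second$ is dropped
second$.next('second-1'); // never reaches the output
first$.next('first-2');
first$.complete();        // the output is free to accept a new inner Observable
outer$.next(third$);
third$.next('third-1');

console.log(seen); // ['first-1', 'first-2', 'third-1']
```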

exhaustMap - Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, exhaustMap, interval, take } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>exhaustMap operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      exhaustMap(() => interval(1000).pipe(take(5)))
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

expand - Recursively projects each source value to an Observable which is merged in the output Observable.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, map, expand, of, delay, take } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>expand operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const powersOfTwo = clicks.pipe(
      map(() => 1),
      expand(x => of(2 * x).pipe(delay(1000))),
      take(10)
    );
    powersOfTwo.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

groupBy - Groups the elements of an observable sequence according to a specified key selector function.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, groupBy, mergeMap, reduce } from 'rxjs';

// Shape of the items being grouped
interface Item {
  id: number;
  name: string;
}

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>groupBy operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    of<Item[]>(
      { id: 1, name: 'JavaScript' },
      { id: 2, name: 'Parcel' },
      { id: 2, name: 'webpack' },
      { id: 1, name: 'TypeScript' },
      { id: 3, name: 'TSLint' }
    )
      .pipe(
        groupBy(p => p.id),
        // Collect each group into an array; the typed seed avoids a never[] inference
        mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [] as Item[])))
      )
      .subscribe(p => console.log(p));
  }
}

bootstrapApplication(App);
```

map - Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { from, map } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>map operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const data = from([
      { id: 1 },
      { id: 2 },
      { id: 3 },
      { id: 4 },
      { id: 5 },
    ]);

    data.pipe(map((data) => data.id)).subscribe((res) => console.log(res));
  }
}

bootstrapApplication(App);
```

mapTo - Emits the given constant value on the output Observable every time the source Observable emits a value.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, mapTo } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>mapTo operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const greetings = clicks.pipe(mapTo('Hi'));
    greetings.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

mergeMap - Projects each source value to an Observable which is merged in the output Observable.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, mergeMap, interval, map } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>mergeMap operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const letters = of('a', 'b', 'c');
    const result = letters.pipe(
      mergeMap(x => interval(1000).pipe(map(i => x + i)))
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

mergeMapTo - Projects each source value to the same Observable which is merged multiple times in the output Observable.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, mergeMapTo, interval } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>mergeMapTo operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(mergeMapTo(interval(1000)));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

mergeScan - Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable returned is merged into the output Observable.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, map, mergeScan, of } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>mergeScan operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const click$ = fromEvent(document, 'click');
    const one$ = click$.pipe(map(() => 1));
    const seed = 0;
    const count$ = one$.pipe(
      mergeScan((acc, one) => of(acc + one), seed)
    );
    count$.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

pairwise - Groups pairs of consecutive emissions together and emits them as an array of two values.

```typescript
import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, pairwise, map } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>pairwise operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent<PointerEvent>(document, 'click');
    const pairs = clicks.pipe(pairwise());
    const distance = pairs.pipe(
      map(([first, second]) => {
        const x0 = first.clientX;
        const y0 = first.clientY;
        const x1 = second.clientX;
        const y1 = second.clientY;
        return Math.sqrt(Math.pow(x0 - x1, 2) + Math.pow(y0 - y1, 2));
      })
    );
    distance.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
```

partition - Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don't satisfy the predicate.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, partition } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>partition operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const observableValues = of(1, 2, 3, 4, 5, 6);
    const [evens$, odds$] = partition(observableValues, value => value % 2 === 0);
    odds$.subscribe(x => console.log('odds', x));
    evens$.subscribe(x => console.log('evens', x));
  }
}

bootstrapApplication(App);

pluck - Maps each source value to its specified nested property.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { from, pluck, toArray } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>Pluck Example</h1>
    <div>
      <div>
        <ul *ngFor="let item of data1">
          <li>{{item}}</li>
        </ul>
      </div>
      <div>
        <ul *ngFor="let item of data2">
          <li>{{item}}</li>
        </ul>
      </div>
    </div>
  `,
})
export class App implements OnInit {
  users = [
    { name: 'abc', age: '25', address: { state: 'DL', country: 'India' } },
    { name: 'efg', age: '25', address: { state: 'MH', country: 'India' } },
    { name: 'lmn', age: '25', address: { state: 'KA', country: 'India' } },
    { name: 'pqr', age: '25', address: { state: 'KL', country: 'India' } },
    { name: 'xyz', age: '25', address: { state: 'GA', country: 'India' } },
  ];
  data1: any;
  data2: any;

  constructor() {}

  ngOnInit() {
    from(this.users)
      .pipe(pluck('name'), toArray())
      .subscribe((res) => {
        console.log(res);
        this.data1 = res;
      });

    from(this.users)
      .pipe(pluck('address', 'state'), toArray())
      .subscribe((res) => {
        console.log(res);
        this.data2 = res;
      });
  }
}

bootstrapApplication(App);
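
pluck is deprecated as of RxJS 7 and is slated for removal in version 8; the RxJS documentation suggests map with optional chaining instead. The sketch below shows that equivalent. It uses a shortened, hypothetical sampleUsers array (not the full list above), and the User interface is an assumption added for type safety.

```typescript
import { from, map, toArray } from 'rxjs';

// Hypothetical shape for illustration; address is optional on purpose.
interface User {
  name: string;
  address?: { state: string; country: string };
}

const sampleUsers: User[] = [
  { name: 'abc', address: { state: 'DL', country: 'India' } },
  { name: 'efg', address: { state: 'MH', country: 'India' } },
  { name: 'lmn' }, // no address: optional chaining yields undefined
];

let names: string[] = [];
let states: (string | undefined)[] = [];

// Equivalent of pluck('name')
from(sampleUsers)
  .pipe(map(user => user.name), toArray())
  .subscribe(res => (names = res));

// Equivalent of pluck('address', 'state')
from(sampleUsers)
  .pipe(map(user => user.address?.state), toArray())
  .subscribe(res => (states = res));

console.log(names);  // ['abc', 'efg', 'lmn']
console.log(states); // ['DL', 'MH', undefined]
```

Unlike pluck with string keys, the map version is checked by the compiler, so a misspelled property name is reported at build time.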

scan - Useful for encapsulating and managing state. Applies an accumulator (or "reducer function") to each value from the source after an initial state is established -- either via a seed value (second argument), or from the first value from the source.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, scan, map } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>scan operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const numbers$ = of(1, 2, 3);
    numbers$
      .pipe(
        // Get the sum of the numbers coming in.
        scan((total, n) => total + n),
        // Get the average by dividing the sum by the total number
        // received so far (which is 1 more than the zero-based index).
        map((sum, index) => sum / (index + 1))
      )
      .subscribe(console.log);
  }
}

bootstrapApplication(App);

Stackblitz Example Link

switchScan - Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, emitting values only from the most recently returned Observable.
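
No example accompanies switchScan here, so the following is a minimal sketch, assuming RxJS 7 or later (the version in which switchScan was introduced). Because the inner Observables below are synchronous, nothing is ever cancelled and the result is a running total; the switching behaviour only matters when a new source value arrives while the previous inner Observable is still active. The name runningTotals is illustrative.

```typescript
import { of, switchScan } from 'rxjs';

const runningTotals: number[] = [];

of(1, 2, 3)
  .pipe(
    // The accumulator returns an Observable; each value it emits
    // becomes the new accumulated state.
    switchScan((acc, value) => of(acc + value), 0)
  )
  .subscribe(total => runningTotals.push(total));

console.log(runningTotals); // [1, 3, 6]
```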

switchMap - Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { from, of, switchMap } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>switchMap operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const data = from(['abc', 'xyz', 'efg', 'pqr', 'lmn']);
    data
      .pipe(switchMap((item) => this.getData(item)))
      .subscribe((res) => console.log(res));
  }

  // Returns an inner Observable for each source value.
  getData(data: string) {
    return of('name is ' + data);
  }
}

bootstrapApplication(App);
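
In the example above every inner Observable completes synchronously, so there is never an active inner Observable to switch away from. The sketch below, assuming RxJS 7, uses Subjects to make the cancellation visible; keys, streams and seen are illustrative names.

```typescript
import { Subject, switchMap } from 'rxjs';

const keys = new Subject<'a' | 'b'>();
const streams = { a: new Subject<string>(), b: new Subject<string>() };
const seen: string[] = [];

keys.pipe(switchMap(key => streams[key])).subscribe(v => seen.push(v));

keys.next('a');
streams.a.next('a1'); // delivered: 'a' is the current inner Observable
keys.next('b');       // switchMap unsubscribes from streams.a
streams.a.next('a2'); // dropped: streams.a is no longer subscribed
streams.b.next('b1'); // delivered

console.log(seen); // ['a1', 'b1']
```

This is why switchMap suits cases such as type-ahead search, where only the result for the latest request is of interest.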

switchMapTo - Projects each source value to the same Observable which is flattened multiple times with switchMap in the output Observable.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, switchMapTo, interval } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>switchMapTo operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(switchMapTo(interval(1000)));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example Link

window - Branch out the source Observable values as a nested Observable whenever windowBoundaries emits.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, interval, window, map, take, mergeAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>window operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const sec = interval(1000);
    const result = clicks.pipe(
      window(sec),
      map(win => win.pipe(take(2))), // take at most 2 emissions from each window
      mergeAll() // flatten the Observable-of-Observables
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example Link

windowCount - Branch out the source Observable values as a nested Observable with each nested Observable emitting at most windowSize values.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, windowCount, map, skip, mergeAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>windowCount operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      windowCount(3),
      map(win => win.pipe(skip(1))), // skip first of every 3 clicks
      mergeAll() // flatten the Observable-of-Observables
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example 1 Link

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, windowCount, mergeAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>windowCount operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      windowCount(2, 3),
      mergeAll() // flatten the Observable-of-Observables
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example 2 Link

windowTime - Branch out the source Observable values as a nested Observable periodically in time.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, windowTime, map, take, mergeAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>windowTime operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      windowTime(1000),
      map(win => win.pipe(take(2))), // take at most 2 emissions from each window
      mergeAll() // flatten the Observable-of-Observables
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example 1 Link

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, windowTime, map, take, mergeAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>windowTime operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      windowTime(1000, 5000),
      map(win => win.pipe(take(2))), // take at most 2 emissions from each window
      mergeAll() // flatten the Observable-of-Observables
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example 2 Link

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, windowTime, mergeAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>windowTime operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      windowTime(1000, 5000, 2), // take at most 2 emissions from each window
      mergeAll() // flatten the Observable-of-Observables
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example 3 Link

windowToggle - Branch out the source Observable values as a nested Observable starting from an emission from openings and ending when the output of closingSelector emits.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, interval, windowToggle, EMPTY, mergeAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>windowToggle operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const openings = interval(1000);
    const result = clicks.pipe(
      windowToggle(openings, i => (i % 2 ? interval(500) : EMPTY)),
      mergeAll()
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example Link

windowWhen - Branch out the source Observable values as a nested Observable using a factory function of closing Observables to determine when to start a new window.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, windowWhen, interval, map, take, mergeAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>windowWhen operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      windowWhen(() => interval(1000 + Math.random() * 4000)),
      map(win => win.pipe(take(2))), // take at most 2 emissions from each window
      mergeAll() // flatten the Observable-of-Observables
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example 1 Link

Back to top⤴️

Filtering Operators

These operators allow you to filter the values emitted by Observables. Some of the most commonly used filtering operators are:

  • audit
  • auditTime
  • debounce
  • debounceTime
  • distinct
  • distinctUntilChanged
  • distinctUntilKeyChanged
  • elementAt
  • filter
  • first
  • ignoreElements
  • last
  • sample
  • sampleTime
  • single
  • skip
  • skipLast
  • skipUntil
  • skipWhile
  • take
  • takeLast
  • takeUntil
  • takeWhile
  • throttle
  • throttleTime

audit - Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, audit, interval } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>audit operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(audit(ev => interval(1000)));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example Link

auditTime - Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, auditTime } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>auditTime operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(auditTime(1000));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example Link

debounce - Emits a notification from the source Observable only after a particular time span determined by another Observable has passed without another source emission.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, scan, debounce, interval } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>debounce operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      scan(i => ++i, 1),
      debounce(i => interval(200 * i))
    );
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example Link

debounceTime - Emits a notification from the source Observable only after a particular time span has passed without another source emission.

import 'zone.js/dist/zone';
import {
  AfterViewInit,
  Component,
  ElementRef,
  OnInit,
  ViewChild,
} from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, map, debounceTime } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>debounceTime Example</h1>
    <input type="text" #myInput />
    <p *ngIf="requestedData != null">Data: {{requestedData}}</p>
  `,
})
export class App implements OnInit, AfterViewInit {
  requestedData = null;
  @ViewChild('myInput') myInput!: ElementRef;

  constructor() {}

  ngOnInit() {}

  ngAfterViewInit() {
    const searchItem = fromEvent<any>(this.myInput.nativeElement, 'keyup').pipe(
      // Map each keyup event to the input's current value.
      map((event) => event.target.value),
      debounceTime(1000)
    );

    searchItem.subscribe((res) => {
      console.log(res);
      this.requestedData = res;
      setTimeout(() => {
        this.requestedData = null;
      }, 2000);
    });
  }
}

bootstrapApplication(App);

distinct - Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, distinct } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>distinct operator</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)
      .pipe(distinct())
      .subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz Example Link

distinctUntilChanged - Returns a result Observable that emits all values pushed by the source observable if they are distinct in comparison to the last value the result observable emitted.

import 'zone.js/dist/zone';
import {
  AfterViewInit,
  Component,
  ElementRef,
  OnInit,
  ViewChild,
} from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, map, debounceTime, distinctUntilChanged } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>distinctUntilChanged Example</h1>
    <input type="text" #myInput />
    <p *ngIf="requestedData != null">Data: {{requestedData}}</p>
  `,
})
export class App implements OnInit, AfterViewInit {
  requestedData = null;
  @ViewChild('myInput') myInput!: ElementRef;

  constructor() {}

  ngOnInit() {}

  ngAfterViewInit() {
    const searchItem = fromEvent<any>(this.myInput.nativeElement, 'keyup').pipe(
      // Map each keyup event to the input's current value.
      map((event) => event.target.value),
      debounceTime(500),
      distinctUntilChanged()
    );

    searchItem.subscribe((res) => {
      console.log(res);
      this.requestedData = res;
      setTimeout(() => {
        this.requestedData = null;
      }, 2000);
    });
  }
}

bootstrapApplication(App);
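
distinct and distinctUntilChanged are easy to mix up. A minimal side-by-side sketch, assuming RxJS 7, runs both over the same synchronous input; everSeen and consecutive are illustrative names.

```typescript
import { of, distinct, distinctUntilChanged, toArray } from 'rxjs';

const input$ = of(1, 1, 2, 2, 1, 3);

let everSeen: number[] = [];
let consecutive: number[] = [];

// distinct remembers every value it has emitted so far.
input$.pipe(distinct(), toArray()).subscribe(res => (everSeen = res));

// distinctUntilChanged only compares with the previously emitted value.
input$.pipe(distinctUntilChanged(), toArray()).subscribe(res => (consecutive = res));

console.log(everSeen);    // [1, 2, 3]
console.log(consecutive); // [1, 2, 1, 3]
```

The second 1 reappears in the distinctUntilChanged output because the value before it was 2, whereas distinct has already seen 1 and drops it.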

distinctUntilKeyChanged - Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item, using a property accessed by using the key provided to check if the two items are distinct.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, distinctUntilKeyChanged } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>distinctUntilKeyChanged Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    of(
      { age: 4, name: 'Foo' },
      { age: 7, name: 'Bar' },
      { age: 5, name: 'Foo' },
      { age: 6, name: 'Foo' }
    )
      .pipe(distinctUntilKeyChanged('name'))
      .subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

Stackblitz RxJS Example Link

Stackblitz Angular Example Link

elementAt - Emits the single value at the specified index in a sequence of emissions from the source Observable.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, elementAt } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>elementAt Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(elementAt(2));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

filter - Filter items emitted by the source Observable by only emitting those that satisfy a specified predicate.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, filter } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>filter Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const div = document.createElement('div');
    div.style.cssText = 'width: 200px; height: 200px; background: #09c;';
    document.body.appendChild(div);

    const clicks = fromEvent(document, 'click');
    const clicksOnDivs = clicks.pipe(
      filter(ev => (<HTMLElement>ev.target).tagName === 'DIV')
    );
    clicksOnDivs.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

first - Emits only the first value (or the first value that meets some condition) emitted by the source Observable.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, first } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>first Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(first());
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
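
The description mentions that first can take a condition, which the example above does not show. The sketch below, assuming RxJS 7, covers the predicate form, the optional default value, and the EmptyError raised when nothing matches and no default is given. firstResults is an illustrative name.

```typescript
import { of, first, EmptyError } from 'rxjs';

const firstResults: unknown[] = [];

// First value greater than 2.
of(1, 2, 3, 4)
  .pipe(first(x => x > 2))
  .subscribe(v => firstResults.push(v));

// No value matches: the default value is emitted instead.
of(1, 2)
  .pipe(first(x => x > 2, -1))
  .subscribe(v => firstResults.push(v));

// No value matches and no default: the Observable errors with EmptyError.
of(1, 2)
  .pipe(first(x => x > 2))
  .subscribe({ error: err => firstResults.push(err instanceof EmptyError) });

console.log(firstResults); // [3, -1, true]
```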

ignoreElements - Ignores all items emitted by the source Observable and only passes calls of complete or error.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, ignoreElements } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>ignoreElements Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    of('you', 'talking', 'to', 'me')
      .pipe(ignoreElements())
      .subscribe({
        next: word => console.log(word),
        error: err => console.log('error:', err),
        complete: () => console.log('the end'),
      });
  }
}

bootstrapApplication(App);

last - Returns an Observable that emits only the last item emitted by the source Observable. It optionally takes a predicate function as a parameter, in which case, rather than emitting the last item from the source Observable, the resulting Observable will emit the last item from the source Observable that satisfies the predicate.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { from, last } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>last Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const source = from(['x', 'y', 'z']);
    const result = source.pipe(last());
    result.subscribe(value => console.log(`Last alphabet: ${value}`));
  }
}

bootstrapApplication(App);

sample - Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, interval, sample } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>sample Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const seconds = interval(1000);
    const clicks = fromEvent(document, 'click');
    const result = seconds.pipe(sample(clicks));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

sampleTime - Emits the most recently emitted value from the source Observable within periodic time intervals.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, sampleTime } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>sampleTime Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(sampleTime(1000));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

single - Returns an observable that asserts that only one value is emitted from the observable that matches the predicate. If no predicate is provided, then it will assert that the observable only emits one value.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, single } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>single Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const source1 = of({ name: 'Ben' }, { name: 'Tracy' }, { name: 'Laney' }, { name: 'Lily' });
    source1
      .pipe(single(x => x.name.startsWith('B')))
      .subscribe(x => console.log(x));
    // Emits 'Ben'

    const source2 = of({ name: 'Ben' }, { name: 'Tracy' }, { name: 'Bradley' }, { name: 'Lincoln' });
    source2
      .pipe(single(x => x.name.startsWith('B')))
      .subscribe({ error: err => console.error(err) });
    // Error emitted: SequenceError('Too many values match')

    const source3 = of({ name: 'Laney' }, { name: 'Tracy' }, { name: 'Lily' }, { name: 'Lincoln' });
    source3
      .pipe(single(x => x.name.startsWith('B')))
      .subscribe({ error: err => console.error(err) });
    // Error emitted: NotFoundError('No values match')
  }
}

bootstrapApplication(App);

skip - Returns an Observable that skips the first count items emitted by the source Observable.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { interval, skip } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>skip Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    // emit every half second
    const source = interval(500);
    // skip the first 10 emitted values
    const result = source.pipe(skip(10));
    result.subscribe(value => console.log(value));
  }
}

bootstrapApplication(App);

skipLast - Skip a specified number of values before the completion of an observable.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, skipLast } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>skipLast Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const numbers = of(1, 2, 3, 4, 5);
    const skipLastTwo = numbers.pipe(skipLast(2));
    skipLastTwo.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

skipUntil - Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { interval, fromEvent, skipUntil } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>skipUntil Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const intervalObservable = interval(1000);
    const click = fromEvent(document, 'click');
    const emitAfterClick = intervalObservable.pipe(skipUntil(click));
    // clicked at 4.6s. output: 5...6...7...8........ or
    // clicked at 7.3s. output: 8...9...10..11.......
    emitAfterClick.subscribe(value => console.log(value));
  }
}

bootstrapApplication(App);

skipWhile - Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { from, skipWhile } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>skipWhile Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const source = from(['Green Arrow', 'SuperMan', 'Flash', 'SuperGirl', 'Black Canary']);
    // Skip the heroes until SuperGirl
    const example = source.pipe(skipWhile(hero => hero !== 'SuperGirl'));
    // output: SuperGirl, Black Canary
    example.subscribe(femaleHero => console.log(femaleHero));
  }
}

bootstrapApplication(App);

take - Emits only the first count values emitted by the source Observable.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { from, take } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>take Example</h1>
  `,
})
export class App implements OnInit {
  data = ['a', 'b', 'c', 'd'];

  ngOnInit() {
    const datas = from(this.data);
    datas.pipe(take(3)).subscribe((res) => {
      console.log(res);
    });
  }
}

bootstrapApplication(App);

takeLast - Waits for the source to complete, then emits the last N values from the source, as specified by the count argument.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { range, takeLast } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>takeLast Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const many = range(1, 100);
    const lastThree = many.pipe(takeLast(3));
    lastThree.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

takeUntil - Emits the values emitted by the source Observable until a notifier Observable emits a value.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { interval, fromEvent, takeUntil } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>takeUntil Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const source = interval(1000);
    const clicks = fromEvent(document, 'click');
    const result = source.pipe(takeUntil(clicks));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

takeWhile - Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, takeWhile } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>takeWhile Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent<PointerEvent>(document, 'click');
    const result = clicks.pipe(takeWhile(ev => ev.clientX > 200));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
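
takeWhile also accepts an optional second argument, inclusive, which controls whether the value that fails the predicate is itself emitted. A short sketch with a synchronous source, assuming RxJS 7; the names exclusive and inclusiveResult are illustrative.

```typescript
import { of, takeWhile, toArray } from 'rxjs';

const readings$ = of(1, 2, 3, 4, 1);

let exclusive: number[] = [];
let inclusiveResult: number[] = [];

// Default: stop before the first value that fails the predicate.
readings$
  .pipe(takeWhile(x => x < 3), toArray())
  .subscribe(res => (exclusive = res));

// inclusive = true: also emit the value that failed, then complete.
readings$
  .pipe(takeWhile(x => x < 3, true), toArray())
  .subscribe(res => (inclusiveResult = res));

console.log(exclusive);       // [1, 2]
console.log(inclusiveResult); // [1, 2, 3]
```

In both cases the trailing 1 is never emitted: once the predicate fails, the result completes and later values are not considered, unlike filter.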

throttle - Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, throttle, interval } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>throttle Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(throttle(() => interval(1000)));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

throttleTime - Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, throttleTime } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>throttleTime Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(throttleTime(1000));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
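
throttleTime, auditTime and debounceTime all thin out a busy source, and the click-driven examples above make the differences hard to see. The sketch below is one way to compare them deterministically. It assumes RxJS 7 and uses TestScheduler from 'rxjs/testing' purely to get virtual time (no real waiting); the marble string and the 5 ms duration are arbitrary values chosen for illustration.

```typescript
import { auditTime, debounceTime, throttleTime } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';

// The assertion callback is unused here; only virtual time is needed.
const scheduler = new TestScheduler(() => {});

const throttled: string[] = [];
const audited: string[] = [];
const debounced: string[] = [];

scheduler.run(({ cold }) => {
  // a at 0 ms, b at 3 ms, c at 6 ms, d at 9 ms, completion at 20 ms.
  const marbles = 'a--b--c--d 10ms |';

  // Emits a value, then ignores the source for 5 ms.
  cold(marbles).pipe(throttleTime(5)).subscribe(v => throttled.push(v));

  // Waits 5 ms after a value arrives, then emits the latest one.
  cold(marbles).pipe(auditTime(5)).subscribe(v => audited.push(v));

  // Emits only after 5 ms of silence from the source.
  cold(marbles).pipe(debounceTime(5)).subscribe(v => debounced.push(v));
});

console.log(throttled); // ['a', 'c']
console.log(audited);   // ['b', 'd']
console.log(debounced); // ['d']
```

throttleTime favours the first value in each window, auditTime the last, and debounceTime emits nothing until the source goes quiet.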

Back to top⤴️

Join Operators

The join operators are used to combine the emissions of multiple Observables. Some of the join operators are:

  • combineLatestAll
  • concatAll
  • exhaustAll
  • mergeAll
  • switchAll
  • startWith
  • withLatestFrom

combineLatestAll - Flattens an Observable-of-Observables by applying combineLatest when the Observable-of-Observables completes.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, map, interval, take, combineLatestAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>combineLatestAll Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const higherOrder = clicks.pipe(
      map(() => interval(Math.random() * 2000).pipe(take(3))),
      take(2)
    );
    const result = higherOrder.pipe(combineLatestAll());
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
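
A smaller sketch with synchronous inner Observables, assuming RxJS 7, makes the output of combineLatestAll easier to predict. The name combined is illustrative.

```typescript
import { of, combineLatestAll } from 'rxjs';

const combined: number[][] = [];

// The outer Observable emits two inner Observables and then completes.
of(of(1, 2), of(10, 20))
  .pipe(combineLatestAll())
  .subscribe(pair => combined.push(pair));

console.log(combined); // [[2, 10], [2, 20]]
```

The first inner Observable has already emitted both of its values before the second one is subscribed, so only its latest value, 2, appears in the combinations. Nothing is emitted until every inner Observable has produced at least one value.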

concatAll - Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, map, interval, take, concatAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>concatAll Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const higherOrder = clicks.pipe(map(() => interval(1000).pipe(take(4))));
    const firstOrder = higherOrder.pipe(concatAll());
    firstOrder.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

exhaustAll - Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, map, interval, take, exhaustAll } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>exhaustAll Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const higherOrder = clicks.pipe(map(() => interval(1000).pipe(take(5))));
    const result = higherOrder.pipe(exhaustAll());
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

mergeAll - Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{fromEvent,map,interval,mergeAll}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>mergeAll Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constclicks=fromEvent(document,'click');consthigherOrder=clicks.pipe(map(()=>interval(1000)));constfirstOrder=higherOrder.pipe(mergeAll());firstOrder.subscribe(x=>console.log(x));}}bootstrapApplication(App);

switchAll - Converts a higher-order Observable into a first-order Observable producing values only from the most recent inner Observable.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{fromEvent,tap,map,interval,switchAll}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>switchAll Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constclicks=fromEvent(document,'click').pipe(tap(()=>console.log('click')));constsource=clicks.pipe(map(()=>interval(1000)));source.pipe(switchAll()).subscribe(x=>console.log(x));}}bootstrapApplication(App);
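The four flattening operators above differ only in what they do when a new inner Observable arrives while an earlier one is still active. The following is a minimal sketch rather than one of the original examples: it assumes the click and interval sources can be replaced with Subjects so that the event order is fixed, and the `run` helper and the names `outer`, `a`, and `b` are purely illustrative.

```typescript
import {
  Observable,
  OperatorFunction,
  Subject,
  concatAll,
  exhaustAll,
  mergeAll,
  switchAll,
} from 'rxjs';

// Hypothetical helper: feeds the same scripted events through one flattening operator.
function run(flatten: OperatorFunction<Observable<string>, string>): string[] {
  const outer = new Subject<Observable<string>>();
  const a = new Subject<string>();
  const b = new Subject<string>();
  const seen: string[] = [];

  outer.pipe(flatten).subscribe(value => seen.push(value));

  outer.next(a); // inner "a" becomes active
  a.next('a1');
  outer.next(b); // inner "b" arrives while "a" is still active
  a.next('a2');
  b.next('b1');
  a.complete();
  b.next('b2');
  b.complete();
  outer.complete();

  return seen;
}

const merged = run(mergeAll()); // subscribes to every inner immediately
const concatenated = run(concatAll()); // subscribes to "b" only after "a" completes
const switched = run(switchAll()); // drops "a" as soon as "b" arrives
const exhausted = run(exhaustAll()); // ignores "b" because "a" is still active

console.log({ merged, concatenated, switched, exhausted });
```

Because Subjects are hot, `concatAll` misses `b1` in this sketch: it was emitted before `a` completed, when nothing was subscribed to `b` yet. With cold inner Observables such as `interval`, nothing is lost; the queued inner simply starts later.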

startWith - Returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{timer,map,startWith}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>startWith Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){timer(1000).pipe(map(()=>'timer emit'),startWith('timer start')).subscribe(x=>console.log(x));}}bootstrapApplication(App);

withLatestFrom - Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, interval, withLatestFrom } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>withLatestFrom Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const timer = interval(1000);
    // On each click, emit [clickEvent, latestTimerValue]
    const result = clicks.pipe(withLatestFrom(timer));
    result.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);
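The "only when the source emits" part of withLatestFrom is easiest to see with a scripted sequence. This sketch assumes two Subjects, `clicks` and `ticks` (illustrative names that stand in for real event and timer sources), and records every pair the operator produces.

```typescript
import { Subject, withLatestFrom } from 'rxjs';

const clicks = new Subject<string>();
const ticks = new Subject<number>();
const pairs: Array<[string, number]> = [];

clicks.pipe(withLatestFrom(ticks)).subscribe(pair => pairs.push(pair));

clicks.next('click 1'); // dropped: `ticks` has not emitted yet
ticks.next(0);
ticks.next(1); // no output: only the source triggers an emission
clicks.next('click 2'); // emits ['click 2', 1]
ticks.next(2);
clicks.next('click 3'); // emits ['click 3', 2]

console.log(pairs);
```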

Multicasting Operators

These operators are used to share the observable execution among multiple subscribers. Some of the multicasting operators are:

  • multicast
  • publish
  • publishBehavior
  • publishLast
  • publishReplay
  • share

multicast - Returns an Observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable that shares a single subscription to the underlying stream.

Link

publish - Returns a ConnectableObservable, which is a variety of Observable that waits until its connect method is called before it begins emitting items to those Observers that have subscribed to it.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{zip,interval,of,map,publish,merge,tap}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>publish Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constsource$=zip(interval(2000),of(1,2,3,4,5,6,7,8,9)).pipe(map(([,number])=>number));source$.pipe(publish(multicasted$=>merge(multicasted$.pipe(tap(x=>console.log('Stream 1:',x))),multicasted$.pipe(tap(x=>console.log('Stream 2:',x))),multicasted$.pipe(tap(x=>console.log('Stream 3:',x)))))).subscribe();}}bootstrapApplication(App);

publishBehavior - Creates a ConnectableObservable that utilizes a BehaviorSubject.

Link

publishLast - Returns a connectable observable sequence that shares a single subscription to the underlying sequence containing only the last notification.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{ConnectableObservable,interval,publishLast,tap,take}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>publishLast Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constconnectable=<ConnectableObservable<number>>interval(1000).pipe(tap(x=>console.log('side effect',x)),take(3),publishLast());connectable.subscribe({next:x=>console.log('Sub. A',x),error:err=>console.log('Sub. A Error',err),complete:()=>console.log('Sub. A Complete')});connectable.subscribe({next:x=>console.log('Sub. B',x),error:err=>console.log('Sub. B Error',err),complete:()=>console.log('Sub. B Complete')});connectable.connect();}}bootstrapApplication(App);

publishReplay - Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications.

Link

share - Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream hot. This is an alias for multicast(() => new Subject()), refCount().

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{interval,tap,map,take,share}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>share Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constsource=interval(1000).pipe(tap(x=>console.log('Processing: ',x)),map(x=>x*x),take(6),share());source.subscribe(x=>console.log('subscription 1: ',x));source.subscribe(x=>console.log('subscription 2: ',x));}}bootstrapApplication(App);
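In RxJS 7 the `multicast` and `publish*` operators are marked as deprecated in favour of `share`, `connect`, and `connectable`, so `share` is usually the one to reach for. The sketch below is an illustration under assumptions, not part of the original examples: `trigger` is a hypothetical stand-in for a real producer, and `executions` counts how often that producer is started.

```typescript
import { Observable, Subject, share } from 'rxjs';

let executions = 0;
const trigger = new Subject<number>(); // hypothetical stand-in for a real producer

const source = new Observable<number>(subscriber => {
  executions++; // counts how many times the producer is started
  return trigger.subscribe(subscriber);
});

const shared = source.pipe(share());

const first: number[] = [];
const second: number[] = [];
const subA = shared.subscribe(v => first.push(v));
const subB = shared.subscribe(v => second.push(v));

trigger.next(1);
trigger.next(2);
const executionsWhileShared = executions; // both subscribers share one execution

// When the last subscriber leaves, share() unsubscribes from the source...
subA.unsubscribe();
subB.unsubscribe();

// ...and a later subscriber starts a fresh execution.
const third: number[] = [];
shared.subscribe(v => third.push(v));
trigger.next(3);

console.log({ executionsWhileShared, executions, first, second, third });
```

Without `share()`, each of the three subscribers would start its own execution of the producer.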

Back to top⤴️

Error Handling Operators

These operators are used to handle errors that occur in the observable sequence. Some of the error handling operators are:

  • catchError
  • retry
  • retryWhen

catchError - Catches errors on the observable to be handled by returning a new observable or throwing an error.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{of,map,catchError}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>catchError Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){of(1,2,3,4,5).pipe(map(n=>{if(n===4){throw'four!';}returnn;}),catchError(err=>of('I','II','III','IV','V'))).subscribe(x=>console.log(x));}}bootstrapApplication(App);

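retry - Returns an Observable that mirrors the source Observable with the exception of an error. If the source errors, retry resubscribes to it, up to the given number of times, instead of propagating the error.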

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { interval, mergeMap, throwError, of, retry } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>retry Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const source = interval(1000);
    const result = source.pipe(
      mergeMap(val => (val > 5 ? throwError(() => 'Error!') : of(val))),
      retry(2) // retry 2 times on error
    );

    result.subscribe({
      next: value => console.log(value),
      error: err => console.log(`${err}: Retried 2 times then quit!`),
    });

    // Output:
    // 0..1..2..3..4..5..
    // 0..1..2..3..4..5..
    // 0..1..2..3..4..5..
    // 'Error!: Retried 2 times then quit!'
  }
}

bootstrapApplication(App);
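retry and catchError are often combined: retry a few times, then fall back. The following sketch assumes a hypothetical flaky source built with `defer` that fails a fixed number of times before succeeding; the `attempt` helper and its `failures` parameter are illustrative only.

```typescript
import { catchError, defer, of, retry, throwError } from 'rxjs';

// Hypothetical flaky source: fails on the first `failures` subscriptions, then succeeds.
function attempt(failures: number): { attempts: number; output: string[] } {
  let attempts = 0;
  const flaky = defer(() => {
    attempts++;
    return attempts <= failures
      ? throwError(() => new Error(`attempt ${attempts} failed`))
      : of('ok');
  });

  const output: string[] = [];
  flaky
    .pipe(
      retry(2), // up to 2 resubscriptions, so at most 3 attempts in total
      catchError(() => of('fallback')) // reached only once the retries are used up
    )
    .subscribe(value => output.push(value));

  return { attempts, output };
}

const recovered = attempt(2); // the third attempt succeeds
const gaveUp = attempt(5); // all three attempts fail

console.log(recovered, gaveUp);
```

Placing `catchError` after `retry` matters: in the opposite order the error would be replaced by the fallback before `retry` ever saw it.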

retryWhen - Returns an Observable that mirrors the source Observable with the exception of an error. If the source Observable calls error, this method will emit the Throwable that caused the error to the ObservableInput returned from notifier. If that Observable calls complete or error then this method will call complete or error on the child subscription. Otherwise this method will resubscribe to the source Observable.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { interval, map, retryWhen, tap, delayWhen, timer } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>retryWhen Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const source = interval(1000);
    const result = source.pipe(
      map(value => {
        if (value > 5) {
          // error will be picked up by retryWhen
          throw value;
        }
        return value;
      }),
      retryWhen(errors =>
        errors.pipe(
          // log error message
          tap(value => console.log(`Value ${value} was too high!`)),
          // restart after `value` seconds (6 seconds here, since the thrown value is 6)
          delayWhen(value => timer(value * 1000))
        )
      )
    );

    result.subscribe(value => console.log(value));

    // results:
    // 0
    // 1
    // 2
    // 3
    // 4
    // 5
    // 'Value 6 was too high!'
    // - Wait 6 seconds then repeat
  }
}

bootstrapApplication(App);

Back to top⤴️

Utility Operators

These operators are used to perform utility operations on the observable sequence. Some of the utility operators are:

  • tap
  • delay
  • delayWhen
  • dematerialize
  • materialize
  • observeOn
  • subscribeOn
  • timeInterval
  • timestamp
  • timeout
  • timeoutWith
  • toArray

tap - Used to perform side-effects for notifications from the source observable.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{interval,Subscription,tap}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>tap Example</h1>  `,})exportclassAppimplementsOnInit{subscription:Subscription;ngOnInit(){letcount=interval(1000);this.subscription=count.pipe(tap((res)=>{console.log('Before tap',res);if(res==5){this.subscription.unsubscribe();}})).subscribe((res)=>{console.log('After tap',res);});}}bootstrapApplication(App);

delay - Delays the emission of items from the source Observable by a given timeout or until a given Date.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{fromEvent,delay}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>delay Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constclicks=fromEvent(document,'click');constdelayedClicks=clicks.pipe(delay(1000));// each click emitted after 1 seconddelayedClicks.subscribe(x=>console.log(x));}}bootstrapApplication(App);

delayWhen - Delays the emission of items from the source Observable by a given time span determined by the emissions of another Observable.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { fromEvent, interval, delayWhen } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>delayWhen Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    // Each click is delayed by a random time between 0 and 5 seconds
    const delayedClicks = clicks.pipe(
      delayWhen(() => interval(Math.random() * 5000))
    );
    delayedClicks.subscribe(x => console.log(x));
  }
}

bootstrapApplication(App);

dematerialize - Converts an Observable of ObservableNotification objects into the emissions that they represent.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{NextNotification,ErrorNotification,of,dematerialize}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>dematerialize Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constnotifA:NextNotification<string>={kind:'N',value:'A'};constnotifB:NextNotification<string>={kind:'N',value:'B'};constnotifE:ErrorNotification={kind:'E',error:newTypeError('x.toUpperCase is not a function')};constmaterialized=of(notifA,notifB,notifE);constupperCase=materialized.pipe(dematerialize());upperCase.subscribe({next:x=>console.log(x),error:e=>console.error(e)});}}bootstrapApplication(App);

materialize - Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{of,materialize,map}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>materialize Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constletters=of('a','b',13,'d');constupperCase=letters.pipe(map((x:any)=>x.toUpperCase()));constmaterialized=upperCase.pipe(materialize());materialized.subscribe(x=>console.log(x));}}bootstrapApplication(App);
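To make the shape of the materialized stream concrete, this sketch runs the same source as above and records each notification's `kind` (`'N'` for next, `'E'` for error, `'C'` for complete). The `kinds` and `values` arrays are illustrative names introduced here.

```typescript
import { map, materialize, of } from 'rxjs';

const kinds: string[] = [];
const values: unknown[] = [];

of('a', 'b', 13, 'd')
  .pipe(
    map((x: any) => x.toUpperCase()), // throws a TypeError on 13
    materialize()
  )
  .subscribe({
    next: notification => {
      kinds.push(notification.kind);
      if (notification.kind === 'N') {
        values.push(notification.value);
      }
    },
    // The error arrives as a value, so the materialized stream itself completes normally.
    complete: () => kinds.push('done'),
  });

console.log(kinds, values);
```

The value `'d'` never appears: the source stops at the error, and materialize only changes how that error is delivered.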

observeOn - Re-emits all notifications from source Observable with specified scheduler.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{interval,observeOn,animationFrameScheduler}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>observeOn Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constsomeDiv=document.createElement('div');someDiv.style.cssText='width: 200px;background: #09c';document.body.appendChild(someDiv);constintervals=interval(10);// Intervals are scheduled// with async scheduler by default...intervals.pipe(observeOn(animationFrameScheduler)// ...but we will observe on animationFrame)// scheduler to ensure smooth animation..subscribe(val=>{someDiv.style.height=val+'px';});}}bootstrapApplication(App);

subscribeOn - Asynchronously subscribes Observers to this Observable on the specified SchedulerLike.

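import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { of, merge, subscribeOn, asyncScheduler } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>subscribeOn Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    // The subscription to `a` is deferred to the async scheduler,
    // so `b` is subscribed to (and emits) first.
    const a = of(1, 2, 3).pipe(subscribeOn(asyncScheduler));
    const b = of(4, 5, 6);

    merge(a, b).subscribe(console.log);

    // Output:
    // 4 5 6 1 2 3
    // Without subscribeOn the output would be 1 2 3 4 5 6
  }
}

bootstrapApplication(App);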

timeInterval - Emits an object containing the current value, and the time that has passed between emitting the current value and the previous value, which is calculated by using the provided scheduler's now() method to retrieve the current time at each emission, then calculating the difference. The scheduler defaults to asyncScheduler, so by default, the interval will be in milliseconds.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{interval,timeInterval}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>timeInterval Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constseconds=interval(1000);seconds.pipe(timeInterval()).subscribe(value=>console.log(value));}}bootstrapApplication(App);

timestamp - Attaches a timestamp to each item emitted by an observable indicating when it was emitted

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{fromEvent,timestamp}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>timestamp Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constclickWithTimestamp=fromEvent(document,'click').pipe(timestamp());// Emits data of type { value: PointerEvent, timestamp: number }clickWithTimestamp.subscribe(data=>{console.log(data);});}}bootstrapApplication(App);

timeout - Errors if Observable does not emit a value in given time span.

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { interval, throwError, timeout } from 'rxjs';

// Declared before the decorator so that @Component applies to App, not to this class
class CustomTimeoutError extends Error {
  constructor() {
    super('It was too slow');
    this.name = 'CustomTimeoutError';
  }
}

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>timeout Example</h1>
  `,
})
export class App implements OnInit {
  ngOnInit() {
    // interval(900) stays within the 1000 ms budget below;
    // use a period above 1000 to see the custom error
    const slow$ = interval(900);

    slow$
      .pipe(
        timeout({
          each: 1000,
          with: () => throwError(() => new CustomTimeoutError()),
        })
      )
      .subscribe({ error: console.error });
  }
}

bootstrapApplication(App);

timeoutWith - When the passed timespan elapses before the source emits any given value, it will unsubscribe from the source, and switch the subscription to another observable.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{interval,timeoutWith}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>timeoutWith Example</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constslow$=interval(1000);constfaster$=interval(500);slow$.pipe(timeoutWith(900,faster$)).subscribe(console.log);}}bootstrapApplication(App);

toArray - Collects all source emissions and emits them as an array when the source completes.

Example:

import 'zone.js/dist/zone';
import { Component, OnInit } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { from, interval, of, Subscription, take, toArray } from 'rxjs';

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: `
    <h1>toArray Example</h1>
  `,
})
export class App implements OnInit {
  subscription?: Subscription;

  data = [
    { id: 1, name: 'abc' },
    { id: 2, name: 'efg' },
    { id: 3, name: 'lmn' },
    { id: 4, name: 'pqr' },
    { id: 5, name: 'xyz' },
  ];

  ngOnInit() {
    // example 1: collect the first five interval values
    const count = interval(1000);
    this.subscription = count.pipe(take(5), toArray()).subscribe((res) => {
      console.log(res);
    });

    // example 2: collect the items of an array-backed Observable
    const datas = from(this.data);
    this.subscription = datas.pipe(toArray()).subscribe((res) => {
      console.log(res);
    });

    // example 3: collect the arguments passed to of()
    const data = of('a', 'b', 'c');
    this.subscription = data.pipe(toArray()).subscribe((res) => {
      console.log(res);
    });
  }
}

bootstrapApplication(App);

Stackblitz Example Link

Back to top⤴️

Conditional and Boolean Operators

These operators are used to evaluate conditions and return boolean values. Some of the operators are:

  • defaultIfEmpty
  • every
  • find
  • findIndex
  • isEmpty

defaultIfEmpty - Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{fromEvent,takeUntil,interval,defaultIfEmpty}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>defaultIfEmpty operator</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constclicks=fromEvent(document,'click');constclicksBeforeFive=clicks.pipe(takeUntil(interval(5000)));constresult=clicksBeforeFive.pipe(defaultIfEmpty('no clicks'));result.subscribe(x=>console.log(x));}}bootstrapApplication(App);

every - Returns an Observable that emits whether or not every item of the source satisfies the condition specified.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{of,every}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>every operator</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){of(1,2,3,4,5,6).pipe(every(x=>x<5)).subscribe(x=>console.log(x));// -> false}}bootstrapApplication(App);

find - Emits only the first value emitted by the source Observable that meets some condition.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{fromEvent,find}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>find operator</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constdiv=document.createElement('div');div.style.cssText='width: 200px; height: 200px; background: #09c;';document.body.appendChild(div);constclicks=fromEvent(document,'click');constresult=clicks.pipe(find(ev=>(<HTMLElement>ev.target).tagName==='DIV'));result.subscribe(x=>console.log(x));}}bootstrapApplication(App);

findIndex - Emits only the index of the first value emitted by the source Observable that meets some condition.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{fromEvent,findIndex}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>findIndex operator</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constdiv=document.createElement('div');div.style.cssText='width: 200px; height: 200px; background: #09c;';document.body.appendChild(div);constclicks=fromEvent(document,'click');constresult=clicks.pipe(findIndex(ev=>(<HTMLElement>ev.target).tagName==='DIV'));result.subscribe(x=>console.log(x));}}bootstrapApplication(App);

isEmpty - Emits false if the input Observable emits any values, or emits true if the input Observable completes without emitting any values.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{Subject,isEmpty}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>isEmpty operator</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constsource=newSubject<string>();constresult=source.pipe(isEmpty());source.subscribe(x=>console.log(x));result.subscribe(x=>console.log(x));source.next('a');source.next('b');source.next('c');source.complete();}}bootstrapApplication(App);
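The five operators in this section can be compared side by side on small synchronous sources. This is a sketch under assumptions, not one of the original examples: the `collect` helper is hypothetical and only works for Observables that emit synchronously, such as `of` and `EMPTY`.

```typescript
import {
  EMPTY,
  Observable,
  defaultIfEmpty,
  every,
  find,
  findIndex,
  isEmpty,
  of,
} from 'rxjs';

// Hypothetical helper: collects everything a synchronous Observable emits.
function collect<T>(source: Observable<T>): T[] {
  const values: T[] = [];
  source.subscribe(value => values.push(value));
  return values;
}

const numbers = of(3, 9, 15, 20);

// every: a single boolean, false as soon as one value fails the predicate
const allBelowTen = collect(numbers.pipe(every(n => n < 10)));

// find / findIndex: the first match and its zero-based position
const firstMultipleOfFive = collect(numbers.pipe(find(n => n % 5 === 0)));
const indexOfThatValue = collect(numbers.pipe(findIndex(n => n % 5 === 0)));

// isEmpty / defaultIfEmpty: react to a source that completes without values
const emptyCheck = collect(EMPTY.pipe(isEmpty()));
const withDefault = collect(EMPTY.pipe(defaultIfEmpty('no values')));

console.log({ allBelowTen, firstMultipleOfFive, indexOfThatValue, emptyCheck, withDefault });
```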

Back to top⤴️

Mathematical and Aggregate Operators

These operators are used to perform mathematical operations on the source observable. Some of the operators are:

  • count
  • max
  • min
  • reduce

count - Counts the number of emissions on the source and emits that number when the source completes.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{interval,fromEvent,takeUntil,count}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>count operator</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constseconds=interval(1000);constclicks=fromEvent(document,'click');constsecondsBeforeClick=seconds.pipe(takeUntil(clicks));constresult=secondsBeforeClick.pipe(count());result.subscribe(x=>console.log(x));}}bootstrapApplication(App);

max - The max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the largest value.

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{of,max}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`    <h1>max operator</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){of(5,4,7,2,8).pipe(max()).subscribe(x=>console.log(x));}}bootstrapApplication(App);

min - The min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the smallest value.

Example of Min in RxJS:

import { of } from 'rxjs';
import { min } from 'rxjs/operators';

const source = of(5, 4, 7, 2, 8);
const example = source.pipe(min());

example.subscribe(x => console.log(x)); // Output: 2

Stackblitz Example Link

Example of Min in Angular:

import'zone.js/dist/zone';import{Component,OnInit}from'@angular/core';import{CommonModule}from'@angular/common';import{bootstrapApplication}from'@angular/platform-browser';import{of,min}from'rxjs';@Component({selector:'my-app',standalone:true,imports:[CommonModule],template:`      <h1>Example of Min in Angular</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){of(5,4,7,2,8).pipe(min()).subscribe(x=>console.log(x));}}bootstrapApplication(App);

Stackblitz Example Link
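Both max and min accept an optional comparer for items that are not plain numbers. The sketch below assumes a hypothetical `Person` shape and a `byAge` comparer; neither appears in the original examples.

```typescript
import { from, max, min } from 'rxjs';

// Hypothetical data shape used only for this illustration.
interface Person {
  name: string;
  age: number;
}

const people: Person[] = [
  { name: 'Foo', age: 7 },
  { name: 'Bar', age: 5 },
  { name: 'Beer', age: 9 },
];

// Same convention as Array.prototype.sort:
// negative when a < b, positive when a > b, zero when equal.
const byAge = (a: Person, b: Person): number => a.age - b.age;

const oldest: string[] = [];
const youngest: string[] = [];

from(people).pipe(max(byAge)).subscribe(person => oldest.push(person.name));
from(people).pipe(min(byAge)).subscribe(person => youngest.push(person.name));

console.log(oldest, youngest);
```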

reduce - Applies an accumulator function over the source Observable, and returns the accumulated result when the source completes, given an optional seed value.

Example of Reduce in RxJS:

import { of } from 'rxjs';
import { reduce } from 'rxjs/operators';

const source = of(1, 2, 3, 4);
const example = source.pipe(reduce((acc, val) => acc + val, 0));

example.subscribe(console.log); // Output: 10

Stackblitz Example Link

Example of Reduce in Angular:

import{Component,OnInit}from'@angular/core';import{bootstrapApplication}from'@angular/platform-browser';import{of,reduce}from'rxjs';import'zone.js';@Component({selector:'app-root',standalone:true,template:`      <h1>Example of Reduce in Angular</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){constsource=of(1,2,3,4);constexample=source.pipe(reduce((acc,val)=>acc+val,0));example.subscribe(console.log);}}bootstrapApplication(App);

Stackblitz Example Link
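The seed does more than supply a starting number: it also lets the accumulator have a different type from the emitted values. As a small sketch (the `readings` source and the `{ sum, count }` accumulator are assumptions made for illustration), an average can be computed in one pass:

```typescript
import { map, of, reduce } from 'rxjs';

const readings = of(12, 18, 15, 15);

const averages: number[] = [];
readings
  .pipe(
    // The seed fixes the accumulator type, which differs from the value type here.
    reduce(
      (acc, value) => ({ sum: acc.sum + value, count: acc.count + 1 }),
      { sum: 0, count: 0 }
    ),
    map(({ sum, count }) => sum / count)
  )
  .subscribe(average => averages.push(average));

// Without a seed, the first emitted value becomes the initial accumulator.
const totals: number[] = [];
readings
  .pipe(reduce((acc, value) => acc + value))
  .subscribe(total => totals.push(total));

console.log(averages, totals);
```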

Back to top⤴️

Subscription

A Subscription is an object that represents a disposable resource, usually the execution of an Observable.

Example of Subscription in RxJS:

import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));

// Later: cancel the ongoing execution and release its resources
subscription.unsubscribe();

Stackblitz Example Link

Example of Subscription in Angular:

import { Component, OnDestroy, OnInit } from '@angular/core';
import { bootstrapApplication } from '@angular/platform-browser';
import { interval, Subscription } from 'rxjs';
import 'zone.js';

@Component({
  selector: 'app-root',
  standalone: true,
  template: `
    <h1>subscription in Angular</h1>
  `,
})
export class App implements OnInit, OnDestroy {
  subscription?: Subscription;

  ngOnInit() {
    const observable = interval(1000);
    this.subscription = observable.subscribe((x) => console.log(x));
  }

  ngOnDestroy() {
    // Stop the interval when the component is destroyed
    this.subscription?.unsubscribe();
  }
}

bootstrapApplication(App);

Stackblitz Example Link
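Subscriptions can also be grouped, so that a single `unsubscribe()` call releases several resources at once. The following sketch assumes a hypothetical `resource` factory that logs when its producer starts and when it is torn down; the names are illustrative only.

```typescript
import { Observable, Subscription } from 'rxjs';

const log: string[] = [];

// Hypothetical resource: logs when its producer starts and when it is torn down.
function resource(name: string): Observable<never> {
  return new Observable<never>(() => {
    log.push(`${name} started`);
    return () => log.push(`${name} stopped`);
  });
}

const subscriptions = new Subscription();
subscriptions.add(resource('timer').subscribe());
subscriptions.add(resource('socket').subscribe());

subscriptions.unsubscribe(); // tears down every child subscription
subscriptions.unsubscribe(); // safe to call again: nothing happens the second time

console.log(log, subscriptions.closed);
```

In an Angular component, a parent Subscription like this one is a natural thing to unsubscribe in `ngOnDestroy`.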

Back to top⤴️

Subject

An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers.

Example of Subject in RxJS:

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(1);
subject.next(2);

Stackblitz Example Link

Example of Subject in Angular:

import{Component,OnInit}from'@angular/core';import{bootstrapApplication}from'@angular/platform-browser';import{Subject}from'rxjs';import'zone.js';@Component({selector:'app-root',standalone:true,template:`      <h1>Example of Subject in Angular</h1>  `,})exportclassAppimplementsOnInit{subject=newSubject<number>();ngOnInit(){this.subject.subscribe({next:(v)=>console.log(`observerA:${v}`),});this.subject.subscribe({next:(v)=>console.log(`observerB:${v}`),});this.subject.next(1);this.subject.next(2);}}bootstrapApplication(App);

Stackblitz Example Link
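Two properties of a plain Subject are worth seeing in isolation: late subscribers do not receive earlier values, and a Subject is itself an Observer that can be passed to `subscribe`. This is a minimal sketch with illustrative names (`early`, `late`), not one of the original examples.

```typescript
import { Subject, from } from 'rxjs';

const subject = new Subject<number>();
const early: number[] = [];
const late: number[] = [];

subject.subscribe(v => early.push(v));
subject.next(1); // only `early` is listening

subject.subscribe(v => late.push(v)); // late subscribers do not see past values
subject.next(2);

// A Subject is also an Observer, so it can subscribe to another Observable.
from([3, 4]).subscribe(subject);

// from([3, 4]) completed, which completed the Subject too, so this is ignored.
subject.next(5);

console.log(early, late);
```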

Back to top⤴️

Scheduler

A Scheduler controls the execution context in which an Observable delivers notifications to its Observer, that is, when and where the work runs.

Example of Scheduler in RxJS:

import { asyncScheduler } from 'rxjs';

console.log('before');

asyncScheduler.schedule(() => {
  console.log('asyncScheduler');
});

console.log('after');

Stackblitz Example Link

Example of Scheduler in Angular:

import{Component,OnInit}from'@angular/core';import{bootstrapApplication}from'@angular/platform-browser';import{asyncScheduler}from'rxjs';import'zone.js';@Component({selector:'app-root',standalone:true,template:`      <h1>Example of Scheduler in Angular</h1>  `,})exportclassAppimplementsOnInit{ngOnInit(){console.log('before');asyncScheduler.schedule(()=>{console.log('asyncScheduler');});console.log('after');}}bootstrapApplication(App);

Stackblitz Example Link
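Schedulers are more commonly applied to an Observable through `observeOn` than called directly. The sketch below is an illustration under assumptions (the `order` array and the two `of` sources are introduced here): it contrasts a synchronous delivery with one moved onto `asyncScheduler`.

```typescript
import { asyncScheduler, observeOn, of } from 'rxjs';

const order: string[] = [];

order.push('before');

// of() delivers synchronously by default.
of('sync value').subscribe(v => order.push(v));

// observeOn re-emits each notification from a task scheduled on asyncScheduler.
of('async value')
  .pipe(observeOn(asyncScheduler))
  .subscribe(v => order.push(v));

order.push('after');

// At this point `order` holds 'before', 'sync value', 'after':
// 'async value' is appended only once the current call stack has unwound.
console.log([...order]);
```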

Back to top⤴️

Connect with me

Support

If you like this learning repository and find it useful, consider buying me a coffee or sponsoring me through GitHub Sponsors. Your support will help me continue and bring more exciting projects. Thank you!

Buy Me A Coffee

Sponsor Me


Show your support by starring 🌟 the repository.


