Pacharapol Withayasakpunt

Observables are promises on steroids

I am using observable-fns (which can also work with threads.js). Not sure how powerful RxJS and RxJava can be...

One important feature is still missing, though: cancellability.

I am working on SQLite-SQLite and SQLite-MongoDB syncing.

```ts
/**
 * DbSqlite needs three basic columns
 *
 * [uid]         TEXT PRIMARY KEY,
 * date_created  DATETIME DEFAULT CURRENT_TIMESTAMP,
 * date_sync     DATETIME,
 */
function replicate(from: DbSqlite, to: DbSqlite, uids?: string[]) {
  return new Observable<{
    message: string
    percent?: number
  }>((obs) => {
    (async () => {
      // A row's effective timestamp is the later of its creation and last sync
      const getDateSync = (r: any) => Math.max(r.date_created, r.date_sync || 0)

      await to.transaction(() => {
        to.sql.db.parallelize(() => {
          const syncTable = (tableName: string) => {
            from.sql.each(/*sql*/`
            SELECT * FROM ${safeColumnName(tableName)}`, (_: any, r1: any) => {
              const uid = r1.uid
              // Skip rows that were not requested
              if (uids && !uids.includes(uid)) {
                return
              }

              to.sql.db.get(/*sql*/`
              SELECT date_created, date_sync FROM ${safeColumnName(tableName)} WHERE [uid] = @uid`, { uid }, (_: any, r2: any) => {
                const updateSync = () => {
                  r1.date_sync = +new Date()

                  to.sql.db.run(/*sql*/`
                  REPLACE INTO ${safeColumnName(tableName)} (${Object.keys(r1).map(safeColumnName)})
                  VALUES (${Object.keys(r1).map((c) => `@${c}`)})`, r1)
                }

                // Copy the row if the target lacks it or holds an older version
                if (r2) {
                  if (getDateSync(r1) > getDateSync(r2)) {
                    updateSync()
                  }
                } else {
                  updateSync()
                }
              })
            })
          }

          for (const tableName of ['user', 'card', 'quiz', 'lesson', 'deck']) {
            obs.next({
              message: `Uploading table: ${tableName}`
            })
            syncTable(tableName)
          }
        })
      })

      obs.complete()
      // Arrow function avoids relying on `obs.error` being pre-bound to `obs`
    })().catch((err) => obs.error(err))
  })
}
```

If you are curious about what a safe columnName / tableName is in SQLite, it is

Observables can be joined.

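```ts
// Method on DbSqlite, called as `db.import(filename)`.
// (`import` is a reserved word, so it cannot be a standalone function name.)
import(filename: string) {
  return new Observable<{
    message: string
    percent?: number
  }>((obs) => {
    (async () => {
      obs.next({
        message: `Opening: ${filename}`
      })
      const srcDb = await DbSqlite.open(filename)

      // Forward everything from the inner observable to the outer observer.
      // Arrow functions avoid relying on the observer methods being pre-bound.
      DbSqlite.replicate(srcDb, this).subscribe(
        (status) => obs.next(status),
        (err) => obs.error(err),
        () => obs.complete()
      )
    })().catch((err) => obs.error(err))
  })
}
```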

Observables can be polled via WebSockets.

```ts
conn.socket.on('message', async (msg: string) => {
  const { id, filename } = JSON.parse(msg)

  const isNew = !socketMap.has(id)
  // (Re)register the sender for this id on every message
  socketMap.set(id, (json: any) => {
    conn.socket.send(JSON.stringify(json))
  })

  if (isNew) {
    const observable = db.import(filename)
    logger.info(`Start processing: ${filename}`)

    observable.subscribe(
      (status) => {
        socketMap.get(id)!({ id, status })
        // `status` is an object, so serialize it instead of logging "[object Object]"
        logger.info(`Processing status: ${filename}: ${JSON.stringify(status)}`)
      },
      (err) => {
        socketMap.get(id)!({ id, error: err.message })
        logger.error(`Processing error: ${filename}: ${err.message}`)
      },
      () => {
        socketMap.get(id)!({ id, status: 'done' })
        logger.info(`Finished processing: ${filename}`)
      }
    )
  }
})
```

What remains is the question of whether Observables can be cancelled via WebSockets.

Top comments (9)

Andy Wermke
Software engineer & creator of internet things. Node.js aficionado since 2011, React lover since 2014. Head of solarwallet.io at SatoshiPay (satoshipay.io).
  • Location
    Berlin, Germany

Hi! Andy here, the author of observable-fns and threads.js.

Just stumbled upon this blog post. Nice job!

About the cancellation: Observables are kind-of cancellable – you can unsubscribe from them.

Observables are "cold" by default, which means every new subscriber will get their own fresh instance of this observable, which also means that the code in `new Observable(/* ... */)` will be run on every `.subscribe()` call (!). If a subscriber unsubscribes, the upstream subscription is terminated (cancellation) – just that there may be numerous more subscriptions of the same sort.
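
A minimal sketch of the "cold" behaviour described above. It does not use observable-fns: the `Subscribe` type is a deliberately tiny, hypothetical stand-in for an observable (a function that starts the producer and returns a teardown), chosen only so the example runs on its own.

```typescript
// Hypothetical stand-in for an observable, NOT the observable-fns API:
// calling it starts the producer for one subscriber and returns a teardown.
type Unsubscribe = () => void
type Subscribe<T> = (next: (value: T) => void) => Unsubscribe

let producerRuns = 0
let teardownRuns = 0

// "Cold": this body runs again for every subscriber, just like the function
// passed to `new Observable(...)` runs on every `.subscribe()` call.
const coldSource: Subscribe<number> = (next) => {
  producerRuns += 1
  next(producerRuns)
  return () => {
    teardownRuns += 1
  }
}

const received: number[] = []
const unsubscribeA = coldSource((value) => received.push(value))
const unsubscribeB = coldSource((value) => received.push(value))

// Cancelling A tears down only A's producer; B's producer is untouched.
unsubscribeA()
console.log({ producerRuns, teardownRuns, received })

// B has to be cancelled separately.
unsubscribeB()
```

Each subscriber triggers its own producer run and needs its own cancellation, which is why unsubscribing one of them does not stop the work started for the others.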

You can make observables "hot" by using the `multicast()` function. Then there will only ever be one upstream subscription that all downstream subscribers share. You can unsubscribe, too, just that the single shared upstream subscription will live on until the last subscriber has unsubscribed.
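
The sharing behaviour described above can be sketched roughly as follows. The `share` helper is hypothetical and is not how observable-fns implements `multicast()`; it only illustrates the idea of one upstream subscription that is torn down once the last downstream subscriber leaves. The function-based `Subscribe` type is likewise an assumption made to keep the example self-contained.

```typescript
// Hypothetical stand-in for an observable, NOT the observable-fns API.
type Unsubscribe = () => void
type Subscribe<T> = (next: (value: T) => void) => Unsubscribe

// Hypothetical helper: share one upstream subscription between all
// downstream subscribers and stop it when the last one unsubscribes.
function share<T>(source: Subscribe<T>): Subscribe<T> {
  const listeners = new Set<(value: T) => void>()
  let stopUpstream: Unsubscribe | undefined

  return (next) => {
    listeners.add(next)
    if (!stopUpstream) {
      // First subscriber: start the single upstream subscription.
      stopUpstream = source((value) => listeners.forEach((listener) => listener(value)))
    }
    return () => {
      listeners.delete(next)
      if (listeners.size === 0 && stopUpstream) {
        // Last subscriber left: this is the "all unsubscribed" moment.
        stopUpstream()
        stopUpstream = undefined
      }
    }
  }
}

let upstreamStarts = 0
let upstreamStops = 0
let emit: (value: string) => void = () => {}

const source: Subscribe<string> = (next) => {
  upstreamStarts += 1
  emit = next
  return () => {
    upstreamStops += 1
  }
}

const shared = share(source)
const seenByA: string[] = []
const seenByB: string[] = []
const leaveA = shared((value) => seenByA.push(value))
const leaveB = shared((value) => seenByB.push(value))

emit('row 1') // delivered to both A and B
leaveA()      // upstream keeps running because B is still subscribed
emit('row 2') // delivered to B only
leaveB()      // last subscriber gone, upstream is stopped
```

With a single consumer per job, unsubscribing that consumer is therefore equivalent to cancelling the job.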

What does that mean for your use case?

  • Use `multicast()` 😉
  • Then enjoy `.unsubscribe()` acting as cancellation
  • Profit $$$
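
Applied to the WebSocket handler from the post, one possible shape is to keep the subscription for each job id and unsubscribe when the client asks for it. This is only a sketch under assumptions: the `cancel` field in the message, the `createMessageHandler` function, and the `StartImport` callback type are all hypothetical and not part of the original code or of observable-fns.

```typescript
// Minimal shape of what `.subscribe()` returns.
interface Subscription {
  unsubscribe(): void
}

// Hypothetical message format: the `cancel` flag is an assumption.
interface ClientMessage {
  id: string
  filename?: string
  cancel?: boolean
}

// Hypothetical abstraction over `db.import(filename).subscribe(...)`.
type StartImport = (filename: string, onStatus: (status: string) => void) => Subscription

function createMessageHandler(startImport: StartImport, send: (json: string) => void) {
  // One live subscription per job id.
  const running = new Map<string, Subscription>()

  return (raw: string): void => {
    const msg: ClientMessage = JSON.parse(raw)

    if (msg.cancel) {
      // Cancellation request: unsubscribe and forget the job.
      const subscription = running.get(msg.id)
      if (subscription) {
        subscription.unsubscribe()
        running.delete(msg.id)
        send(JSON.stringify({ id: msg.id, status: 'cancelled' }))
      }
      return
    }

    if (msg.filename !== undefined && !running.has(msg.id)) {
      const subscription = startImport(msg.filename, (status) => {
        send(JSON.stringify({ id: msg.id, status }))
      })
      running.set(msg.id, subscription)
    }
  }
}

// Usage with a fake import job that records whether it was cancelled.
const sent: string[] = []
let unsubscribeCalls = 0
const fakeImport: StartImport = (filename, onStatus) => {
  onStatus(`Opening: ${filename}`)
  return {
    unsubscribe: () => {
      unsubscribeCalls += 1
    }
  }
}

const onMessage = createMessageHandler(fakeImport, (json) => sent.push(json))
onMessage(JSON.stringify({ id: 'job-1', filename: 'backup.db' }))
onMessage(JSON.stringify({ id: 'job-1', cancel: true }))
console.log(sent)
```

A fuller version would also remove finished or failed jobs from the map; that is left out here to keep the sketch short.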

Hope that helps 🙂

Pacharapol Withayasakpunt
Currently interested in TypeScript, Vue, Kotlin and Python. Looking forward to learning DevOps, though.
  • Location
    Thailand
  • Education
    Yes

How do I listen to a `.once('all-unsubscribed')` event? Like this one package, p-cancelable.

Otherwise, is it possible to use `Subject`, as in threads.js? (I wouldn't want to use Worker, as I write to SQLite as well, and it is not supposed to be written from multiple threads.)

Andy Wermke

> How do I listen to a `.once('all-unsubscribed')` event?

```ts
function subscribeToThings() {
  return multicast(new Observable(observer => {
    // <Subscribe to things>

    const terminateSubscription = () => {
      // <-- THIS IS THE "all-unsubscribed" CODE
      // Remember? multicast() makes that all downstream subscriptions are bundled into one,
      // so this is the callback function that will be run when this bundled subscription is terminated
    }
    return terminateSubscription
  }))
}

subscribeToThings().subscribe(/* ... */)
```

> Otherwise, is it possible to use `Subject`, as in threads.js?

Sure, who would stop you? ;)

Pacharapol Withayasakpunt

Currently, I use the signature,

```ts
Observable<{
  value?: QueryItemPartial
  i: number
  cancelFunction: CallbackType
}>
```

Will it work?

Andy Wermke

That doesn't make much sense. Why would you want to emit a new cancellation function for every new value?

You shouldn't have this non-standard custom cancellation approach when there is already a standardized one (`observable.subscribe()`, `subscription.unsubscribe()`). Keep in mind that emitting functions is generally considered to be an anti-pattern.

Maybe the observable-fns readme lacks a link to some documentation about observables in general… I guess this is one of your first times using observables? (No offense – I'm seriously interested :) )

Pacharapol Withayasakpunt

Yes. Only promises and eventemitters.

Should I start with RxJS?

Andy Wermke

Yeah, maybe that's the best to get started. You should definitely start with RxJS's documentation – it's pretty good!

Which library you then use doesn't really matter too much, as their APIs range from very similar to almost identical.

Pacharapol Withayasakpunt

> `const terminateSubscription = () =>`

Do you actually mean

```ts
new Observable((obs) => {
  return terminateSubscription

  // I actually use
  // return () => { isUnsubscribed = true }
})
```
 
Andy Wermke

Yes, my bad! I updated my code sample accordingly.
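
The `return () => { isUnsubscribed = true }` variant mentioned in this thread only has an effect if the producer checks the flag between units of work. A minimal sketch of that pattern follows, assuming a hypothetical `emitRows` producer and a hand-rolled task queue (`defer` / `runAll`) that stands in for the event loop so the example runs synchronously; in real code the deferral would be something like `setTimeout` or a database callback.

```typescript
type Unsubscribe = () => void
type Defer = (task: () => void) => void

// Hand-rolled task queue standing in for the event loop (an assumption
// made for this sketch).
const queue: Array<() => void> = []
const defer: Defer = (task) => {
  queue.push(task)
}
function runAll(): void {
  let task = queue.shift()
  while (task) {
    task()
    task = queue.shift()
  }
}

// Hypothetical producer: emits one row per deferred step and stops as soon
// as the teardown function has set the flag.
function emitRows(rows: string[], next: (row: string) => void, schedule: Defer): Unsubscribe {
  let isUnsubscribed = false

  const emitFrom = (index: number): void => {
    const row = rows[index]
    // Check the flag before every unit of work.
    if (isUnsubscribed || row === undefined) return
    next(row)
    schedule(() => emitFrom(index + 1))
  }
  schedule(() => emitFrom(0))

  // The teardown only flips the flag; the producer loop does the stopping.
  return () => {
    isUnsubscribed = true
  }
}

const received: string[] = []
const unsubscribe: Unsubscribe = emitRows(
  ['a', 'b', 'c', 'd'],
  (row) => {
    received.push(row)
    // Cancel after the second row.
    if (received.length === 2) unsubscribe()
  },
  defer
)

runAll()
console.log(received)
```

Work that is already in flight when the flag flips still finishes; only the following steps are skipped.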
