
xebia-functional/fetch

A library for Simple & Efficient data access in Scala and Scala.js


Installation

Add the following dependency to your project's build file.

For Scala 2.12.x through 3.x:

```scala
"com.47deg" %% "fetch" % "3.1.2"
```

Or, if using Scala.js (1.8.x):

```scala
"com.47deg" %%% "fetch" % "3.1.2"
```
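In an sbt project, the dependency line above goes in your libraryDependencies setting. A minimal build.sbt sketch (the project name and Scala version are placeholders, not prescribed by the library):

```scala
// build.sbt (sketch); name and scalaVersion are placeholder values
ThisBuild / scalaVersion := "2.13.12"

lazy val root = (project in file("."))
  .settings(
    name := "fetch-example",
    libraryDependencies += "com.47deg" %% "fetch" % "3.1.2"
  )
```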

Remote data

Fetch is a library for making access to data both simple and efficient. Fetch is especially useful when querying data that has a latency cost, such as databases or web services.

Define your data sources

To tell Fetch how to get the data you want, you must implement the DataSource typeclass. Data sources have fetch and batch methods that define how to fetch such a piece of data.

Data Sources take two type parameters:

  1. Identity is a type that has enough information to fetch the data
  2. Result is the type of data we want to fetch
```scala
import cats.data.NonEmptyList
import cats.effect.Concurrent

trait DataSource[F[_], Identity, Result] {
  def data: Data[Identity, Result]

  def CF: Concurrent[F]

  def fetch(id: Identity): F[Option[Result]]

  def batch(ids: NonEmptyList[Identity]): F[Map[Identity, Result]]
}
```

Returning Concurrent instances from the fetch methods allows us to specify whether the fetch must run synchronously or asynchronously, and to use all the goodies available in cats and cats-effect.

We'll implement a dummy data source that can convert integers to strings. For convenience, we define a fetchString function that lifts identities (Int in our dummy data source) to a Fetch.

```scala
import cats._
import cats.data.NonEmptyList
import cats.effect._
import cats.implicits._
import fetch._

def latency[F[_]: Sync](milis: Long): F[Unit] =
  Sync[F].delay(Thread.sleep(milis))

object ToString extends Data[Int, String] {
  def name = "To String"

  def source[F[_]: Async]: DataSource[F, Int, String] = new DataSource[F, Int, String] {
    override def data = ToString

    override def CF = Concurrent[F]

    override def fetch(id: Int): F[Option[String]] =
      for {
        _ <- CF.delay(println(s"--> [${Thread.currentThread.getId}] One ToString $id"))
        _ <- latency(100)
        _ <- CF.delay(println(s"<-- [${Thread.currentThread.getId}] One ToString $id"))
      } yield Option(id.toString)

    override def batch(ids: NonEmptyList[Int]): F[Map[Int, String]] =
      for {
        _ <- CF.delay(println(s"--> [${Thread.currentThread.getId}] Batch ToString $ids"))
        _ <- latency(100)
        _ <- CF.delay(println(s"<-- [${Thread.currentThread.getId}] Batch ToString $ids"))
      } yield ids.toList.map(i => (i, i.toString)).toMap
  }
}

def fetchString[F[_]: Async](n: Int): Fetch[F, String] =
  Fetch(n, ToString.source)
```

Creating a runtime

Since we'll use IO from the cats-effect library to execute our fetches, we'll need an IORuntime for executing our IO instances.

```scala
import cats.effect.unsafe.implicits.global // Gives us an IORuntime in places it is normally not provided
```

Normally, in your applications, this is provided by IOApp, and you should not need to import this except in limited scenarios such as test environments that do not have Cats Effect integration. For more information, and particularly on why you would usually not want to make one of these yourself, see this post by Daniel Spiewak.
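As a sketch of that usual setup, an application can extend IOApp and let it supply the runtime; the Main object below is illustrative and assumes the fetchString helper defined earlier:

```scala
import cats.effect.{IO, IOApp}
import fetch._

// IOApp provides the IORuntime, so the unsafe implicits import
// above is not needed here.
object Main extends IOApp.Simple {
  def run: IO[Unit] =
    Fetch.run[IO](fetchString[IO](1)).flatMap(result => IO.println(result))
}
```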

Creating and running a fetch

Now that we can convert Int values to Fetch[F, String], let's try creating a fetch.

```scala
def fetchOne[F[_]: Async]: Fetch[F, String] =
  fetchString(1)
```

Let's run it and wait for the fetch to complete. We'll use IO#unsafeRunTimed for testing purposes, which will run an IO[A] to Option[A] and return None if it didn't complete in time:

```scala
import scala.concurrent.duration._

Fetch.run[IO](fetchOne).unsafeRunTimed(5.seconds)
// --> [173] One ToString 1
// <-- [173] One ToString 1
// res0: Option[String] = Some(value = "1")
```

As you can see in the previous example, the ToString source is queried once to get the value of 1.

Batching

Multiple fetches to the same data source are automatically batched. To illustrate this, we are going to compose three independent fetch results as a tuple.

```scala
def fetchThree[F[_]: Async]: Fetch[F, (String, String, String)] =
  (fetchString(1), fetchString(2), fetchString(3)).tupled
```

When executing the above fetch, note how the three identities get batched, and the data source is only queried once.

```scala
Fetch.run[IO](fetchThree).unsafeRunTimed(5.seconds)
// --> [172] Batch ToString NonEmptyList(1, 2, 3)
// <-- [172] Batch ToString NonEmptyList(1, 2, 3)
// res1: Option[(String, String, String)] = Some(value = ("1", "2", "3"))
```

Note that the DataSource#batch method is not mandatory. It will be implemented in terms of DataSource#fetch if you don't provide an implementation.

```scala
object UnbatchedToString extends Data[Int, String] {
  def name = "Unbatched to string"

  def source[F[_]: Async] = new DataSource[F, Int, String] {
    override def data = UnbatchedToString

    override def CF = Concurrent[F]

    override def fetch(id: Int): F[Option[String]] =
      CF.delay(println(s"--> [${Thread.currentThread.getId}] One UnbatchedToString $id")) >>
        latency(100) >>
        CF.delay(println(s"<-- [${Thread.currentThread.getId}] One UnbatchedToString $id")) >>
        CF.pure(Option(id.toString))
  }
}

def unbatchedString[F[_]: Async](n: Int): Fetch[F, String] =
  Fetch(n, UnbatchedToString.source)
```

Let's create a tuple of unbatched string requests.

```scala
def fetchUnbatchedThree[F[_]: Async]: Fetch[F, (String, String, String)] =
  (unbatchedString(1), unbatchedString(2), unbatchedString(3)).tupled
```

When executing the above fetch, note how the three identities get requested in parallel. You can overridebatch to execute queries sequentially if you need to.

```scala
Fetch.run[IO](fetchUnbatchedThree).unsafeRunTimed(5.seconds)
// --> [172] One UnbatchedToString 1
// --> [173] One UnbatchedToString 2
// <-- [172] One UnbatchedToString 1
// --> [172] One UnbatchedToString 3
// <-- [173] One UnbatchedToString 2
// <-- [172] One UnbatchedToString 3
// res2: Option[(String, String, String)] = Some(value = ("1", "2", "3"))
```
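As a sketch of that last point, a batch override can simply call fetch for each identity in order; traverse sequences the effects one after another instead of in parallel. SequentialToString is a hypothetical variant of the sources above, not part of the library:

```scala
import cats.data.NonEmptyList
import cats.effect.{Async, Concurrent}
import cats.implicits._
import fetch._

object SequentialToString extends Data[Int, String] {
  def name = "Sequential to string"

  def source[F[_]: Async] = new DataSource[F, Int, String] {
    override def data = SequentialToString

    override def CF = Concurrent[F]

    override def fetch(id: Int): F[Option[String]] =
      CF.pure(Option(id.toString))

    // traverse runs the single-identity fetches sequentially, then
    // keeps only the identities that actually returned a result.
    override def batch(ids: NonEmptyList[Int]): F[Map[Int, String]] =
      ids.toList
        .traverse(id => fetch(id).map(res => res.map(id -> _)))
        .map(_.flatten.toMap)
  }
}
```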

Parallelism

If we combine two independent fetches from different data sources, the fetches can be run in parallel. First, let's add a data source that fetches a string's size.

```scala
object Length extends Data[String, Int] {
  def name = "Length"

  def source[F[_]: Async] = new DataSource[F, String, Int] {
    override def data = Length

    override def CF = Concurrent[F]

    override def fetch(id: String): F[Option[Int]] =
      for {
        _ <- CF.delay(println(s"--> [${Thread.currentThread.getId}] One Length $id"))
        _ <- latency(100)
        _ <- CF.delay(println(s"<-- [${Thread.currentThread.getId}] One Length $id"))
      } yield Option(id.size)

    override def batch(ids: NonEmptyList[String]): F[Map[String, Int]] =
      for {
        _ <- CF.delay(println(s"--> [${Thread.currentThread.getId}] Batch Length $ids"))
        _ <- latency(100)
        _ <- CF.delay(println(s"<-- [${Thread.currentThread.getId}] Batch Length $ids"))
      } yield ids.toList.map(i => (i, i.size)).toMap
  }
}

def fetchLength[F[_]: Async](s: String): Fetch[F, Int] =
  Fetch(s, Length.source)
```

And now we can easily receive data from the two sources in a single fetch.

```scala
def fetchMulti[F[_]: Async]: Fetch[F, (String, Int)] =
  (fetchString(1), fetchLength("one")).tupled
```

Note how the two independent data fetches run in parallel, minimizing the latency cost of querying the two data sources.

```scala
Fetch.run[IO](fetchMulti).unsafeRunTimed(5.seconds)
// --> [173] One Length one
// --> [172] One ToString 1
// <-- [172] One ToString 1
// <-- [173] One Length one
// res3: Option[(String, Int)] = Some(value = ("1", 3))
```

Deduplication & Caching

The Fetch library supports deduplication and optional caching. By default, fetches that are chained together will share the same cache backend, providing some deduplication.

When fetching an identity twice within the same Fetch, such as a batch of fetches or when you flatMap one fetch into another, subsequent fetches for the same identity are cached. Let's try creating a fetch that asks for the same identity twice, by using flatMap (in a for-comprehension) to chain the requests together:

```scala
def fetchTwice[F[_]: Async]: Fetch[F, (String, String)] =
  for {
    one <- fetchString(1)
    two <- fetchString(1)
  } yield (one, two)
```

While running it, notice that the data source is only queried once. The next time the identity is requested, it's served from the internal cache.

```scala
val runFetchTwice = Fetch.run[IO](fetchTwice)

runFetchTwice.unsafeRunTimed(5.seconds)
// --> [173] One ToString 1
// <-- [173] One ToString 1
// res4: Option[(String, String)] = Some(value = ("1", "1"))
```

This will still fetch the data again, however, if we call it once more:

```scala
runFetchTwice.unsafeRunTimed(5.seconds)
// --> [172] One ToString 1
// <-- [172] One ToString 1
// res5: Option[(String, String)] = Some(value = ("1", "1"))
```

If you want to cache between multiple individual fetches, you should use Fetch.runCache or Fetch.runAll to return the cache for reuse later. Here is an example where we fetch four separate times, and explicitly share the cache to keep the deduplication functionality:

```scala
// We get the cache from the first run and pass it to all subsequent fetches
val runFetchFourTimesSharedCache = for {
  (cache, one) <- Fetch.runCache[IO](fetchString(1))
  two          <- Fetch.run[IO](fetchString(1), cache)
  three        <- Fetch.run[IO](fetchString(1), cache)
  four         <- Fetch.run[IO](fetchString(1), cache)
} yield (one, two, three, four)

runFetchFourTimesSharedCache.unsafeRunTimed(5.seconds)
// --> [173] One ToString 1
// <-- [173] One ToString 1
// res6: Option[(String, String, String, String)] = Some(
//   value = ("1", "1", "1", "1")
// )
```

As you can see above, the cache will now work between calls and can be used to deduplicate requests over a period of time. Note that this does not support any kind of automatic cache invalidation, so you will need to keep track of which values you want to re-fetch if you plan on sharing the cache.
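Relatedly, if you already know some results up front, you can start from a prepopulated cache; the library exposes an InMemoryCache for this. The sketch below assumes an InMemoryCache.from constructor keyed by (Data, identity) pairs, as shown in the project documentation; double-check the exact signature for your version:

```scala
import cats.effect.{Concurrent, IO}
import fetch._

// A cache that already holds the ToString result for identity 1
// (assumed InMemoryCache.from signature; verify against your version).
def prepopulated[F[_]: Concurrent]: DataCache[F] =
  InMemoryCache.from[F, Int, String]((ToString, 1) -> "1")

// The data source is never hit for 1; the value comes from the cache.
Fetch.run[IO](fetchString[IO](1), prepopulated[IO])
```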

For more in-depth information, take a look at our documentation.

Copyright

Fetch is designed and developed by 47 Degrees.

Copyright (C) 2016-2023 47 Degrees. http://47deg.com

