Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Pure functional event sourcing runtime

License

NotificationsYou must be signed in to change notification settings

notxcain/aecor

Repository files navigation

Build StatusMaven CentralJoin the chat at https://gitter.im/notxcain/aecorScala Steward badge

Aecor

A pure functional library for defining and running eventsourced behaviors

Aecor is an opinionated library to help building scalable, distributed eventsourced services written in Scala.

Built-in runtime implementation usesAkka for distribution and fault tolerance.

It heavily relies onCats andCats Effect

Aecor works on Scala 2.12 with Java 8.

The nameAecor (lat. ocean) is inspired by a vision of modern distributed applications as an ocean of messages with pure behaviors floating in it.

Installing Aecor

To start using Aecor Akka Persistence Runtime add the following to yourbuild.sbt file:

scalaVersion:="2.12.7"scalacOptions+="-Ypartial-unification"addCompilerPlugin("org.scalameta"%"paradise"%"3.0.0-M11" crossCrossVersion.full)libraryDependencies+="io.aecor"%%"akka-persistence-runtime"%"x.y.z"// See current version on the badge above

Media

Entity Behavior Definition

In this short guide I'll show you how to define and deploy your first event sourced behavior on runtime backed by Akka Persistence and Akka Cluster Sharding.

Each entity needs an identity, so let's start with identifier type:

finalcaseclassSubscriptionId(value: java.util.UUID)extendsAnyVal

Then define what actions we're able to perform onSubscription

importaecor.macros.boopickleWireProtocolimportcats.tagless.autoFunctorKimportboopickle.Default._@boopickleWireProtocol@autoFunctorK(false)traitSubscription[F[_]] {defcreateSubscription(userId:String,productId:String,planId:String):F[Unit]defpauseSubscription:F[Unit]defresumeSubscription:F[Unit]defcancelSubscription:F[Unit]}

You may notice that there is noSubscriptionId involved, and it's okay because this interface describes actions of a concreteSubscription and entity behavior should not know about its identity, it's behavior should be defined solely by its state.

There is an abstract typeF[_] which stays for an effect (seeRob Norris, Functional Programming with Effects) that would be performed during each action invocation.

Also being polymorphic in effect improves the reuse of this interface, you'll see it later.

@boopickleWireProtocol - is a macro annotation that automates derivation of aWireProtocol, which is used by Akka Runtime to encode and decode actions and corresponding responses.

We are event sourced, so let's define our events:

importaecor.runtime.akkapersistence.serialization._sealedabstractclassSubscriptionEventextendsProductwithSerializableobjectSubscriptionEvent {finalcaseclassSubscriptionCreated(userId:String,productId:String,planId:String)extendsSubscriptionEventfinalcaseobjectSubscriptionPausedextendsSubscriptionEventfinalcaseobjectSubscriptionResumedextendsSubscriptionEventfinalcaseobjectSubscriptionCancelledextendsSubscriptionEventimplicitvalpersistentEncoder:PersistentEncoder[SubscriptionEvent]=???implicitvalpersistentDecoder:PersistentDecoder[SubscriptionEvent]=???}

I've intentionally omitted implementation ofPersistentEncoder andPersistentDecoder, because providing generic JSON encoding would be careless as persistent event schema requires your attention and I would recommend to use Protobuf or other formats that support schema evolution.

Let's define a state on whichSubscription operates.

importaecor.data.Folded.syntax._importSubscriptionState._finalcaseclassSubscriptionState(status:Status) {defupdate(e:SubscriptionEvent):Folded[SubscriptionState]= ematch {caseSubscriptionCreated(_, _, _)=>      impossiblecaseSubscriptionPaused=>      subscription.copy(status=Paused).nextcaseSubscriptionResumed=>      subscription.copy(status=Active).nextcaseSubscriptionCancelled=>      subscription.copy(status=Cancelled).next  }}objectSubscriptionState {defcreate(e:SubscriptionEvent):Folded[SubscriptionState]= ematch {caseSubscriptionCreated(userId, productId, planId)=>Subscription(Active).nextcase _=> impossible  }sealedabstractclassStatusextendsProductwithSerializableobjectStatus {finalcaseobjectActiveextendsStatusfinalcaseobjectPausedextendsStatusfinalcaseobjectCancelledextendsStatus    }}

Pay attention toFolded datatype, it has to constructor:

  • Impossible is used to express impossible folds of events, so that you don't throw exceptions.
  • Next(a: A) is used to express successful event application.

Now, the final part before we launch.'

As I said earlierSubscription[F[_]] is polymorphic in its effect type.

Our effect would be anyF[_] with instance ofMonadAction[F, Option[SubscriptionState], SubscriptionEvent] which provides essential operations for eventsources command handler

  • read: F[Option[SubscriptionState]] - reads current state
  • append(event: SubscriptionEvent, other: SubscriptionEvent*): F[Unit] - append one or more eventsOther stuff like state recovery and event persistence is held by Akka Persistence Runtime.

So lets defineSubscriptionActions

importcats.implicits._finalclassSubscriptionActions[F[_]](implicitF:MonadAction[F,Option[SubscriptionState],SubscriptionEvent]  )extendsSubscription[F] {importF._// import algebra functionsdefcreateSubscription(userId:String,productId:String,planId:String):F[Unit]=    read.flatMap {caseSome(subscription)=>        ignorecaseNone=>// Produce event        append(SubscriptionCreated(userId, productId, planId))    }defpauseSubscription:F[Unit]=    read.flatMap {caseSome(subscription)if subscription.status==Active=>        append(SubscriptionPaused)case _=>        ignore    }defresumeSubscription:F[Unit]=    read.flatMap {caseSome(subscription)if subscription.status==Paused=>        append(SubscriptionResumed)case _=>        ignore    }defcancelSubscription:F[Unit]=    read.flatMap {caseSome(subscription)if subscription.canCancel=>        append(SubscriptionCancelled)case _=>        ignore    }}

Now when actions are defined we're ready to deploy

importcats.effect.IOimportaecor.runtime.akkapersistence._valsystem=ActorSystem("system")valjournalAdapter=CassandraJournalAdapter(system)valruntime=AkkaPersistenceRuntime(system, journalAdapter)valbehavior:EventsourcedBehavior[Subscription,IO,Option[SubscriptionState],SubscriptionEvent]EventsourcedBehavior.optional(newSubscriptionActions,Fold.optional(SubscriptionState.create)(_.update(_))  )valdeploySubscriptions:IO[SubscriptionId=>Subscription[IO]]=  runtime.deploy("Subscription",    behavior,Tagging.const[SubscriptionId](EventTag("Subscription"))  )

Projections

val journalQuery = runtime.journal

Adopters

Using Aecor in your organization? Send us a PR to list your company here:

Packages

No packages published

Contributors9

Languages


[8]ページ先頭

©2009-2025 Movatter.jp