Pure functional event sourcing runtime
notxcain/aecor
Aecor is an opinionated library to help build scalable, distributed event-sourced services written in Scala.
The built-in runtime implementation uses Akka for distribution and fault tolerance.
It relies heavily on Cats and Cats Effect.
Aecor works on Scala 2.12 with Java 8.
The name Aecor (lat. ocean) is inspired by a vision of modern distributed applications as an ocean of messages with pure behaviors floating in it.
To start using Aecor Akka Persistence Runtime, add the following to your `build.sbt` file:

    scalaVersion := "2.12.7"
    scalacOptions += "-Ypartial-unification"
    addCompilerPlugin("org.scalameta" % "paradise" % "3.0.0-M11" cross CrossVersion.full)
    libraryDependencies += "io.aecor" %% "akka-persistence-runtime" % "x.y.z" // See current version on the badge above

- Amazing series on Aecor by Vladimir Pavkin
In this short guide I'll show you how to define and deploy your first event-sourced behavior on a runtime backed by Akka Persistence and Akka Cluster Sharding.
Each entity needs an identity, so let's start with an identifier type:

    final case class SubscriptionId(value: java.util.UUID) extends AnyVal

Then define what actions we're able to perform on `Subscription`:

    import aecor.macros.boopickleWireProtocol
    import cats.tagless.autoFunctorK
    import boopickle.Default._

    @boopickleWireProtocol
    @autoFunctorK(false)
    trait Subscription[F[_]] {
      def createSubscription(userId: String, productId: String, planId: String): F[Unit]
      def pauseSubscription: F[Unit]
      def resumeSubscription: F[Unit]
      def cancelSubscription: F[Unit]
    }

You may notice that there is no `SubscriptionId` involved. That's okay, because this interface describes the actions of a concrete `Subscription`, and an entity's behavior should not know about its identity: its behavior should be defined solely by its state.

There is an abstract type `F[_]` which stands for an effect (see Rob Norris, Functional Programming with Effects) that would be performed during each action invocation.
Being polymorphic in the effect also improves the reuse of this interface, as you'll see later.

`@boopickleWireProtocol` is a macro annotation that automates derivation of a `WireProtocol`, which is used by Akka Runtime to encode and decode actions and corresponding responses.
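
To make the point about effect polymorphism concrete, here is a minimal sketch, using only the standard library, of one interface interpreted in two different effects. The trait is a copy of `Subscription` without the macro annotations. `Described`, `Describe`, and `Recording` are hypothetical names invented for this illustration and are not part of Aecor.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object TaglessSketch {
  // Copy of the interface above, minus the macro annotations.
  trait Subscription[F[_]] {
    def createSubscription(userId: String, productId: String, planId: String): F[Unit]
    def pauseSubscription: F[Unit]
    def resumeSubscription: F[Unit]
    def cancelSubscription: F[Unit]
  }

  // Hypothetical effect 1: ignore the result type and render each call as text.
  type Described[A] = String

  object Describe extends Subscription[Described] {
    def createSubscription(userId: String, productId: String, planId: String): Described[Unit] =
      s"createSubscription($userId, $productId, $planId)"
    def pauseSubscription: Described[Unit] = "pauseSubscription"
    def resumeSubscription: Described[Unit] = "resumeSubscription"
    def cancelSubscription: Described[Unit] = "cancelSubscription"
  }

  // Hypothetical effect 2: run each call inside Try and record its name.
  final class Recording extends Subscription[Try] {
    private val buffer = ListBuffer.empty[String]
    def calls: List[String] = buffer.toList
    private def record(name: String): Try[Unit] = Try { buffer += name; () }

    def createSubscription(userId: String, productId: String, planId: String): Try[Unit] =
      record("createSubscription")
    def pauseSubscription: Try[Unit] = record("pauseSubscription")
    def resumeSubscription: Try[Unit] = record("resumeSubscription")
    def cancelSubscription: Try[Unit] = record("cancelSubscription")
  }
}
```

The same four method signatures serve both interpreters; only the choice of `F` changes what invoking an action means.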
We are event sourced, so let's define our events:

    import aecor.runtime.akkapersistence.serialization._

    sealed abstract class SubscriptionEvent extends Product with Serializable
    object SubscriptionEvent {
      final case class SubscriptionCreated(userId: String, productId: String, planId: String) extends SubscriptionEvent
      final case object SubscriptionPaused extends SubscriptionEvent
      final case object SubscriptionResumed extends SubscriptionEvent
      final case object SubscriptionCancelled extends SubscriptionEvent

      implicit val persistentEncoder: PersistentEncoder[SubscriptionEvent] = ???
      implicit val persistentDecoder: PersistentDecoder[SubscriptionEvent] = ???
    }

I've intentionally omitted the implementations of `PersistentEncoder` and `PersistentDecoder`. Providing a generic JSON encoding would be careless, because the persistent event schema requires your attention. I recommend using Protobuf or another format that supports schema evolution.
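
As a rough illustration of why the stored schema deserves attention, the sketch below tags every stored event with a manifest that names its type and schema version, so that a decoder can keep reading older records. It uses only the standard library and does not implement Aecor's `PersistentEncoder` or `PersistentDecoder`. The `Stored` type, the manifest strings, and the imagined `v1` schema without `planId` are all assumptions made up for this example.

```scala
object EventCodecSketch {
  sealed trait Event
  final case class SubscriptionCreated(userId: String, productId: String, planId: String) extends Event
  case object SubscriptionPaused extends Event

  // Hypothetical stored form: a versioned manifest plus named fields.
  final case class Stored(manifest: String, fields: Map[String, String])

  // New events are always written with the latest schema version.
  def encode(e: Event): Stored = e match {
    case SubscriptionCreated(u, p, pl) =>
      Stored("SubscriptionCreated.v2", Map("userId" -> u, "productId" -> p, "planId" -> pl))
    case SubscriptionPaused =>
      Stored("SubscriptionPaused.v1", Map.empty)
  }

  private def field(s: Stored, name: String): Either[String, String] =
    s.fields.get(name).toRight(s"Missing field: $name")

  // The decoder understands every version that may still exist in the journal.
  def decode(s: Stored): Either[String, Event] = s.manifest match {
    case "SubscriptionCreated.v1" =>
      // Imagined older schema without planId: fill in a default.
      for {
        u <- field(s, "userId")
        p <- field(s, "productId")
      } yield SubscriptionCreated(u, p, planId = "default")
    case "SubscriptionCreated.v2" =>
      for {
        u  <- field(s, "userId")
        p  <- field(s, "productId")
        pl <- field(s, "planId")
      } yield SubscriptionCreated(u, p, pl)
    case "SubscriptionPaused.v1" => Right(SubscriptionPaused)
    case other                   => Left(s"Unknown manifest: $other")
  }
}
```

A schema-aware format such as Protobuf gives you this kind of compatibility with far less hand-written code, which is the reason for the recommendation above.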
Let's define a state on which `Subscription` operates.

    import aecor.data.Folded
    import aecor.data.Folded.syntax._
    import SubscriptionEvent._
    import SubscriptionState._
    import SubscriptionState.Status._

    final case class SubscriptionState(status: Status) {
      // Applies an event to an already existing subscription
      def update(e: SubscriptionEvent): Folded[SubscriptionState] = e match {
        case SubscriptionCreated(_, _, _) => impossible // cannot be created twice
        case SubscriptionPaused => copy(status = Paused).next
        case SubscriptionResumed => copy(status = Active).next
        case SubscriptionCancelled => copy(status = Cancelled).next
      }
    }

    object SubscriptionState {
      // Builds the initial state; only SubscriptionCreated can start a subscription
      def create(e: SubscriptionEvent): Folded[SubscriptionState] = e match {
        case SubscriptionCreated(userId, productId, planId) => SubscriptionState(Active).next
        case _ => impossible
      }

      sealed abstract class Status extends Product with Serializable
      object Status {
        final case object Active extends Status
        final case object Paused extends Status
        final case object Cancelled extends Status
      }
    }

Pay attention to the `Folded` datatype. It has two constructors:

- `Impossible` is used to express impossible folds of events, so that you don't throw exceptions.
- `Next(a: A)` is used to express successful event application.
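
The idea behind the two constructors can be sketched without any library. The `Folded` below is a simplified stand-in written for this example, not the definition from `aecor.data`, and the `Event`, `Status`, `step`, and `replay` names are hypothetical. It shows how replaying events stops at the first impossible transition instead of throwing.

```scala
object FoldedSketch {
  // Simplified stand-in for aecor.data.Folded, reduced to its two cases.
  sealed trait Folded[+A] {
    def flatMap[B](f: A => Folded[B]): Folded[B] = this match {
      case Next(a)    => f(a)
      case Impossible => Impossible
    }
  }
  final case class Next[+A](a: A) extends Folded[A]
  case object Impossible extends Folded[Nothing]

  sealed trait Event
  case object Created extends Event
  case object Paused extends Event
  case object Resumed extends Event

  sealed trait Status
  case object Active extends Status
  case object OnHold extends Status

  // Applies one event; any transition not listed here is Impossible.
  def step(state: Option[Status], event: Event): Folded[Option[Status]] =
    (state, event) match {
      case (None, Created)         => Next(Some(Active))
      case (Some(Active), Paused)  => Next(Some(OnHold))
      case (Some(OnHold), Resumed) => Next(Some(Active))
      case _                       => Impossible
    }

  // Replays a history from the empty state, short-circuiting on Impossible.
  def replay(events: List[Event]): Folded[Option[Status]] =
    events.foldLeft[Folded[Option[Status]]](Next(None))((acc, e) => acc.flatMap(step(_, e)))
}
```

For instance, `replay(List(Created, Paused, Resumed))` evaluates to `Next(Some(Active))`, while `replay(List(Paused))` evaluates to `Impossible` because nothing was created first.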
Now, the final part before we launch.
As I said earlier, `Subscription[F[_]]` is polymorphic in its effect type.

Our effect would be any `F[_]` with an instance of `MonadAction[F, Option[SubscriptionState], SubscriptionEvent]`, which provides the essential operations for an event-sourced command handler:

- `read: F[Option[SubscriptionState]]` - reads the current state
- `append(event: SubscriptionEvent, other: SubscriptionEvent*): F[Unit]` - appends one or more events

Other stuff like state recovery and event persistence is handled by Akka Persistence Runtime.
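
To give a feel for what `read` and `append` mean, here is a small in-memory model built with the standard library only. It is a sketch under assumptions, not Aecor's implementation: the `Action` type, the `String` statuses, and the `handle` helper are hypothetical. In this model an action receives the initial state, a fold function, and the events appended so far, and `read` folds those events over the initial state.

```scala
object ActionSketch {
  // Hypothetical model of a command handler step.
  final case class Action[S, E, A](run: (S, (S, E) => S, Vector[E]) => (Vector[E], A)) {
    def flatMap[B](f: A => Action[S, E, B]): Action[S, E, B] =
      Action[S, E, B] { (s, update, es) =>
        val (es1, a) = run(s, update, es)
        f(a).run(s, update, es1)
      }
    def map[B](f: A => B): Action[S, E, B] = flatMap(a => Action.pure[S, E, B](f(a)))
  }

  object Action {
    def pure[S, E, A](a: A): Action[S, E, A] = Action[S, E, A]((_, _, es) => (es, a))
    // Current state = initial state with all appended events folded in.
    def read[S, E]: Action[S, E, S] =
      Action[S, E, S]((s, update, es) => (es, es.foldLeft(s)(update)))
    def append[S, E](event: E, other: E*): Action[S, E, Unit] =
      Action[S, E, Unit]((_, _, es) => (es ++ (event +: other), ()))
  }

  sealed trait Event
  case object Created extends Event
  case object Paused extends Event

  def update(state: Option[String], e: Event): Option[String] = e match {
    case Created => Some("active")
    case Paused  => state.map(_ => "paused")
  }

  type Handler[A] = Action[Option[String], Event, A]

  // Same shape as pauseSubscription: emit an event only if the state allows it.
  val pause: Handler[Unit] =
    Action.read[Option[String], Event].flatMap {
      case Some("active") => Action.append[Option[String], Event](Paused)
      case _              => Action.pure[Option[String], Event, Unit](())
    }

  // In this model, a read after an append observes the appended event.
  val createThenRead: Handler[Option[String]] =
    Action.append[Option[String], Event](Created).flatMap(_ => Action.read[Option[String], Event])

  // Runs a handler against a recovered state and returns the produced events.
  def handle[A](state: Option[String], action: Handler[A]): (Vector[Event], A) =
    action.run(state, update, Vector.empty)
}
```

The handler itself never stores anything. It only decides which events to produce, and a runtime (here the toy `handle` function) is responsible for supplying the state and collecting the events.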
So let's define `SubscriptionActions`:

    import aecor.MonadAction
    import cats.implicits._
    import SubscriptionEvent._
    import SubscriptionState.Status._

    final class SubscriptionActions[F[_]](
      implicit F: MonadAction[F, Option[SubscriptionState], SubscriptionEvent]
    ) extends Subscription[F] {

      import F._ // import algebra functions

      def createSubscription(userId: String, productId: String, planId: String): F[Unit] =
        read.flatMap {
          case Some(subscription) =>
            // Already created, nothing to do
            ignore
          case None =>
            // Produce event
            append(SubscriptionCreated(userId, productId, planId))
        }

      def pauseSubscription: F[Unit] =
        read.flatMap {
          case Some(subscription) if subscription.status == Active =>
            append(SubscriptionPaused)
          case _ =>
            ignore
        }

      def resumeSubscription: F[Unit] =
        read.flatMap {
          case Some(subscription) if subscription.status == Paused =>
            append(SubscriptionResumed)
          case _ =>
            ignore
        }

      def cancelSubscription: F[Unit] =
        read.flatMap {
          // Note: canCancel is not defined on the SubscriptionState shown above;
          // it is assumed to be a helper method on the state.
          case Some(subscription) if subscription.canCancel =>
            append(SubscriptionCancelled)
          case _ =>
            ignore
        }
    }

Now that the actions are defined, we're ready to deploy:

    import akka.actor.ActorSystem
    import cats.effect.IO
    import aecor.data._
    import aecor.runtime.akkapersistence._

    val system = ActorSystem("system")
    val journalAdapter = CassandraJournalAdapter(system)
    val runtime = AkkaPersistenceRuntime(system, journalAdapter)

    val behavior: EventsourcedBehavior[Subscription, IO, Option[SubscriptionState], SubscriptionEvent] =
      EventsourcedBehavior.optional(
        new SubscriptionActions,
        Fold.optional(SubscriptionState.create)(_.update(_))
      )

    val deploySubscriptions: IO[SubscriptionId => Subscription[IO]] =
      runtime.deploy(
        "Subscription",
        behavior,
        Tagging.const[SubscriptionId](EventTag("Subscription"))
      )

    val journalQuery = runtime.journal

Using Aecor in your organization? Send us a PR to list your company here: