- Notifications
You must be signed in to change notification settings - Fork39
🐍 Minos is a framework which helps you create reactive microservices in Python
License
minos-framework/minos-python
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
Minos is a framework which helps you createreactive microservices in Python. Internally, it leverages Event Sourcing, CQRS and a message driven architecture to fulfil the commitments of an asynchronous environment.
The roadmap of this project is publicly accessible at thisGitHub Repository.
Theminos framework is built strongly inspired by the following set of patterns:
- Microservice architecture: Architect an application as a collection of loosely coupled services.
- Decompose by subdomain: Define services corresponding to Domain-Driven Design (DDD) subdomains
- Self-contained Service: Microservices can respond to a synchronous request without waiting for the response from any other service.
- Database per service: Keep each microservice's persistent data private to that service and accessible only via its API. A service's transactions only involve its database.
- Saga: Transaction that spans multiple services.
- CQRS: view database, which is a read-only replica that is designed to support queries that retrieves data from microservice. The application keeps the replica up to date by subscribing to Domain events published by the service that own the data.
- Domain event: A service often needs to publish events when it updates its data. These events might be needed, for example, to update a CQRS view.
- Event Sourcing: Event sourcing persists the state of a business entity such an Order or a Customer as a sequence of state-changing events. Whenever the state of a business entity changes, a new event is appended to the list of events. Since saving an event is a single operation, it is inherently atomic. The application reconstructs an entity's current state by replaying the events.
- Messaging: Services communicating by exchanging messages over messaging channels. (Apache Kafka is used in this case)
- API gateway: Single entry point for all clients. The API gateway proxy/route to the appropriate service.
- Service registry: Database of services. A service registry might invoke a service instance's health check API to verify that it is able to handle requests
- Self Registration: Each service instance register on startup and unregister on stop.
- Access token: The API Gateway authenticates the request and passes an access token (e.g. JSON Web Token) that securely identifies the requestor in each request to the services. A service can include the access token in requests it makes to other services.
- Health Check API: A service has a health check API endpoint (e.g. HTTP
/health) that returns the health of the service.
The easiest way to manage a project is with theminos command-line interface, which provides commands to setup both the project skeleton (configures containerization, databases, brokers, etc.) and the microservice skeleton (the base microservice structure, environment configuration, etc.).
You can install it with:
pip install minos-cli
Here is a summary containing the most useful commands:
minos new project $NAME: Create a new Projectminos set $RESOURCE $BACKEND: Configure an environment resource (broker, database, etc.).minos deploy project: Deploy a project.minos new microservice $NAME: Create a new microservice.minos deploy microservicedeploy a microservice.
For more information, visit theminos-cli repository.
The best place to start learning how to use the Minos Framework is atMinos Learn. The official API Reference is publicly available at theGitHub Pages.
This section includes a quickstart guide to create your firstminos microservice, so that anyone can get the gist of the framework.
The required environment to run this quickstart is the following:
- A
python>=3.9interpreter with version equal or greater to . - A
kafkainstance available atlocalhost:9092 - A
postgresinstance available atlocalhost:5432with thefoo_dbandfoobar_dbdatabases accessible with theuser:passcredentials. - Two TCP sockets available to use at
localhost:4545andlocalhost:4546.
Click to show adocker-compose.yml that provides thekafka andpostgres instances ready to be used!
# docker-compose.ymlversion:"3.9"services:zookeeper:restart:alwaysimage:wurstmeister/zookeeper:latestkafka:restart:alwaysimage:wurstmeister/kafka:latestports: -"9092:9092"depends_on: -zookeeperenvironment:KAFKA_ADVERTISED_HOST_NAME:localhostKAFKA_ZOOKEEPER_CONNECT:zookeeper:2181postgres:restart:alwaysimage:postgres:latestports: -"5432:5432"environment: -POSTGRES_USER=user -POSTGRES_PASSWORD=pass
Then, start the environment:
docker-compose up
To create the databases, just run the following command:
docker-composeexec postgres psql -U user -tc'CREATE database foo_db'docker-composeexec postgres psql -U user -tc'CREATE database foobar_db'
Note that these parameters can be customized on the configuration files.
If you want to directly useminos without the command-line utility, the following command will install the needed packages:
pip install \ minos-microservice-aggregate \ minos-microservice-common \ minos-microservice-cqrs \ minos-microservice-networks \ minos-microservice-saga\ minos-broker-kafka \ minos-http-aiohttpTo keep things simpler, this quickstart will create a microservice assuming all the source code is stored on a singlefoo/main.py file. In addition to the source file, afoo/config.yml will contain all the configuration stuff.
The directory structure will become:
.└── foo ├── config.yml └── main.pyCreate afoo/config.yml file and add the following lines:
Click to show the full file
# foo/config.ymlversion:2name:fooaggregate:entities: -main.Foorepositories:transaction:minos.aggregate.PostgreSqlTransactionRepositoryevent:minos.aggregate.PostgreSqlEventRepositorysnapshot:minos.aggregate.PostgreSqlSnapshotRepositorydatabases:default:database:foo_dbuser:minospassword:min0ssaga:path:"./foo.lmdb"interfaces:broker:port:minos.networks.BrokerPortpublisher:client:minos.plugins.kafka.KafkaBrokerPublisherqueue:minos.networks.PostgreSqlBrokerPublisherQueuesubscriber:client:minos.plugins.kafka.KafkaBrokerSubscriberqueue:minos.networks.PostgreSqlBrokerSubscriberQueuevalidator:minos.networks.PostgreSqlBrokerSubscriberDuplicateValidatorhttp:port:minos.networks.HttpPortconnector:client:minos.plugins.aiohttp.AioHttpConnectorport:4545periodic:port:minos.networks.PeriodicPortpools:lock:minos.common.PostgreSqlLockPooldatabasse:minos.common.PostgreSqlPoolbroker:minos.networks.BrokerClientPoolsaga:manager:minos.saga.SagaManagerrouters: -minos.networks.BrokerRouter -minos.networks.PeriodicRouter -minos.networks.RestHttpRoutermiddleware: -minos.saga.transactional_commandservices: -minos.networks.SystemService -minos.aggregate.TransactionService -minos.aggregate.SnapshotService -minos.saga.SagaService -main.FooCommandService -main.FooQueryService
Create afoo/main.py file and add the following content:
# foo/main.pyfrompathlibimportPathfromminos.aggregateimportAggregate,RootEntityfromminos.commonimportEntrypointLauncherfromminos.cqrsimportCommandService,QueryServiceclassFoo(RootEntity):"""Foo RootEntity class."""classFooAggregate(Aggregate[Foo]):"""Foo Aggregate class."""classFooCommandService(CommandService):"""Foo Command Service class."""classFooQueryService(QueryService):"""Foo Query Service class."""if__name__=='__main__':launcher=EntrypointLauncher.from_config(Path(__file__).parent/"config.yml")launcher.launch()
Execute the following command to start the microservice:
python foo/main.py
The way to model data inminos is highly inspired by theEvent Sourcing ideas. For this reason, the classes to be used to model data are:
minos.aggregate.Entity: A model that has an identifier that gives it a unique identity, in the sense that some values from which it is composed could change, but its identity will continue being the same.minos.aggregate.ExternalEntity: A model that belongs to another microservice (or aggregate boundary) but needs to be used for some reason inside this microservice (or aggregate boundary).minos.aggregate.RootEntity: Is anEntitysuperset that provides global identity across the project compared to standardEntitymodels, that has only local identity (theRootEntitycan be accessed from another microservices asExternalEntitymodels, but standardEntitymodels can only be accessed within the microservice that define them). TheRootEntityis also the one that interacts with the persistence layer (theEventRepositoryandSnapshotRepositoryinstances).minos.aggregate.Ref: A wrapper class that provides the functionality to store a reference of otherRootEntityorExternalEntityinstances.minos.aggregate.EntitySet: A container ofEntityinstances that takes advantage of the incremental behaviour of theEventRepository.minos.aggregate.ValueObject: A model that is only identified by the values that compose it, so that if some of them changes, then the model becomes completely different (for that reason, these models are immutable).minos.aggregate.ValueObjectSet: A container ofValueObjectinstances that takes advantage of the incremental behaviour of the `EventRepository.minos.aggregate.Aggregate: A collection ofEntityand/orValueObjectmodels that are related to each other through aRootEntity.minos.aggregate.Event: A model that contains the difference between the aRootEntityinstance and its previous version (if any).
Here is an example of the creation theFoo aggregate. In this case, it has two attributes, abar being astr, and afoobar being an optional reference to the externalFooBar aggregate, which it is assumed that it has asomething attribute.
# foo/main.pyfrom __future__importannotationsfromtypingimportOptionalfromuuidimportUUIDfromminos.aggregateimportAggregate,RootEntity,ExternalEntity,RefclassFoo(RootEntity):"""Foo RootEntity class."""bar:strfoobar:Optional[Ref[FooBar]]classFooBar(ExternalEntity):"""FooBar ExternalEntity clas."""something:strclassFooAggregate(Aggregate[Foo]):"""Foo Aggregate class."""@staticmethodasyncdefcreate_foo(bar:str)->UUID:"""Create a new Foo instance :param bar: The bar of the new instance. :return: The identifier of the new instance. """foo=awaitFoo.create(bar)returnfoo.uuid@staticmethodasyncdefupdate_foobar(uuid:UUID,foobar:Optional[Ref[FooBar]])->None:"""Update the foobar attribute of the ``Foo`` instance. :param uuid: The identifier of the ``Foo`` instance. :param foobar: The foobar value to be set. :return: This method does not return anything. """foo=awaitFoo.get(uuid)foo.foobar=foobarawaitfoo.save()
Click to show the full file
# foo/main.pyfrom __future__importannotationsfrompathlibimportPathfromtypingimportOptionalfromuuidimportUUIDfromminos.aggregateimportAggregate,RootEntity,ExternalEntity,Reffromminos.commonimportEntrypointLauncherfromminos.cqrsimportCommandService,QueryServiceclassFoo(RootEntity):"""Foo RootEntity class."""bar:strfoobar:Optional[Ref[FooBar]]classFooBar(ExternalEntity):"""FooBar ExternalEntity clas."""something:strclassFooAggregate(Aggregate[Foo]):"""Foo Aggregate class."""@staticmethodasyncdefcreate_foo(bar:str)->UUID:"""Create a new Foo instance :param bar: The bar of the new instance. :return: The identifier of the new instance. """foo=awaitFoo.create(bar)returnfoo.uuid@staticmethodasyncdefupdate_foobar(uuid:UUID,foobar:Optional[Ref[FooBar]])->None:"""Update the foobar attribute of the ``Foo`` instance. :param uuid: The identifier of the ``Foo`` instance. :param foobar: The foobar value to be set. :return: This method does not return anything. """foo=awaitFoo.get(uuid)foo.foobar=foobarawaitfoo.save()classFooCommandService(CommandService):"""Foo Command Service class."""classFooQueryService(QueryService):"""Foo Query Service class."""if__name__=='__main__':launcher=EntrypointLauncher.from_config(Path(__file__).parent/"config.yml")launcher.launch()
Here is an example of the definition of a command to createFoo instances. To do that, it is necessary to define aCommandService that contains the handling function. It will handle both the broker messages sent to the"CreateFoo" topic and the rest calls to the"/foos" path with the"POST" method. In this case, the handling function unpacks theRequest's content and then calls thecreate method from theAggregate, which stores theFoo instance following an event-driven strategy (it also publishes the"FooCreated" event). Finally, aResponse is returned to be handled by the external caller (another microservice or the API-gateway).
# foo/main.pyfromminos.cqrsimportCommandServicefromminos.networksimportenroute,Request,ResponseclassFooCommandService(CommandService):"""Foo Command Service class."""@enroute.broker.command("CreateFoo")@enroute.rest.command("/foos","POST")asyncdefcreate_foo(self,request:Request)->Response:"""Create a new Foo. :param request: The ``Request`` that contains the ``bar`` attribute. :return: A ``Response`` containing identifier of the already created instance. """content=awaitrequest.content()bar=content["bar"]uuid=awaitFooAggregate.create_foo(bar)returnResponse({"uuid":uuid})
Click to show the full file
# foo/main.pyfrom __future__importannotationsfrompathlibimportPathfromtypingimportOptionalfromuuidimportUUIDfromminos.aggregateimportAggregate,RootEntity,ExternalEntity,Reffromminos.commonimportEntrypointLauncherfromminos.cqrsimportCommandService,QueryServicefromminos.networksimportRequest,Response,enrouteclassFoo(RootEntity):"""Foo RootEntity class."""bar:strfoobar:Optional[Ref[FooBar]]classFooBar(ExternalEntity):"""FooBar ExternalEntity clas."""something:strclassFooAggregate(Aggregate[Foo]):"""Foo Aggregate class."""@staticmethodasyncdefcreate_foo(bar:str)->UUID:"""Create a new Foo instance :param bar: The bar of the new instance. :return: The identifier of the new instance. """foo=awaitFoo.create(bar)returnfoo.uuid@staticmethodasyncdefupdate_foobar(uuid:UUID,foobar:Optional[Ref[FooBar]])->None:"""Update the foobar attribute of the ``Foo`` instance. :param uuid: The identifier of the ``Foo`` instance. :param foobar: The foobar value to be set. :return: This method does not return anything. """foo=awaitFoo.get(uuid)foo.foobar=foobarawaitfoo.save()classFooCommandService(CommandService):"""Foo Command Service class."""@enroute.broker.command("CreateFoo")@enroute.rest.command("/foos","POST")asyncdefcreate_foo(self,request:Request)->Response:"""Create a new Foo. :param request: The ``Request`` that contains the ``bar`` attribute. :return: A ``Response`` containing identifier of the already created instance. """content=awaitrequest.content()bar=content["bar"]uuid=awaitFooAggregate.create_foo(bar)returnResponse({"uuid":uuid})classFooQueryService(QueryService):"""Foo Query Service class."""if__name__=='__main__':launcher=EntrypointLauncher.from_config(Path(__file__).parent/"config.yml")launcher.launch()
Execute the following command to start the microservice:
python foo/main.py
To check that everything works fine, execute the following command:
curl --location --request POST'http://localhost:4545/foos' \--header'Content-Type: application/json' \--data-raw'{ "bar": "test"}'
And the expected response will be similar to:
{"uuid":"YOUR_UUID"}Here is an example of the event and query handling. In this case, it must be defined on aQueryService class. In this case a"FooCreated" and"FooUpdated.foobar" events are handled (they will print the content on the microservice's logs). The event contents typically contains instances ofAggregateDiff type, which is referred to the difference respect to the previously stored instance. The exposed query is connected to the calls that come from the"/foos/example" path and"GET" method and a naive string is returned.
Disclaimer: A realQueryService implementation must populate a query-oriented database based on the events to which is subscribed to, and expose queries performed over that query-oriented database.
# foo/main.pyfromminos.cqrsimportQueryServicefromminos.networksimportenroute,Request,ResponseclassFooQueryService(QueryService):"""Foo Query Service class."""@enroute.broker.event("FooCreated")asyncdeffoo_created(self,request:Request)->None:"""Handle the "FooCreated" event. :param request: The ``Request`` that contains a ``Event``. :return: This method does not return anything. """event=awaitrequest.content()print(f"A Foo was created:{event}")@enroute.broker.event("FooUpdated.foobar")asyncdeffoo_foobar_updated(self,request:Request)->None:"""Handle the "FooUpdated.foobar" event. :param request: The ``Request`` that contains a ``Event``. :return: This method does not return anything. """event=awaitrequest.content()print(f"The 'foobar' field of a Foo was updated:{event}")@enroute.rest.query("/foos/example","GET")asyncdefexample(self,request:Request)->Response:"""Handle the example query. :param request: The ``Request`` that contains the necessary information. :return: A ``Response`` instance. """returnResponse("This is an example response!")
Click to show the full file
# foo/main.pyfrom __future__importannotationsfrompathlibimportPathfromtypingimportOptionalfromuuidimportUUIDfromminos.aggregateimportAggregate,RootEntity,ExternalEntity,Reffromminos.commonimportEntrypointLauncherfromminos.cqrsimportCommandService,QueryServicefromminos.networksimportRequest,Response,enrouteclassFoo(RootEntity):"""Foo RootEntity class."""bar:strfoobar:Optional[Ref[FooBar]]classFooBar(ExternalEntity):"""FooBar ExternalEntity clas."""something:strclassFooAggregate(Aggregate[Foo]):"""Foo Aggregate class."""@staticmethodasyncdefcreate_foo(bar:str)->UUID:"""Create a new Foo instance :param bar: The bar of the new instance. :return: The identifier of the new instance. """foo=awaitFoo.create(bar)returnfoo.uuid@staticmethodasyncdefupdate_foobar(uuid:UUID,foobar:Optional[Ref[FooBar]])->None:"""Update the foobar attribute of the ``Foo`` instance. :param uuid: The identifier of the ``Foo`` instance. :param foobar: The foobar value to be set. :return: This method does not return anything. """foo=awaitFoo.get(uuid)foo.foobar=foobarawaitfoo.save()classFooCommandService(CommandService):"""Foo Command Service class."""@enroute.broker.command("CreateFoo")@enroute.rest.command("/foos","POST")asyncdefcreate_foo(self,request:Request)->Response:"""Create a new Foo. :param request: The ``Request`` that contains the ``bar`` attribute. :return: A ``Response`` containing identifier of the already created instance. """content=awaitrequest.content()bar=content["bar"]uuid=awaitFooAggregate.create_foo(bar)returnResponse({"uuid":uuid})classFooQueryService(QueryService):"""Foo Query Service class."""@enroute.broker.event("FooCreated")asyncdeffoo_created(self,request:Request)->None:"""Handle the "FooCreated" event. :param request: The ``Request`` that contains a ``Event``. :return: This method does not return anything. """event=awaitrequest.content()print(f"A Foo was created:{event}")@enroute.broker.event("FooUpdated.foobar")asyncdeffoo_foobar_updated(self,request:Request)->None:"""Handle the "FooUpdated.foobar" event. :param request: The ``Request`` that contains a ``Event``. :return: This method does not return anything. """event=awaitrequest.content()print(f"The 'foobar' field of a Foo was updated:{event}")@enroute.rest.query("/foos/example","GET")asyncdefexample(self,request:Request)->Response:"""Handle the example query. :param request: The ``Request`` that contains the necessary information. :return: A ``Response`` instance. """returnResponse("This is an example response!")if__name__=='__main__':launcher=EntrypointLauncher.from_config(Path(__file__).parent/"config.yml")launcher.launch()
Execute the following command to start the microservice:
python foo/main.py
Now, if a new instance is created (with a rest call, like in theprevious section), theFooCreated event will be handled and the microservice's console will print something like:
A Foo was created: Event(...)Also, to check that everything is fine the example query can be executed with:
curl --location --request GET'http://localhost:4545/foos/example'And the expected result should be something like:
"This is an example response!"Here is an example of the interaction between two microservices through a SAGA pattern. In this case, the interaction starts with a call to the"/foos/add-foobar" path and the"POST" method, which performs aSagaManager run over theADD_FOOBAR_SAGA saga. This saga has two steps, one remote that executes the"CreateFooBar" command (possibly defined on the supposed"foobar" microservice), and a local step that is executed on this microservice. TheCreateFooBarDTO defines the structure of the request to be sent when the"CreateFooBar" command is executed.
# foo/main.pyfromminos.commonimportModelTypefromminos.cqrsimportCommandServicefromminos.networksimportenroute,Requestfromminos.sagaimportSaga,SagaContext,SagaRequest,SagaResponseclassFooCommandService(CommandService):"""Foo Command Service class."""@enroute.rest.command("/foos/add-foobar","POST")asyncdefupdate_foo(self,request:Request)->None:"""Run a saga example. :param request: The ``Request`` that contains the initial saga's context. :return: This method does not return anything. """content=awaitrequest.content()context=SagaContext(uuid=content["uuid"],something=content["something"])awaitself.saga_manager.run(ADD_FOOBAR_SAGA,context)def_create_foobar(context:SagaContext)->SagaRequest:something=context["something"]content=CreateFooBarDTO(56,something)returnSagaRequest("CreateFooBar",content)asyncdef_success_foobar(context:SagaContext,response:SagaResponse)->SagaContext:context["foobar_uuid"]=awaitresponse.content()returncontextasyncdef_error_foobar(context:SagaContext,response:SagaResponse)->SagaContext:raiseValueError("The foobar could not be created!")asyncdef_update_foo(context:SagaContext)->None:awaitFooAggregate.update_foobar(context["uuid"],context["foobar_uuid"])CreateFooBarDTO=ModelType.build("AnotherDTO", {"number":int,"text":str})ADD_FOOBAR_SAGA= (Saga() .remote_step() .on_execute(_create_foobar) .on_success(_success_foobar) .on_error(_error_foobar) .local_step() .on_execute(_update_foo) .commit())
Click to show the full file
# foo/main.pyfrom __future__importannotationsfrompathlibimportPathfromtypingimportOptionalfromuuidimportUUIDfromminos.aggregateimportAggregate,RootEntity,ExternalEntity,Reffromminos.commonimportModelType,EntrypointLauncherfromminos.cqrsimportCommandService,QueryServicefromminos.networksimportRequest,Response,enroutefromminos.sagaimportSaga,SagaContext,SagaRequest,SagaResponseclassFoo(RootEntity):"""Foo RootEntity class."""bar:strfoobar:Optional[Ref[FooBar]]classFooBar(ExternalEntity):"""FooBar ExternalEntity clas."""something:strclassFooAggregate(Aggregate[Foo]):"""Foo Aggregate class."""@staticmethodasyncdefcreate_foo(bar:str)->UUID:"""Create a new Foo instance :param bar: The bar of the new instance. :return: The identifier of the new instance. """foo=awaitFoo.create(bar)returnfoo.uuid@staticmethodasyncdefupdate_foobar(uuid:UUID,foobar:Optional[Ref[FooBar]])->None:"""Update the foobar attribute of the ``Foo`` instance. :param uuid: The identifier of the ``Foo`` instance. :param foobar: The foobar value to be set. :return: This method does not return anything. """foo=awaitFoo.get(uuid)foo.foobar=foobarawaitfoo.save()classFooCommandService(CommandService):"""Foo Command Service class."""@enroute.broker.command("CreateFoo")@enroute.rest.command("/foos","POST")asyncdefcreate_foo(self,request:Request)->Response:"""Create a new Foo. :param request: The ``Request`` that contains the ``bar`` attribute. :return: A ``Response`` containing identifier of the already created instance. """content=awaitrequest.content()bar=content["bar"]uuid=awaitFooAggregate.create_foo(bar)returnResponse({"uuid":uuid})@enroute.rest.command("/foos/add-foobar","POST")asyncdefupdate_foo(self,request:Request)->None:"""Run a saga example. :param request: The ``Request`` that contains the initial saga's context. :return: This method does not return anything. """content=awaitrequest.content()context=SagaContext(uuid=content["uuid"],something=content["something"])awaitself.saga_manager.run(ADD_FOOBAR_SAGA,context)def_create_foobar(context:SagaContext)->SagaRequest:something=context["something"]content=CreateFooBarDTO(56,something)returnSagaRequest("CreateFooBar",content)asyncdef_success_foobar(context:SagaContext,response:SagaResponse)->SagaContext:context["foobar_uuid"]=awaitresponse.content()returncontextasyncdef_error_foobar(context:SagaContext,response:SagaResponse)->SagaContext:raiseValueError("The foobar could not be created!")asyncdef_update_foo(context:SagaContext)->None:awaitFooAggregate.update_foobar(context["uuid"],context["foobar_uuid"])CreateFooBarDTO=ModelType.build("AnotherDTO", {"number":int,"text":str})ADD_FOOBAR_SAGA= (Saga() .remote_step() .on_execute(_create_foobar) .on_success(_success_foobar) .on_error(_error_foobar) .local_step() .on_execute(_update_foo) .commit())classFooQueryService(QueryService):"""Foo Query Service class."""@enroute.broker.event("FooCreated")asyncdeffoo_created(self,request:Request)->None:"""Handle the "FooCreated" event. :param request: The ``Request`` that contains a ``Event``. :return: This method does not return anything. """event=awaitrequest.content()print(f"A Foo was created:{event}")@enroute.broker.event("FooUpdated.foobar")asyncdeffoo_foobar_updated(self,request:Request)->None:"""Handle the "FooUpdated.foobar" event. :param request: The ``Request`` that contains a ``Event``. :return: This method does not return anything. """event=awaitrequest.content()print(f"The 'foobar' field of a Foo was updated:{event}")@enroute.rest.query("/foos/example","GET")asyncdefexample(self,request:Request)->Response:"""Handle the example query. :param request: The ``Request`` that contains the necessary information. :return: A ``Response`` instance. """returnResponse("This is an example response!")if__name__=='__main__':launcher=EntrypointLauncher.from_config(Path(__file__).parent/"config.yml")launcher.launch()
Execute the following command to start thefoo microservice:
python foo/main.py
Disclaimer: Note that in this case another microservice is needed to complete the saga.
Thefoobar microservice will simply have aCreateFooBar command to create new instances of itsFooBar root entity.
The directory structure will become:
.├── foo│ ├── config.yml│ └── main.py└── foobar ├── config.yml └── main.pyHere is thefoobar/config.yml config file:
Click to show the full file
# foobar/config.ymlversion:2name:foobaraggregate:entities: -main.FooBarrepositories:transaction:minos.aggregate.PostgreSqlTransactionRepositoryevent:minos.aggregate.PostgreSqlEventRepositorysnapshot:minos.aggregate.PostgreSqlSnapshotRepositorydatabases:default:database:foobar_dbuser:minospassword:min0ssaga:path:"./foobar.lmdb"interfaces:broker:port:minos.networks.BrokerPortpublisher:client:minos.plugins.kafka.KafkaBrokerPublisherqueue:minos.networks.PostgreSqlBrokerPublisherQueuesubscriber:client:minos.plugins.kafka.KafkaBrokerSubscriberqueue:minos.networks.PostgreSqlBrokerSubscriberQueuevalidator:minos.networks.PostgreSqlBrokerSubscriberDuplicateValidatorhttp:port:minos.networks.HttpPortconnector:client:minos.plugins.aiohttp.AioHttpConnectorport:4546periodic:port:minos.networks.PeriodicPortpools:lock:minos.common.PostgreSqlLockPooldatabasse:minos.common.PostgreSqlPoolbroker:minos.networks.BrokerClientPoolsaga:manager:minos.saga.SagaManagerrouters: -minos.networks.BrokerRouter -minos.networks.PeriodicRouter -minos.networks.RestHttpRoutermiddleware: -minos.saga.transactional_commandservices: -minos.networks.SystemService -minos.aggregate.TransactionService -minos.aggregate.SnapshotService -minos.saga.SagaService -main.FooBarCommandService
Here is thefoobar/main.py source file:
Click to show the full file
from __future__importannotationsfrompathlibimportPathfromuuidimportUUIDfromminos.aggregateimportAggregate,RootEntityfromminos.commonimportEntrypointLauncherfromminos.cqrsimportCommandServicefromminos.networksimportRequest,Response,enrouteclassFooBar(RootEntity):"""FooBar Root Entity clas."""something:strclassFooBarAggregate(Aggregate[FooBar]):"""FooBar Aggregate class."""@staticmethodasyncdefcreate_foobar(something:str)->UUID:"""Create a new ``FooBar`` instance. :param something: The something attribute. :return: The identifier of the new instance. """foobar=awaitFooBar.create(something)returnfoobar.uuidclassFooBarCommandService(CommandService):"""Foo Command Service class."""@enroute.broker.command("CreateFooBar")asyncdefcreate_foobar(self,request:Request)->Response:"""Create a new FooBar. :param request: The ``Request`` that contains the ``something`` attribute. :return: A ``Response`` containing identifier of the already created instance. """content=awaitrequest.content()something=content["text"]uuid=awaitFooBarAggregate.create_foobar(something)returnResponse(uuid)if__name__=='__main__':launcher=EntrypointLauncher.from_config(Path(__file__).parent/"config.yml")launcher.launch()
Execute the following command to start thefoobar microservice:
python foobar/main.py
To check that everything works fine, execute the following command:
curl --location --request POST'http://localhost:4545/foos/add-foobar' \--header'Content-Type: application/json' \--data-raw'{ "uuid": "YOUR_UUID", "something": "something"}'
This request will start a new Saga, that sends a command to thefoobar microservice, retrieve theFooBar identifier and update theFoo instance. After that, theFooQueryService will handle the update event and print a message similar to this one on the console.
The 'foobar' field of a Foo was updated: Event(...)This project follows a modular structure based on python packages.
The core packages provide the base implementation of the framework.
- minos-microservice-aggregate: The Aggregate pattern implementation.
- minos-microservice-common: The common core package.
- minos-microservice-cqrs: The CQRS pattern implementation.
- minos-microservice-networks: The networks core package.
- minos-microservice-saga: The SAGA pattern implementation.
The plugin packages provide connectors to external technologies like brokers, discovery services, databases, serializers and so on.
- minos-broker-kafka: The
kafkaplugin package. - minos-broker-rabbitmq: The
rabbitmqplugin package. - minos-discovery-minos: The
minos-discoveryplugin package. - minos-http-aiohttp: The
aiohttpplugin package. - minos-router-graphql: The
grapqhlplugin package.
The source code of this project is hosted at thisGitHub Repository.
For usage questions, the best place to go to isStackOverflow.
Most development discussions take place over thisGitHub Issues. In addition, aGitter channel is available for development-related questions.
We are looking forward to having your contributions. No matter whether it is a pull request with new features, or the creation of an issue related to a bug you have found.
Please consider these guidelines before you submit any modification.
- If you happen to find a bug, please file a new issue filling the 'Bug report' template.
- Set the appropriate labels, so we can categorise it easily.
- Wait for any core developer's feedback on it.
- Create an issue following the previous steps.
- Fork the project.
- Push your changes to a local branch.
- Run the tests!
- Submit a pull request from your fork's branch.
This project is distributed under theMIT license.
About
🐍 Minos is a framework which helps you create reactive microservices in Python
Topics
Resources
License
Code of conduct
Contributing
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Uh oh!
There was an error while loading.Please reload this page.
Contributors7
Uh oh!
There was an error while loading.Please reload this page.
