Asyncio daemon tutorial

This tutorial shows how to build an asyncio daemon following the dependency injection principle.

In this tutorial we will use:

  • Python 3

  • Docker

  • Docker Compose

Start from scratch, or jump ahead to the section you need.

You can find the complete project on GitHub.

What are we going to build?

We will build a monitoring daemon that checks the availability of web services.

The daemon will send requests to example.com and httpbin.org every couple of seconds. For each successfully completed response it will log:

  • The response code

  • The number of bytes in the response

  • The time taken to complete the response

../_images/diagram.png

Prerequisites

We will use docker compose in this tutorial. Let’s check the versions:

    docker --version
    docker compose version

The output should look something like:

    Docker version 27.3.1, build ce12230
    Docker Compose version v2.29.7

Note

If you don’t have Docker or docker compose, you need to install them before proceeding. Follow the official installation guides for each.

The prerequisites are satisfied. Let’s get started with the project layout.

Project layout

Create the project root folder and set it as the working directory:

    mkdir asyncio-daemon-tutorial
    cd asyncio-daemon-tutorial

Now we need to create the initial project structure. Create the files and folders following the layout below. All files should be empty for now. We will fill them in later.

Initial project layout:

    ./
    ├── monitoringdaemon/
    │   ├── __init__.py
    │   ├── __main__.py
    │   └── containers.py
    ├── config.yml
    ├── docker-compose.yml
    ├── Dockerfile
    └── requirements.txt

The initial project layout is ready. We will extend it in the next sections.

Let’s proceed to the environment preparation.

Prepare the environment

In this section we are going to prepare the environment for running our daemon.

First, we need to specify the project requirements. We will use the following packages:

  • dependency-injector - the dependency injection framework

  • aiohttp - the web framework (we only need the HTTP client)

  • pyyaml - the YAML parsing library, used for reading the configuration file

  • pytest - the test framework

  • pytest-asyncio - the helper library for testing asyncio applications

  • pytest-cov - the helper library for measuring test coverage

Put the following lines into the requirements.txt file:

    dependency-injector
    aiohttp
    pyyaml
    pytest
    pytest-asyncio
    pytest-cov

Second, we need to create the Dockerfile. It will describe the daemon’s build process and specify how to run it. We will use python:3.13-bookworm as the base image.

Put the following lines into the Dockerfile file:

    FROM python:3.13-bookworm

    ENV PYTHONUNBUFFERED=1

    WORKDIR /code
    COPY . /code/

    RUN apt-get install openssl \
     && pip install --upgrade pip \
     && pip install -r requirements.txt \
     && rm -rf ~/.cache

    CMD ["python", "-m", "monitoringdaemon"]

Third, we need to define the container in the docker-compose configuration.

Put the following lines into the docker-compose.yml file:

    services:

      monitor:
        build: ./
        image: monitoring-daemon
        volumes:
          - "./:/code"

Everything is ready. Let’s check that the environment is set up properly.

Run in the terminal:

    docker compose build

The build process may take a couple of minutes. You should see something like this at the end:

    Successfully built 5b4ee5e76e35
    Successfully tagged monitoring-daemon:latest

After the build is done, run the container:

    docker compose up

The output should look like:

    Creating network "asyncio-daemon-tutorial_default" with the default driver
    Creating asyncio-daemon-tutorial_monitor_1 ... done
    Attaching to asyncio-daemon-tutorial_monitor_1
    asyncio-daemon-tutorial_monitor_1 exited with code 0

The environment is ready. The application does not do any work yet and just exits with code 0.

The next step is to configure logging and configuration file parsing.

Logging and configuration

In this section we will configure the logging and configuration file parsing.

Let’s start with the main part of our application: the container. The container will keep all of the application components and their dependencies.

The first two components that we’re going to add are the configuration provider and the resource provider for configuring the logging.

Put the following lines into the containers.py file:

    """Containers module."""

    import logging
    import sys

    from dependency_injector import containers, providers


    class Container(containers.DeclarativeContainer):

        config = providers.Configuration(yaml_files=["config.yml"])

        logging = providers.Resource(
            logging.basicConfig,
            stream=sys.stdout,
            level=config.log.level,
            format=config.log.format,
        )

The configuration file will keep the logging settings. Put the following lines into the config.yml file:

    log:
      level: "INFO"
      format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
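To get a feel for what the format setting produces, here is a minimal standard-library sketch. It is not part of the tutorial project: the render() helper is an illustrative assumption, the format string is written with the spacing seen in the daemon's log lines, and the record is formatted directly instead of going through the container.

```python
import logging

# Values taken from config.yml.
LOG_LEVEL = "INFO"
LOG_FORMAT = "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"


def render(name: str, message: str) -> str:
    """Format a single record with the configured format string."""
    record = logging.LogRecord(
        name=name,
        level=getattr(logging, LOG_LEVEL),  # "INFO" -> logging.INFO
        pathname="example.py",  # placeholder, not used by this format
        lineno=0,
        msg=message,
        args=None,
        exc_info=None,
    )
    return logging.Formatter(LOG_FORMAT).format(record)


# Prints a timestamp followed by "[INFO] [Dispatcher]: Starting up".
print(render("Dispatcher", "Starting up"))
```

The logger name in the third bracket is what lets you tell the dispatcher's messages apart from those of the monitors later in the tutorial.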

Now let’s create the function that will run our daemon. It’s traditionally called main(). The main() function will start the dispatcher, but we will keep it empty for now. We will create the container instance before calling main() in the if __name__ == "__main__" block. The container instance will parse config.yml, and then we will call the logging configuration provider.

Put the following lines into the __main__.py file:

    """Main module."""

    from .containers import Container


    def main() -> None:
        ...


    if __name__ == "__main__":
        container = Container()
        container.init_resources()

        main()

Note

The container is the first object created in the application.

The logging and configuration parsing part is done. In the next section we will create the monitoring checks dispatcher.

Dispatcher

Now let’s add the monitoring checks dispatcher.

The dispatcher will control a list of the monitoring tasks. It will execute each task according to the configured schedule. The Monitor class is the base class for all the monitors. You can create different monitors by subclassing it and implementing the check() method.

../_images/classes-01.png

Let’s create the dispatcher and the monitor base classes.

Create dispatcher.py and monitors.py in the monitoringdaemon package:

    ./
    ├── monitoringdaemon/
    │   ├── __init__.py
    │   ├── __main__.py
    │   ├── containers.py
    │   ├── dispatcher.py
    │   └── monitors.py
    ├── config.yml
    ├── docker-compose.yml
    ├── Dockerfile
    └── requirements.txt

Put the following into monitors.py:

    """Monitors module."""

    import logging


    class Monitor:

        def __init__(self, check_every: int) -> None:
            self.check_every = check_every
            self.logger = logging.getLogger(self.__class__.__name__)

        async def check(self) -> None:
            raise NotImplementedError()

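To illustrate the subclassing idea, here is a minimal sketch of a custom monitor. HeartbeatMonitor is a hypothetical example that is not used anywhere else in the tutorial; the Monitor base class is repeated only so that the snippet runs on its own.

```python
import asyncio
import logging


class Monitor:
    """Same base class as in monitors.py, repeated so the sketch runs alone."""

    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)

    async def check(self) -> None:
        raise NotImplementedError()


class HeartbeatMonitor(Monitor):
    """Hypothetical monitor that only counts how often it was checked."""

    def __init__(self, check_every: int) -> None:
        super().__init__(check_every=check_every)
        self.beats = 0

    async def check(self) -> None:
        self.beats += 1
        self.logger.info("Heartbeat %s", self.beats)


monitor = HeartbeatMonitor(check_every=1)
asyncio.run(monitor.check())  # one check, run outside of any dispatcher
```

Because the logger is named after the class, messages from this monitor would show up under HeartbeatMonitor in the log.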
Put the following into dispatcher.py:

    """Dispatcher module."""

    import asyncio
    import logging
    import signal
    import time
    from typing import List

    from .monitors import Monitor


    class Dispatcher:

        def __init__(self, monitors: List[Monitor]) -> None:
            self._monitors = monitors
            self._monitor_tasks: List[asyncio.Task] = []
            self._logger = logging.getLogger(self.__class__.__name__)
            self._stopping = False

        def run(self) -> None:
            asyncio.run(self.start())

        async def start(self) -> None:
            self._logger.info("Starting up")

            for monitor in self._monitors:
                self._monitor_tasks.append(
                    asyncio.create_task(self._run_monitor(monitor)),
                )

            asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)
            asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)

            await asyncio.gather(*self._monitor_tasks, return_exceptions=True)

            self.stop()

        def stop(self) -> None:
            if self._stopping:
                return

            self._stopping = True

            self._logger.info("Shutting down")
            for task, monitor in zip(self._monitor_tasks, self._monitors):
                task.cancel()
            self._monitor_tasks.clear()
            self._logger.info("Shutdown finished successfully")

        @staticmethod
        async def _run_monitor(monitor: Monitor) -> None:
            def _until_next(last: float) -> float:
                time_took = time.time() - last
                return monitor.check_every - time_took

            while True:
                time_start = time.time()

                try:
                    await monitor.check()
                except asyncio.CancelledError:
                    break
                except Exception:
                    monitor.logger.exception("Error executing monitor check")

                await asyncio.sleep(_until_next(last=time_start))

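The shutdown path relies on task cancellation together with gather(..., return_exceptions=True). The sketch below is a simplified stand-in, not the dispatcher itself: guarded() and unguarded() are hypothetical names that mimic the two places where a cancellation can arrive, inside the try/except around check() or during the sleep between checks.

```python
import asyncio


async def guarded() -> None:
    """Catches the cancellation, like the try/except around check()."""
    while True:
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            break


async def unguarded() -> None:
    """Lets the cancellation propagate, like the sleep between checks."""
    while True:
        await asyncio.sleep(3600)


async def main() -> list:
    tasks = [asyncio.create_task(guarded()), asyncio.create_task(unguarded())]
    await asyncio.sleep(0)  # give both tasks a chance to reach their first await
    for task in tasks:
        task.cancel()
    # return_exceptions=True turns the propagated cancellation into a result
    # instead of raising it here.
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(main())
print(results[0])                                      # None
print(isinstance(results[1], asyncio.CancelledError))  # True
```

Either way, gather() completes instead of raising, which is what allows start() to finish cleanly after stop() has cancelled the tasks.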
Now we need to add the dispatcher to the container.

Edit containers.py:

    """Containers module."""

    import logging
    import sys

    from dependency_injector import containers, providers

    from . import dispatcher


    class Container(containers.DeclarativeContainer):

        config = providers.Configuration(yaml_files=["config.yml"])

        logging = providers.Resource(
            logging.basicConfig,
            stream=sys.stdout,
            level=config.log.level,
            format=config.log.format,
        )

        dispatcher = providers.Factory(
            dispatcher.Dispatcher,
            monitors=providers.List(
                # TODO: add monitors
            ),
        )

Finally, we will inject the dispatcher into the main() function and call its run() method. We will use the Wiring feature.

Edit __main__.py:

    """Main module."""

    from dependency_injector.wiring import Provide, inject

    from .dispatcher import Dispatcher
    from .containers import Container


    @inject
    def main(dispatcher: Dispatcher = Provide[Container.dispatcher]) -> None:
        dispatcher.run()


    if __name__ == "__main__":
        container = Container()
        container.init_resources()
        container.wire(modules=[__name__])

        main()

Now let’s start the daemon to check that everything works.

Run in the terminal:

    docker compose up

The output should look like:

    Starting asyncio-daemon-tutorial_monitor_1 ... done
    Attaching to asyncio-daemon-tutorial_monitor_1
    monitor_1  | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up
    monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down
    monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully
    asyncio-daemon-tutorial_monitor_1 exited with code 0

Everything works properly. The dispatcher starts up and exits right away because there are no monitoring tasks yet.
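The immediate exit follows from how asyncio.gather() behaves when it is given nothing to wait for. A minimal sketch, independent of the tutorial code (the function name start_without_monitors is an assumption made for illustration):

```python
import asyncio


async def start_without_monitors() -> list:
    monitor_tasks: list = []  # nothing was injected into the dispatcher yet
    # With no awaitables, gather() has nothing to wait for and
    # completes with an empty result list.
    return await asyncio.gather(*monitor_tasks, return_exceptions=True)


results = asyncio.run(start_without_monitors())
print(results)  # []
```

Once at least one monitor task is in the list, the same call keeps waiting until the tasks finish or are cancelled.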

By the end of this section we have the application skeleton ready. In the next section we will add the first monitoring task.

Example.com monitor

In this section we will add a monitoring task that will check the availability of http://example.com.

We will start by extending our class model with a new type of monitoring check, the HttpMonitor.

The HttpMonitor is a subclass of the Monitor. We will implement the check() method so that it sends an HTTP request to the specified URL. Sending the HTTP request will be delegated to the HttpClient.

../_images/classes-02.png

First, we need to create the HttpClient.

Create http.py in the monitoringdaemon package:

    ./
    ├── monitoringdaemon/
    │   ├── __init__.py
    │   ├── __main__.py
    │   ├── containers.py
    │   ├── dispatcher.py
    │   ├── http.py
    │   └── monitors.py
    ├── config.yml
    ├── docker-compose.yml
    ├── Dockerfile
    └── requirements.txt

and put the following into it:

    """Http client module."""

    from aiohttp import ClientSession, ClientTimeout, ClientResponse


    class HttpClient:

        async def request(self, method: str, url: str, timeout: int) -> ClientResponse:
            async with ClientSession(timeout=ClientTimeout(timeout)) as session:
                async with session.request(method, url) as response:
                    return response

Now we need to add the HttpClient to the container.

Edit containers.py:

    """Containers module."""

    import logging
    import sys

    from dependency_injector import containers, providers

    from . import http, dispatcher


    class Container(containers.DeclarativeContainer):

        config = providers.Configuration(yaml_files=["config.yml"])

        logging = providers.Resource(
            logging.basicConfig,
            stream=sys.stdout,
            level=config.log.level,
            format=config.log.format,
        )

        http_client = providers.Factory(http.HttpClient)

        dispatcher = providers.Factory(
            dispatcher.Dispatcher,
            monitors=providers.List(
                # TODO: add monitors
            ),
        )

Now we’re ready to add the HttpMonitor. We will add it to the monitors module.

Edit monitors.py:

    """Monitors module."""

    import logging
    import time
    from typing import Dict, Any

    from .http import HttpClient


    class Monitor:

        def __init__(self, check_every: int) -> None:
            self.check_every = check_every
            self.logger = logging.getLogger(self.__class__.__name__)

        async def check(self) -> None:
            raise NotImplementedError()


    class HttpMonitor(Monitor):

        def __init__(
                self,
                http_client: HttpClient,
                options: Dict[str, Any],
        ) -> None:
            self._client = http_client
            self._method = options.pop("method")
            self._url = options.pop("url")
            self._timeout = options.pop("timeout")
            super().__init__(check_every=options.pop("check_every"))

        async def check(self) -> None:
            time_start = time.time()

            response = await self._client.request(
                method=self._method,
                url=self._url,
                timeout=self._timeout,
            )

            time_end = time.time()
            time_took = time_end - time_start

            self.logger.info(
                "Check\n"
                "    %s %s\n"
                "    response code: %s\n"
                "    content length: %s\n"
                "    request took: %s seconds",
                self._method,
                self._url,
                response.status,
                response.content_length,
                round(time_took, 3)
            )

We have everything ready to add the http://example.com monitoring check. We make two changes in the container:

  • Add the factory provider example_monitor.

  • Inject the example_monitor into the dispatcher.

Edit containers.py:

    """Containers module."""

    import logging
    import sys

    from dependency_injector import containers, providers

    from . import http, monitors, dispatcher


    class Container(containers.DeclarativeContainer):

        config = providers.Configuration(yaml_files=["config.yml"])

        logging = providers.Resource(
            logging.basicConfig,
            stream=sys.stdout,
            level=config.log.level,
            format=config.log.format,
        )

        http_client = providers.Factory(http.HttpClient)

        example_monitor = providers.Factory(
            monitors.HttpMonitor,
            http_client=http_client,
            options=config.monitors.example,
        )

        dispatcher = providers.Factory(
            dispatcher.Dispatcher,
            monitors=providers.List(
                example_monitor,
            ),
        )

The example_monitor provider has a dependency on the configuration options. Let’s define these options.

Edit config.yml:

    log:
      level: "INFO"
      format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

    monitors:

      example:
        method: "GET"
        url: "http://example.com"
        timeout: 5
        check_every: 5
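HttpMonitor receives these options as a dictionary and pulls each key out with pop(), so a missing key would raise a KeyError when the monitor is created. The sketch below mirrors that with plain Python; the REQUIRED_KEYS constant and the missing_options() helper are hypothetical additions for illustration, not part of the tutorial code or of the library.

```python
from typing import Any, Dict, List

# Dictionary equivalent of the monitors.example section in config.yml.
example_options: Dict[str, Any] = {
    "method": "GET",
    "url": "http://example.com",
    "timeout": 5,
    "check_every": 5,
}

# Hypothetical: the keys that HttpMonitor.__init__ pops from its options.
REQUIRED_KEYS = ("method", "url", "timeout", "check_every")


def missing_options(options: Dict[str, Any]) -> List[str]:
    """Return the required keys that are absent from the options."""
    return [key for key in REQUIRED_KEYS if key not in options]


print(missing_options(example_options))  # []
print(missing_options({"method": "GET", "url": "http://example.com"}))
# ['timeout', 'check_every']
```

A check like this is optional; it only makes a configuration mistake easier to spot than a bare KeyError.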

All set. Start the daemon to check that everything works.

Run in the terminal:

    docker compose up

You should see:

    Starting asyncio-daemon-tutorial_monitor_1 ... done
    Attaching to asyncio-daemon-tutorial_monitor_1
    monitor_1  | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up
    monitor_1  | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check
    monitor_1  |     GET http://example.com
    monitor_1  |     response code: 200
    monitor_1  |     content length: 648
    monitor_1  |     request took: 0.067 seconds
    monitor_1  | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check
    monitor_1  |     GET http://example.com
    monitor_1  |     response code: 200
    monitor_1  |     content length: 648
    monitor_1  |     request took: 0.073 seconds
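The two checks in the log are roughly five seconds apart because the dispatcher subtracts the time a check took from check_every before sleeping. A worked sketch of that arithmetic follows; the until_next() signature with explicit timestamps is an assumption made for illustration, while the dispatcher's own helper reads the clock itself.

```python
def until_next(check_every: float, started_at: float, finished_at: float) -> float:
    """Seconds to sleep so that checks start roughly check_every apart."""
    time_took = finished_at - started_at
    return check_every - time_took


# First check from the log: it took 0.067 seconds with check_every set to 5.
delay = until_next(check_every=5, started_at=0.0, finished_at=0.067)
print(round(delay, 3))  # 4.933
```

If a check takes longer than check_every, the result is negative. asyncio.sleep() treats a non-positive delay as "do not wait", so the next check would start right away.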

Our daemon can monitor http://example.com availability.

Let’s add a monitor for https://httpbin.org.

Httpbin.org monitor

Adding a monitor for https://httpbin.org will be much easier because we have all the components ready. We just need to create a new provider in the container and update the configuration.

Edit containers.py:

    """Containers module."""

    import logging
    import sys

    from dependency_injector import containers, providers

    from . import http, monitors, dispatcher


    class Container(containers.DeclarativeContainer):

        config = providers.Configuration(yaml_files=["config.yml"])

        logging = providers.Resource(
            logging.basicConfig,
            stream=sys.stdout,
            level=config.log.level,
            format=config.log.format,
        )

        http_client = providers.Factory(http.HttpClient)

        example_monitor = providers.Factory(
            monitors.HttpMonitor,
            http_client=http_client,
            options=config.monitors.example,
        )

        httpbin_monitor = providers.Factory(
            monitors.HttpMonitor,
            http_client=http_client,
            options=config.monitors.httpbin,
        )

        dispatcher = providers.Factory(
            dispatcher.Dispatcher,
            monitors=providers.List(
                example_monitor,
                httpbin_monitor,
            ),
        )

Edit config.yml:

    log:
      level: "INFO"
      format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

    monitors:

      example:
        method: "GET"
        url: "http://example.com"
        timeout: 5
        check_every: 5

      httpbin:
        method: "GET"
        url: "https://httpbin.org/get"
        timeout: 5
        check_every: 5

Let’s start the daemon and check the logs.

Run in the terminal:

    docker compose up

You should see:

    Starting asyncio-daemon-tutorial_monitor_1 ... done
    Attaching to asyncio-daemon-tutorial_monitor_1
    monitor_1  | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up
    monitor_1  | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check
    monitor_1  |     GET http://example.com
    monitor_1  |     response code: 200
    monitor_1  |     content length: 648
    monitor_1  |     request took: 0.077 seconds
    monitor_1  | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check
    monitor_1  |     GET https://httpbin.org/get
    monitor_1  |     response code: 200
    monitor_1  |     content length: 310
    monitor_1  |     request took: 0.18 seconds
    monitor_1  | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check
    monitor_1  |     GET http://example.com
    monitor_1  |     response code: 200
    monitor_1  |     content length: 648
    monitor_1  |     request took: 0.066 seconds
    monitor_1  | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check
    monitor_1  |     GET https://httpbin.org/get
    monitor_1  |     response code: 200
    monitor_1  |     content length: 310
    monitor_1  |     request took: 0.126 seconds
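Both checks appear within the same second because the dispatcher runs each monitor in its own task, so a slow check does not hold up a fast one. A minimal sketch of that behaviour with simulated checks; fake_check() and the durations are assumptions, and no real HTTP requests are made.

```python
import asyncio
from typing import List


async def fake_check(name: str, duration: float, finished: List[str]) -> None:
    """Stand-in for a monitor check that takes `duration` seconds."""
    await asyncio.sleep(duration)
    finished.append(name)


async def main() -> List[str]:
    finished: List[str] = []
    # Both checks start together; neither waits for the other to finish.
    await asyncio.gather(
        fake_check("httpbin", 0.05, finished),
        fake_check("example", 0.01, finished),
    )
    return finished


order = asyncio.run(main())
print(order)  # ['example', 'httpbin']
```

The faster check finishes first even though it was listed second, which matches the order of the log entries above.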

The functional part is done. The daemon monitors http://example.com and https://httpbin.org.

In the next section we will add some tests.

Tests

In this section we will add some tests.

We will use pytest and coverage.

Create tests.py in the monitoringdaemon package:

    ./
    ├── monitoringdaemon/
    │   ├── __init__.py
    │   ├── __main__.py
    │   ├── containers.py
    │   ├── dispatcher.py
    │   ├── http.py
    │   ├── monitors.py
    │   └── tests.py
    ├── config.yml
    ├── docker-compose.yml
    ├── Dockerfile
    └── requirements.txt

and put the following into it:

    """Tests module."""

    import asyncio
    import dataclasses
    from unittest import mock

    import pytest

    from .containers import Container


    @dataclasses.dataclass
    class RequestStub:
        status: int
        content_length: int


    @pytest.fixture
    def container():
        return Container(
            config={
                "log": {
                    "level": "INFO",
                    "format": "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s",
                },
                "monitors": {
                    "example": {
                        "method": "GET",
                        "url": "http://fake-example.com",
                        "timeout": 1,
                        "check_every": 1,
                    },
                    "httpbin": {
                        "method": "GET",
                        "url": "https://fake-httpbin.org/get",
                        "timeout": 1,
                        "check_every": 1,
                    },
                },
            }
        )


    @pytest.mark.asyncio
    async def test_example_monitor(container, caplog):
        caplog.set_level("INFO")

        http_client_mock = mock.AsyncMock()
        http_client_mock.request.return_value = RequestStub(
            status=200,
            content_length=635,
        )

        with container.http_client.override(http_client_mock):
            example_monitor = container.example_monitor()
            await example_monitor.check()

        assert "http://fake-example.com" in caplog.text
        assert "response code: 200" in caplog.text
        assert "content length: 635" in caplog.text


    @pytest.mark.asyncio
    async def test_dispatcher(container, caplog, event_loop):
        caplog.set_level("INFO")

        example_monitor_mock = mock.AsyncMock()
        httpbin_monitor_mock = mock.AsyncMock()

        with container.override_providers(
                example_monitor=example_monitor_mock,
                httpbin_monitor=httpbin_monitor_mock,
        ):
            dispatcher = container.dispatcher()
            event_loop.create_task(dispatcher.start())
            await asyncio.sleep(0.1)
            dispatcher.stop()

        assert example_monitor_mock.check.called
        assert httpbin_monitor_mock.check.called

Run in the terminal:

    docker compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon

You should see:

    platform linux -- Python 3.13.1, pytest-8.3.4, pluggy-1.5.0
    rootdir: /code
    plugins: cov-6.0.0, asyncio-0.24.0
    asyncio: mode=Mode.STRICT, default_loop_scope=None
    collected 2 items

    monitoringdaemon/tests.py ..                                    [100%]

    ---------- coverage: platform linux, python 3.10.0-final-0 -----------
    Name                             Stmts   Miss  Cover
    ----------------------------------------------------
    monitoringdaemon/__init__.py         0      0   100%
    monitoringdaemon/__main__.py        11     11     0%
    monitoringdaemon/containers.py      11      0   100%
    monitoringdaemon/dispatcher.py      45      5    89%
    monitoringdaemon/http.py             6      3    50%
    monitoringdaemon/monitors.py        23      1    96%
    monitoringdaemon/tests.py           35      0   100%
    ----------------------------------------------------
    TOTAL                              131     20    85%

Note

Take a look at how tests.py uses provider overriding.

In test_example_monitor, the HttpClient provider is overridden, so the real HTTP calls are mocked.

In test_dispatcher we override both monitors with mocks.
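The mocking itself is plain unittest.mock and can be tried without the container. A minimal sketch, assuming a hypothetical fetch_status() caller that depends only on an object with an async request() method; ResponseStub plays the same role as RequestStub in tests.py.

```python
import asyncio
import dataclasses
from unittest import mock


@dataclasses.dataclass
class ResponseStub:
    status: int
    content_length: int


async def fetch_status(client) -> int:
    """Hypothetical caller that depends only on client.request()."""
    response = await client.request(
        method="GET",
        url="http://fake-example.com",
        timeout=1,
    )
    return response.status


client_mock = mock.AsyncMock()
# Awaiting client_mock.request(...) now yields the stub instead of a response.
client_mock.request.return_value = ResponseStub(status=200, content_length=635)

status = asyncio.run(fetch_status(client_mock))
client_mock.request.assert_awaited_once()
print(status)  # 200
```

Overriding the http_client provider with such a mock is what keeps test_example_monitor from touching the network.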

Conclusion

In this tutorial we’ve built an asyncio monitoring daemon following the dependency injection principle. We’ve used Dependency Injector as the dependency injection framework.

With the help of Containers and Providers we have defined how to assemble the application components.

The List provider helped to inject a list of monitors into the dispatcher. The Configuration provider helped with reading the YAML file.

We used the Wiring feature to inject the dispatcher into the main() function. The provider overriding feature helped in testing.

We kept all the dependencies injected explicitly. This will help when you need to add or change something in the future.

You can find the complete project on GitHub.
