Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

A Python RESTful API using FastAPI with a Kafka Consumer

NotificationsYou must be signed in to change notification settings

pedrodeoliveira/fastapi-kafka-consumer

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

This project shows how to use aKafka Consumer inside a Python Web API built usingFastAPI. This can be very useful for use cases where one is building a Web API thatneeds to have some state, and that the state is updated by receiving a message from amessage broker (in this case Kafka).

One example of this could be a ML Web API, that is serving requests for performinginferences according to thecurrent model. Often, this model will be updated over timeas the model gets retrained according to a given schedule or after model/data drift jobhas been executed. Therefore, one solution could be to notify the Web API by sending amessage with either the new model or some metadata that will enable the API to fetch thenew model.

Technologies

The implementation was done inpython>=3.7 using the web frameworkfastapi, and forinteracting withKafka theaiokafka library was chosen. The latter fits very wellwithin anasync framework likefastapi is.

How to Run

The first step will be to haveKafka broker and zookeeper running, by default the bootstrap server is expected to be running onlocalhost:9092. This can be changed using theenvironment variableKAFKA_BOOTSTRAP_SERVERS.

Next, the following environment variableKAFKA_TOPIC should be defined with desired for the topic used to send messages.

$export KAFKA_TOPIC=<my_topic>

The topic can be created using the command line, or it can be automatically created bythe consumer inside the Web API.

Start the Web API by running:

$ python main.py

Finally, send a message by running theproducer.py:

$ python producer.py
Result
```Sending message with value: {'message_id': '4142', 'text': 'some text', 'state': 96}```

The producer sends a message with a fieldstate that is used in this demonstration forshowing how the state of the Web API can be updated. One can confirm that the state of theWeb API is being updated by performing aGET request on the/state endpoint.

curl http://localhost:8000/state
Result before sending message
```{"state":0}```
Result after sending message
```{"state":23}```    The actual value will vary given it's a random number.

Consumer Initialization

During the initialization process of the Consumer the log end offset is checked to determine whether there are messages already in the topic. If so, the consumer willseek to this offset so that it can read the last committed message in this topic.

This is useful to guarantee that the consumer does not miss on previously published messages, either because they were published before the consumer was up, or because the Web API has been down for some time. For this use case, we consider that only the most recentstate matters, and thereby, we only care about the last committed message.

Each instance of the Web API will have it's own consumer group (they share the same group name prefix + a random id), so that each instance of the API receives the samestate updates.

About

A Python RESTful API using FastAPI with a Kafka Consumer

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages


[8]ページ先頭

©2009-2025 Movatter.jp