A webhook can be thought of as a type of API that is driven by events rather than requests. Instead of one application making a request to another to receive a response, a webhook is a service that allows one program to send data to another as soon as a particular event takes place.
Webhooks are sometimes referred to as reverse APIs because communication is initiated by the application sending the data rather than the one receiving it. With web services becoming increasingly interconnected, webhooks are seeing more action as a lightweight solution for enabling real-time notifications and data updates without the need to develop a full-scale API.
Webhooks usually act as messengers for smaller data. They help in sending messages, alerts, notifications and real-time information from the server-side application to the client-side application.
Let's say, for instance, that you want your application to get notified when tweets that mention a certain account and contain a specific hashtag are published. Instead of your application continuously asking Twitter for new posts meeting these criteria, it makes much more sense for Twitter to send a notification to your application only when such an event takes place.
This is the purpose of a webhook: instead of repeatedly requesting the data (the polling mechanism), the receiving application can sit back and get what it needs as soon as the other system has something to send.
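To make the contrast concrete, here is a minimal sketch in plain Python. The `EventSource` class and the `run_demo` function are hypothetical stand-ins invented for this illustration (they do not talk to Twitter or any real API); the sketch only counts how many requests a polling client makes compared with a receiver that registers a callback once.

```python
# Hypothetical in-memory stand-in for a remote service (not a real API).
class EventSource:
    def __init__(self):
        self.events = []          # events that have occurred so far
        self.subscribers = []     # callbacks registered by receivers
        self.poll_requests = 0    # number of "anything new?" requests served

    def fetch_since(self, index):
        """Polling: the client asks for the events it has not seen yet."""
        self.poll_requests += 1
        return self.events[index:]

    def subscribe(self, callback):
        """Webhook style: the receiver registers a callback once."""
        self.subscribers.append(callback)

    def publish(self, event):
        """Record an event and push it to every subscriber right away."""
        self.events.append(event)
        for callback in self.subscribers:
            callback(event)


def run_demo():
    source = EventSource()

    # Push-based receiver: registers once, then simply waits.
    pushed = []
    source.subscribe(pushed.append)

    # Polling receiver: asks five times although only one event occurs.
    polled = []
    for tick in range(5):
        if tick == 3:
            source.publish({"hashtag": "#python", "text": "hello"})
        polled.extend(source.fetch_since(len(polled)))

    return source.poll_requests, polled, pushed


if __name__ == "__main__":
    requests_made, polled, pushed = run_demo()
    print("polling requests:", requests_made)
    print("events seen by poller:", polled)
    print("events pushed to subscriber:", pushed)
```

Both receivers end up with the same event, but the poller had to issue a request on every tick, while the subscriber did nothing until the event was pushed to it.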
Webhooks can open up a lot of possibilities.
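In this tutorial, we will lay the groundwork for a streaming application based on webhooks. It consists of a tasks producer, which generates randomized tasks and dispatches a webhook for each of them, and a tasks consumer, which receives these webhooks and visualizes them in real time.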
We will leverage several components like Redis, Flask, SocketIO, and ChartJS to develop a nice-looking visualization tool for the aforementioned components.
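Pre-requisites
As our requirements stand, the following components come into play: Redis (serving as the message queue for SocketIO), Flask, Flask-SocketIO, and ChartJS, along with the Faker and requests Python libraries.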
If this tutorial intrigues you and makes you want to dive into the code immediately, you can check this repository to review the code used in this article.
Related: Asynchronous Tasks with Celery in Python.
Setting up the package is quite simple and straightforward. Of course, you need Python 3 installed on your system, and it is highly recommended to set up a virtual environment where we will install the needed libraries:

    $ pip install Faker==8.2.0 Flask==1.1.2 Flask-SocketIO==5.0.1 redis==3.5.3 requests==2.25.1

At the end of this tutorial, our folder structure will look like the following:
Let's start writing the actual code. First, let's define the configuration parameters for our application within config.py:

    # Application configuration file
    ################################
    # Secret key that will be used by Flask for securely signing the session cookie
    # and can be used for other security related needs
    SECRET_KEY = 'SECRET_KEY'
    #######################################
    # Minimum number of tasks to generate
    MIN_NBR_TASKS = 1
    # Maximum number of tasks to generate
    MAX_NBR_TASKS = 100
    # Time to wait when producing tasks
    WAIT_TIME = 1
    # Webhook endpoint mapping to the listener
    WEBHOOK_RECEIVER_URL = 'http://localhost:5001/consumetasks'
    #######################################
    # Map to the Redis server port
    BROKER_URL = 'redis://localhost:6379'
    #######################################

Next, let's create an initialization file for our tasks and webhooks producer in init_producer.py:

    # init_producer.py
    from flask import Flask

    # Create a Flask instance
    app = Flask(__name__)

    # Load Flask configurations from config.py
    app.config.from_object("config")
    # Set the secret key once the configuration has been loaded
    app.secret_key = app.config['SECRET_KEY']

Now let's write the code necessary for producing tasks using the Faker module:

    # tasks_producer.py
    import random
    from faker.providers import BaseProvider
    from faker import Faker
    import config
    import time
    import requests
    import json
    import uuid

    # Define a TaskProvider
    class TaskProvider(BaseProvider):
        def task_priority(self):
            severity_levels = [
                'Low', 'Moderate', 'Major', 'Critical'
            ]
            return severity_levels[random.randint(0, len(severity_levels)-1)]

    # Create a Faker instance returning data in English
    fakeTasks = Faker('en_US')
    # Seed the Faker instance to have the same results every time we run the program
    fakeTasks.seed_instance(0)
    # Assign the TaskProvider to the Faker instance
    fakeTasks.add_provider(TaskProvider)

    # Generate a fake task
    def produce_task(batchid, taskid):
        # Message composition
        message = {
            'batchid': batchid, 'id': taskid, 'owner': fakeTasks.unique.name(), 'priority': fakeTasks.task_priority()
            # ,'raised_date':fakeTasks.date_time_this_year()
            # ,'description':fakeTasks.text()
        }
        return message

    def send_webhook(msg):
        """
        Send a webhook to a specified URL
        :param msg: task details
        :return: the HTTP status code, or None if the webhook could not be delivered
        """
        try:
            # Post a webhook message
            # default is a function applied to objects that are not serializable; it converts them to str
            resp = requests.post(config.WEBHOOK_RECEIVER_URL, data=json.dumps(
                msg, sort_keys=True, default=str), headers={'Content-Type': 'application/json'}, timeout=1.0)
            # Raises an HTTPError if an error has occurred during the process (used for debugging).
            resp.raise_for_status()
        except requests.exceptions.HTTPError as err:
            #print("An HTTP Error occurred",repr(err))
            pass
        except requests.exceptions.ConnectionError as err:
            #print("An Error Connecting to the API occurred", repr(err))
            pass
        except requests.exceptions.Timeout as err:
            #print("A Timeout Error occurred", repr(err))
            pass
        except requests.exceptions.RequestException as err:
            #print("An Unknown Error occurred", repr(err))
            pass
        except Exception:
            pass
        else:
            return resp.status_code

    # Generate a bunch of fake tasks
    def produce_bunch_tasks():
        """
        Generate a Bunch of Fake Tasks
        """
        n = random.randint(config.MIN_NBR_TASKS, config.MAX_NBR_TASKS)
        batchid = str(uuid.uuid4())
        for i in range(n):
            msg = produce_task(batchid, i)
            resp = send_webhook(msg)
            time.sleep(config.WAIT_TIME)
            print(i, "out of ", n, " -- Status", resp, " -- Message = ", msg)
            yield resp, n, msg

    if __name__ == "__main__":
        for resp, total, msg in produce_bunch_tasks():
            pass

The above code leverages the Faker module in order to create a stream of fictitious randomized tasks and to send, for each produced task, a webhook to the endpoint WEBHOOK_RECEIVER_URL previously defined in our configuration file config.py.
The number of tasks generated in each batch will be a random number controlled by the thresholds MIN_NBR_TASKS and MAX_NBR_TASKS defined in config.py.
The webhook JSON message is composed of the following attributes: batchid, id (the task ID), owner and priority.
Each batch of tasks generated will be identified by a unique reference called batchid.
The task priority will be limited to pre-selected options: Low, Moderate, Major and Critical.
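The message layout and its JSON serialization can be tried out without Faker or a running server. The sketch below uses only the standard library; the `build_message` and `serialize` helpers and the sample owner name are assumptions made for illustration, while the attribute names and the `sort_keys=True, default=str` arguments mirror the producer code above. It also shows why `default=str` is useful once a non-serializable value such as a date is added to the message.

```python
import json
import uuid
from datetime import datetime

# Priorities accepted by this sketch (same labels as the producer).
PRIORITIES = ['Low', 'Moderate', 'Major', 'Critical']


def build_message(batchid, taskid, owner, priority, raised_date=None):
    """Hypothetical helper: compose a task message with the tutorial's keys."""
    if priority not in PRIORITIES:
        raise ValueError(f"unknown priority: {priority!r}")
    message = {
        'batchid': batchid,
        'id': taskid,
        'owner': owner,
        'priority': priority,
    }
    # Optional extra field, to illustrate a value json cannot encode natively.
    if raised_date is not None:
        message['raised_date'] = raised_date
    return message


def serialize(message):
    """Serialize like the producer: sorted keys, str() for unknown types."""
    return json.dumps(message, sort_keys=True, default=str)


# One batch identifier shared by every task of the batch.
batchid = str(uuid.uuid4())
payload = serialize(
    build_message(batchid, 0, 'Jane Doe', 'Major', datetime(2021, 5, 1, 12, 30))
)

if __name__ == "__main__":
    print(payload)
```

Without `default=str`, `json.dumps` would raise a `TypeError` on the `datetime` value; with it, the date is sent as a plain string that the receiver has to parse itself if it needs a real date.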
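The main entry point of the above code is the produce_bunch_tasks() function, which is a generator yielding, for each task, the status of the dispatched webhook, the total number of tasks in the batch, and the webhook message that was generated.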
Before digging further, let's test our tasks_producer.py program:

    $ python tasks_producer.py

You should see an output similar to the following:
Now let's build our Flask app that emulates a service producing tasks:

    #app_producer.py
    from flask import Response, render_template
    from init_producer import app
    import tasks_producer

    def stream_template(template_name, **context):
        app.update_template_context(context)
        t = app.jinja_env.get_template(template_name)
        rv = t.stream(context)
        rv.enable_buffering(5)
        return rv

    @app.route("/", methods=['GET'])
    def index():
        return render_template('producer.html')

    @app.route('/producetasks', methods=['POST'])
    def producetasks():
        print("producetasks")
        return Response(stream_template('producer.html', data=tasks_producer.produce_bunch_tasks()))

    if __name__ == "__main__":
        app.run(host="localhost", port=5000, debug=True)

Within this Flask app, we defined two main routes:
- "/": Renders the template web page (producer.html).
- "/producetasks": Calls the function produce_bunch_tasks() and streams the flow of generated tasks to the browser.

The server pushes these updates in the style of Server-Sent Events (SSEs), which are a type of server push mechanism, where a client receives a notification whenever a new event occurs on the server.
Next, we will define the template producer.html file:

    <!doctype html>
    <html>
      <head>
        <title>Tasks Producer</title>
        <style>
          .content { width: 100%; }
          .container{ max-width: none; }
        </style>
        <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
      </head>
    <body>
      <div>
        <form method='post' action = "/producetasks">
          <button type="submit">Produce Tasks</button>
        </form>
      </div>
      <div>
        <div id="Messages"></div>
        {% for rsp,total, msg in data: %}
        <script>
          var rsp = "{{ rsp }}";
          var total = "{{ total }}";
          var msg = "{{ msg }}";
          var lineidx = "{{ loop.index }}";
          //If the webhook request succeeds color it in blue else in red.
          if (rsp == '200') {
            rsp = rsp.fontcolor("blue");
          }
          else {
            rsp = rsp.fontcolor("red");
          }
          //Add the details of the generated task to the Messages section.
          document.getElementById('Messages').innerHTML += "<br>" + lineidx + " out of " + total + " -- "+ rsp + " -- " + msg;
        </script>
        {% endfor %}
      </div>
    </body>
    </html>

Three variables are passed to this template file:
- total: representing the total number of tasks produced.
- rsp: representing the status of the dispatched webhook.
- msg: the webhook JSON message.

The template file contains a JavaScript snippet that iterates over the received stream and displays the webhooks/tasks as they're received.
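For comparison with the template streaming used by the /producetasks route, here is a small sketch of what genuine Server-Sent Events look like on the wire: each event is a block of text with an optional `event:` line and one or more `data:` lines, terminated by a blank line, and the response is served with the `text/event-stream` content type. The `format_sse` and `sse_stream` helpers are hypothetical names chosen for this illustration; they are not part of the tutorial's code.

```python
import json


def format_sse(data, event=None):
    """Encode one payload as a Server-Sent Events text block."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload needs its own "data:" prefix.
    for line in json.dumps(data).splitlines():
        lines.append(f"data: {line}")
    # A blank line marks the end of the event.
    return "\n".join(lines) + "\n\n"


def sse_stream(tasks):
    """Generator yielding one SSE block per task, ready to be streamed."""
    for task in tasks:
        yield format_sse(task, event="task")


if __name__ == "__main__":
    for chunk in sse_stream([{"id": 0, "priority": "Low"}, {"id": 1, "priority": "Major"}]):
        print(chunk, end="")
```

A generator like `sse_stream` could be passed to a streaming response in the same way `stream_template` is used above; on the browser side, such a stream is typically consumed with the `EventSource` API rather than with inline scripts.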
Now that our program is ready, let's test it out and check the output generated:

    $ python app_producer.py

Access the link http://localhost:5000 where the Flask instance is running, press the Produce Tasks button, and you will see a continuous stream of randomized tasks automatically generated, as shown in the following screen:

You will notice that the response status of the dispatched webhook is equal to None and displayed in red, signaling the failure to reach its destination. Later on, when we activate the tasks consumer, you will notice that the response status of the dispatched webhook is equal to 200 and displayed in blue, signaling that it successfully reached the webhook endpoint.
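Both outcomes can be reproduced in isolation with the standard library alone. The sketch below is an assumption-laden stand-in, not the tutorial's code: it swaps requests for `urllib` and Flask for `http.server`, and the `WebhookHandler` class and `run_demo` function are hypothetical names. It starts a tiny receiver on a free local port, sends one webhook while the receiver is up (status 200), then sends another after the receiver has been shut down (status None).

```python
import json
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # payloads accepted by the stand-in receiver


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read the JSON body and confirm receipt with a 200 response.
        length = int(self.headers.get("Content-Length", 0))
        received.append(json.loads(self.rfile.read(length)))
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"OK")

    def log_message(self, format, *args):
        pass  # keep the demo output quiet


def send_webhook(url, msg):
    """Return the HTTP status code, or None if delivery failed."""
    body = json.dumps(msg, sort_keys=True, default=str).encode("utf-8")
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    # Bypass any proxy settings so the request goes straight to localhost.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(req, timeout=1.0) as resp:
            return resp.status
    except (urllib.error.URLError, OSError):
        return None


def run_demo():
    server = HTTPServer(("127.0.0.1", 0), WebhookHandler)  # port 0 = any free port
    url = f"http://127.0.0.1:{server.server_address[1]}/consumetasks"
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        up = send_webhook(url, {"id": 1, "priority": "Low"})
    finally:
        server.shutdown()
        server.server_close()
    # The receiver is gone now, so this delivery is expected to fail.
    down = send_webhook(url, {"id": 2, "priority": "Major"})
    return up, down


if __name__ == "__main__":
    print(run_demo())
```

As in the producer, a failed delivery is swallowed and reported as None rather than raised, which is what lets the stream of tasks continue even when no consumer is listening.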
Now, let's create the initialization file for our tasks consumer/handler:

    # init_consumer.py
    from flask import Flask

    # Create a Flask instance
    app = Flask(__name__)

    # Load Flask configurations from config.py
    app.config.from_object("config")
    # Set the secret key once the configuration has been loaded
    app.secret_key = app.config['SECRET_KEY']

    # Setup the Flask SocketIO integration while mapping the Redis server.
    from flask_socketio import SocketIO
    socketio = SocketIO(app, logger=True, engineio_logger=True, message_queue=app.config['BROKER_URL'])

Next, let's build a Flask app for handling the dispatched webhooks/tasks. The first step to handling webhooks is to build a custom endpoint. This endpoint needs to expect data through a POST request, and confirm the successful receipt of that data:

    #app_consumer.py
    from flask import render_template, request, session
    from flask_socketio import join_room
    from init_consumer import app, socketio
    import json
    import uuid

    # Render the assigned template file
    @app.route("/", methods=['GET'])
    def index():
        return render_template('consumer.html')

    # Sending message through the websocket
    def send_message(event, namespace, room, message):
        # print("Message = ", message)
        socketio.emit(event, message, namespace=namespace, room=room)

    # Registers a function to be run before the first request to this instance of the application
    # Create a unique session ID and store it within the application configuration
    @app.before_first_request
    def initialize_params():
        # app.config is dict-like, so check for the key rather than an attribute
        if 'uid' not in app.config:
            sid = str(uuid.uuid4())
            app.config['uid'] = sid
            print("initialize_params - Session ID stored =", sid)

    # Receive the webhooks and emit websocket events
    @app.route('/consumetasks', methods=['POST'])
    def consumetasks():
        if request.method == 'POST':
            data = request.json
            if data:
                print("Received Data = ", data)
                roomid = app.config['uid']
                var = json.dumps(data)
                send_message(event='msg', namespace='/collectHooks', room=roomid, message=var)
        return 'OK'

    # Execute on connecting
    @socketio.on('connect', namespace='/collectHooks')
    def socket_connect():
        # Display message upon connecting to the namespace
        print('Client Connected To NameSpace /collectHooks - ', request.sid)

    # Execute on disconnecting
    @socketio.on('disconnect', namespace='/collectHooks')
    def socket_disconnect():
        # Display message upon disconnecting from the namespace
        print('Client disconnected From NameSpace /collectHooks - ', request.sid)

    # Execute upon joining a specific room
    @socketio.on('join_room', namespace='/collectHooks')
    def on_room():
        if app.config['uid']:
            room = str(app.config['uid'])
            # Display message upon joining a room specific to the session previously stored.
            print(f"Socket joining room {room}")
            join_room(room)

    # Execute upon encountering any error related to the websocket
    @socketio.on_error_default
    def error_handler(e):
        # Display message on error.
        print(f"socket error: {e}, {str(request.event)}")

    # Run using port 5001
    if __name__ == "__main__":
        socketio.run(app, host='localhost', port=5001, debug=True)

In brief, we performed the following:
- We defined a function decorated with @app.before_first_request, which runs once before the very first request to the app and is ignored on subsequent requests. Within this function, we create a unique session ID and store it in the application configuration; this unique session ID serves to allocate an exclusive room for each user when dealing with the web socket communication.
- We defined a webhook listener in "/consumetasks", which expects JSON data through POST requests and, once the data is received, emits a web socket event.
- We used /collectHooks for the namespace (namespaces are used to separate server logic over a single shared connection).

After all this build-up, let's code the frontend for our web app. Create consumer.html in the templates folder and copy the following code:

    <!DOCTYPE html>
    <html lang="en">
    <head>
      <meta charset="UTF-8">
      <title>Tasks Consumer</title>
      <link rel="stylesheet" href="{{url_for('static',filename='css/bootstrap.min.css')}}">
      <link rel="stylesheet" href="{{url_for('static',filename='css/Chart.min.css')}}">
    </head>
    <body>
      <div>
        <div id="Messages"></div>
      </div>
      <div>
        <div>
          <div>
            <div>
              <div>
                <canvas id="canvas"></canvas>
              </div>
            </div>
          </div>
        </div>
      </div>
      <!-- import the jquery library -->
      <script src="{{ url_for('static',filename='js/jquery.min.js') }}"></script>
      <!-- import the socket.io library -->
      <script src="{{ url_for('static',filename='js/socket.io.js') }}"></script>
      <!-- import the bootstrap library -->
      <script src="{{ url_for('static',filename='js/bootstrap.min.js') }}"></script>
      <!-- import the Chart library -->
      <script src="{{ url_for('static',filename='js/Chart.min.js') }}"></script>
    <script>
      $(document).ready(function(){
        const config = {
          //Type of the chart - Bar Chart
          type: 'bar',
          //Data for our chart
          data: {
            labels: ['Low','Moderate','Major','Critical'],
            datasets: [{
              label: "Count Of Tasks",
              //Setting a color for each bar
              backgroundColor: ['green','blue','yellow','red'],
              borderColor: 'rgb(255, 99, 132)',
              data: [0,0,0,0],
              fill: false,
            }],
          },
          //Configuration options
          options: {
            responsive: true,
            title: {
              display: true,
              text: 'Tasks Priority Matrix'
            },
            tooltips: {
              mode: 'index',
              intersect: false,
            },
            hover: {
              mode: 'nearest',
              intersect: true
            },
            scales: {
              xAxes: [{
                display: true,
                scaleLabel: {
                  display: true,
                  labelString: 'Priority'
                }
              }],
              yAxes: [{
                display: true,
                ticks: {
                  beginAtZero: true
                },
                scaleLabel: {
                  display: true,
                  labelString: 'Total'
                }
              }]
            }
          }
        };
        const context = document.getElementById('canvas').getContext('2d');
        //Creating the bar chart
        const lineChart = new Chart(context, config);
        //Reserved for websocket manipulation
        var namespace = '/collectHooks';
        var url = 'http://' + document.domain + ':' + location.port + namespace;
        var socket = io.connect(url);
        //When connecting to the socket join the room
        socket.on('connect', function() {
          socket.emit('join_room');
        });
        //When receiving a message
        socket.on('msg', function(data) {
          var msg = JSON.parse(data);
          var newLine = $('<li>' + 'Batch ID. = ' + msg.batchid + ' -- Task ID. = ' + msg.id + ' -- Owner = ' + msg.owner + ' -- Priority = ' + msg.priority + '</li>');
          newLine.css("color","blue");
          $("#Messages").append(newLine);
          //Retrieve the index of the priority of the received message
          var lindex = config.data.labels.indexOf(msg.priority);
          //Increment the value of the priority of the received message
          config.data.datasets[0].data[lindex] += 1;
          //Update the chart
          lineChart.update();
        });
      });
    </script>
    </body>
    </html>

The above template includes the following:
- A messages section (the element with the Messages ID) where the details of each received task are listed.
- A bar chart, built with ChartJS, showing the number of tasks received per priority.
- The web socket logic, which joins the room upon connecting and, on every msg event, appends the received task to the messages section and updates the chart.
Now let's test our program. Please proceed as per the following steps:
- Run app_producer.py:

      $ python app_producer.py

- Make sure the Redis server is reachable at the BROKER_URL defined in config.py (redis://localhost:6379), then run app_consumer.py:

      $ python app_consumer.py

- Access the http://localhost:5000 link to visualize the tasks producer:

Press the Produce Tasks button and a batch of tasks will be automatically generated and displayed gradually on the screen as shown below:
Now open another tab in your browser and access http://localhost:5001 in order to visualize the tasks consumer. The tasks will appear gradually in the messages section, and the bar chart will get updated automatically whenever a webhook is received:
When hovering the mouse over any of the chart bars, a tooltip showing the total number of tasks is displayed:
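The totals behind those bars are simple to reason about. As a rough Python equivalent of the counting done in the consumer page's JavaScript (look up the index of the message's priority among the labels, then increment that slot), consider the sketch below. The `tally` function is a hypothetical helper for illustration, and skipping unknown priorities is an assumption of this sketch rather than something the template does.

```python
import json

# Same label order as the chart in consumer.html.
LABELS = ['Low', 'Moderate', 'Major', 'Critical']


def tally(raw_messages, labels=LABELS):
    """Count received webhook messages per priority label."""
    counts = [0] * len(labels)
    for raw in raw_messages:
        msg = json.loads(raw)  # messages arrive as JSON strings
        try:
            index = labels.index(msg['priority'])
        except ValueError:
            continue  # assumption: ignore priorities the chart does not know
        counts[index] += 1
    return counts


if __name__ == "__main__":
    sample = [
        json.dumps({'batchid': 'b1', 'id': 0, 'owner': 'A', 'priority': 'Major'}),
        json.dumps({'batchid': 'b1', 'id': 1, 'owner': 'B', 'priority': 'Low'}),
        json.dumps({'batchid': 'b1', 'id': 2, 'owner': 'C', 'priority': 'Major'}),
    ]
    print(dict(zip(LABELS, tally(sample))))
```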

Webhooks are an important part of the web and they are becoming more popular. They allow your applications to exchange data instantly and seamlessly.
While webhooks are similar to APIs, they both play different roles, each with its own unique use case. Hopefully, this article has expanded your understanding, and remember that the key to getting the most out of webhooks is to know when they are the right choice for your application.
Learn also: Detecting Fraudulent Transactions in a Streaming App using Kafka in Python
Happy coding ♥
