settings.py
"""Shared configuration for the Kafka transaction producer and detectors."""

# URL for our broker used for connecting to the Kafka cluster
KAFKA_BROKER_URL = "localhost:9092"

# Name of the topic hosting the transactions that still require processing
TRANSACTIONS_TOPIC = "queuing.transactions"

# These two variables control the rate at which transactions are generated
# automatically: SLEEP_TIME is the pause, in seconds, between two consecutive
# transactions, derived from the target rate.
TRANSACTIONS_PER_SECOND = 2.0
SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND

# Name of the topic hosting the legitimate transactions
LEGIT_TOPIC = "queuing.legit"

# Name of the topic hosting the suspicious transactions
FRAUD_TOPIC = "queuing.fraud"
transactions.py
"""Helpers that fabricate random transactions for the demo producer."""

from random import choices, randint
from string import ascii_letters, digits

# Alphabet from which account identifiers are drawn: 0-9, a-z and A-Z.
account_chars: str = digits + ascii_letters


def _random_account_id() -> str:
    """Return a random account number made of 12 characters."""
    return "".join(choices(account_chars, k=12))


def _random_amount() -> float:
    """Return a random amount between 1.00 and 1000.00 (both inclusive)."""
    # Draw a whole number of cents and scale it down, so the amount is always
    # a multiple of 0.01: 100 cents is 1.00 and 100000 cents is 1000.00.
    return randint(100, 100000) / 100


def create_random_transaction() -> dict:
    """Create a fake randomised transaction.

    Returns:
        A dict with the keys ``source`` and ``target`` (random 12-character
        account ids), ``amount`` (a float between 1.00 and 1000.00) and
        ``currency`` (always ``"EUR"``).
    """
    return {
        "source": _random_account_id(),
        "target": _random_account_id(),
        "amount": _random_amount(),
        "currency": "EUR",
    }
producer.py
"""Publish randomly generated transactions to a Kafka topic at a fixed rate."""

import os
import json
from time import sleep

from kafka import KafkaProducer

# import initialization parameters
from settings import KAFKA_BROKER_URL, SLEEP_TIME, TRANSACTIONS_TOPIC
from transactions import create_random_transaction


def main() -> None:
    """Send one random transaction every ``SLEEP_TIME`` seconds until stopped.

    Each transaction is serialised as JSON and sent to ``TRANSACTIONS_TOPIC``
    on the broker at ``KAFKA_BROKER_URL``. The loop runs until the process is
    interrupted (Ctrl+C); the producer is then flushed and closed so records
    that are still buffered are not silently dropped.
    """
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER_URL,
        # Encode all values as JSON
        value_serializer=lambda value: json.dumps(value).encode(),
    )
    try:
        while True:
            transaction: dict = create_random_transaction()
            producer.send(TRANSACTIONS_TOPIC, value=transaction)
            print(transaction)  # DEBUG
            sleep(SLEEP_TIME)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop this endless loop; exit quietly.
        pass
    finally:
        # send() only queues the record; make sure everything queued is
        # delivered before the connection is released.
        producer.flush()
        producer.close()


if __name__ == "__main__":
    main()
detector.py
"""Consume transactions from Kafka and label each one as legit or fraud."""

import os
import json

from kafka import KafkaConsumer, KafkaProducer

from settings import (
    FRAUD_TOPIC,
    KAFKA_BROKER_URL,
    LEGIT_TOPIC,
    TRANSACTIONS_TOPIC,
)

# Amount at or above which a transaction is considered suspicious.
SUSPICIOUS_AMOUNT_THRESHOLD = 900


def is_suspicious(
    transaction: dict, threshold: float = SUSPICIOUS_AMOUNT_THRESHOLD
) -> bool:
    """Simple condition to determine whether a transaction is suspicious.

    Args:
        transaction: Transaction record; its ``"amount"`` entry is compared.
        threshold: Amount at or above which the transaction is flagged.

    Returns:
        True when ``transaction["amount"]`` is greater than or equal to
        ``threshold``.

    Raises:
        KeyError: If the transaction has no ``"amount"`` entry.
    """
    return transaction["amount"] >= threshold


def main() -> None:
    """Read transactions from ``TRANSACTIONS_TOPIC`` and print their label.

    Every consumed message is decoded from JSON, classified with
    ``is_suspicious`` and printed next to the name of the topic it belongs
    to (``FRAUD_TOPIC`` or ``LEGIT_TOPIC``). The loop runs until interrupted;
    the consumer is always closed on the way out.
    """
    consumer = KafkaConsumer(
        TRANSACTIONS_TOPIC,
        bootstrap_servers=KAFKA_BROKER_URL,
        value_deserializer=lambda value: json.loads(value),
    )
    try:
        for message in consumer:
            transaction: dict = message.value
            topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
            print(topic, transaction)  # DEBUG
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop this endless loop; exit quietly.
        pass
    finally:
        # Release the broker connection held by the consumer.
        consumer.close()


if __name__ == "__main__":
    main()
appdetector.py
"""Flask app that streams classified Kafka transactions to the browser."""

from flask import Flask, Response, stream_with_context, render_template, json, url_for
from kafka import KafkaConsumer

from settings import (
    FRAUD_TOPIC,
    KAFKA_BROKER_URL,
    LEGIT_TOPIC,
    TRANSACTIONS_TOPIC,
)

# Amount at or above which a transaction is considered suspicious.
SUSPICIOUS_AMOUNT_THRESHOLD = 900

# create the flask object app
app = Flask(__name__)


def stream_template(template_name, **context):
    """Render a template as a stream instead of one complete string.

    Args:
        template_name: Name of the template to load from the app's Jinja
            environment.
        **context: Variables made available to the template.

    Returns:
        The Jinja template stream, buffered in groups of 5 items.
    """
    print('template name =', template_name)
    # Add the values Flask normally injects into a template context.
    app.update_template_context(context)
    template = app.jinja_env.get_template(template_name)
    stream = template.stream(context)
    stream.enable_buffering(5)
    return stream


def is_suspicious(
    transaction: dict, threshold: float = SUSPICIOUS_AMOUNT_THRESHOLD
) -> bool:
    """Determine whether a transaction is suspicious.

    Args:
        transaction: Transaction record; its ``"amount"`` entry is compared.
        threshold: Amount at or above which the transaction is flagged.

    Returns:
        True when ``transaction["amount"]`` is greater than or equal to
        ``threshold``.

    Raises:
        KeyError: If the transaction has no ``"amount"`` entry.
    """
    return transaction["amount"] >= threshold


# this router will render the template named index.html and will pass the
# following parameters to it: title and Kafka stream
@app.route('/')
def index():
    """Stream ``index.html``, fed with classified transactions from Kafka."""

    def classified_transactions():
        """Yield ``(topic, transaction)`` pairs for every consumed message."""
        consumer = KafkaConsumer(
            TRANSACTIONS_TOPIC,
            bootstrap_servers=KAFKA_BROKER_URL,
            value_deserializer=lambda value: json.loads(value),
        )
        try:
            for message in consumer:
                transaction: dict = message.value
                topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
                print(topic, transaction)  # DEBUG
                yield topic, transaction
        finally:
            # Runs when the generator is closed or garbage-collected (for
            # example once the response stream ends), so a page load does not
            # leave its consumer connected to the broker.
            consumer.close()

    return Response(
        stream_template(
            'index.html',
            title='Fraud Detector / Kafka',
            data=classified_transactions(),
        )
    )


if __name__ == "__main__":
    app.run(host="localhost", debug=True)
templates/index.html
<!doctype html>
<html>
<head>
  <title> Send Javascript with template demo </title>
</head>
<body>
  <div>
    <h1>{{title}}</h1>
  </div>
  <!-- Container that the streamed scripts below append their lines to. -->
  <div id="data"></div>
  {# One <script> element is emitted per (topic, transaction) pair in `data`;
     each one appends a coloured line to the #data container. #}
  {% for topic, transaction in data %}
  <script>
    var topic = "{{ topic }}";
    var transaction = "{{ transaction }}";
    // Topics whose name contains "fraud" are shown in red, all others in
    // green. indexOf() returns -1 when absent, so a match at index 0 counts.
    var color = topic.indexOf("fraud") !== -1 ? "red" : "green";
    document.getElementById('data').innerHTML +=
      "<br>" + '<span style="color:' + color + '">' + topic + "</span>" + " " + transaction;
  </script>
  {% endfor %}
</body>
</html>