Movatterモバイル変換


[0]ホーム

URL:



Code forDetecting Fraudulent Transactions in a Streaming App using Kafka in Python Tutorial


View on Github

settings.py

# URL for our broker used for connecting to the Kafka clusterKAFKA_BROKER_URL   = "localhost:9092"# name of the topic hosting the transactions to be processed and requiring processingTRANSACTIONS_TOPIC = "queuing.transactions"# these 2 variables will control the amount of transactions automatically generatedTRANSACTIONS_PER_SECOND = float("2.0")SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND# name of the topic hosting the legitimate transactionsLEGIT_TOPIC = "queuing.legit"# name of the topic hosting the suspicious transactionsFRAUD_TOPIC = "queuing.fraud"

transactions.py

from random import choices, randintfrom string import ascii_letters, digitsaccount_chars: str = digits + ascii_lettersdef _random_account_id() -> str:    """Return a random account number made of 12 characters"""    return "".join(choices(account_chars,k=12))def _random_amount() -> float:    """Return a random amount between 1.00 and 1000.00"""    return randint(100,1000000)/100def create_random_transaction() -> dict:    """Create a fake randomised transaction."""    return {        "source":_random_account_id()       ,"target":_random_account_id()       ,"amount":_random_amount()       ,"currency":"EUR"    }

producer.py

import osimport jsonfrom time import sleepfrom kafka import KafkaProducer# import initialization parametersfrom settings import *from transactions import create_random_transactionif __name__ == "__main__":   producer = KafkaProducer(bootstrap_servers = KAFKA_BROKER_URL                            #Encode all values as JSON                           ,value_serializer = lambda value: json.dumps(value).encode()                           ,)   while True:       transaction: dict = create_random_transaction()       producer.send(TRANSACTIONS_TOPIC, value= transaction)       print(transaction) #DEBUG       sleep(SLEEP_TIME)

detector.py

import osimport jsonfrom kafka import KafkaConsumer, KafkaProducerfrom settings import *def is_suspicious(transaction: dict) -> bool:    """Simple condition to determine whether a transaction is suspicious."""    return transaction["amount"] >= 900if __name__ == "__main__":   consumer = KafkaConsumer(       TRANSACTIONS_TOPIC      ,bootstrap_servers=KAFKA_BROKER_URL      ,value_deserializer = lambda value: json.loads(value)      ,   )   for message in consumer:       transaction: dict = message.value       topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC       print(topic,transaction) #DEBUG

appdetector.py

from flask import Flask, Response, stream_with_context, render_template, json, url_forfrom kafka import KafkaConsumerfrom settings import *# create the flask object appapp = Flask(__name__)def stream_template(template_name, **context):    print('template name =',template_name)    app.update_template_context(context)    t = app.jinja_env.get_template(template_name)    rv = t.stream(context)    rv.enable_buffering(5)    return rvdef is_suspicious(transaction: dict) -> bool:    """Determine whether a transaction is suspicious."""    return transaction["amount"] >= 900# this router will render the template named index.html and will pass the following parameters to it:# title and Kafka stream@app.route('/')def index():    def g():        consumer = KafkaConsumer(            TRANSACTIONS_TOPIC            , bootstrap_servers=KAFKA_BROKER_URL            , value_deserializer=lambda value: json.loads(value)            ,        )        for message in consumer:            transaction: dict = message.value            topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC            print(topic, transaction)  # DEBUG            yield topic, transaction    return Response(stream_template('index.html', title='Fraud Detector / Kafka',data=g()))if __name__ == "__main__":   app.run(host="localhost" , debug=True)

templates/index.html

<!doctype html><title> Send Javascript with template demo </title><html><head></head><body>    <div>        <h1>{{title}}</h1>    </div>    <div></div>    {% for topic, transaction in data: %}    <script>        var topic = "{{ topic }}";        var transaction = "{{ transaction }}";        if (topic.search("fraud") > 0) {            topic = topic.fontcolor("red")        } else {            topic = topic.fontcolor("green")        }        document.getElementById('data').innerHTML += "<br>" + topic + " " + transaction;    </script>    {% endfor %}</body></html>

Ethical Hacking with Python EBook - Author - Top


Join 50,000+ Python Programmers & Enthusiasts like you!



Tags

Mastering YOLO - Yacine Author - Middle


New Tutorials

Popular Tutorials


Practical Python PDF Processing EBook - Abdou Author - Bottom







[8]ページ先頭

©2009-2025 Movatter.jp