Movatterモバイル変換


[0]ホーム

URL:


Detecting Fraudulent Transactions in a Streaming App using Kafka in Python

Learn how to detect and filter fraudulent transactions in a streaming application using Apache Kafka with Python API.
  · 8 min read · Updated apr 2022 ·General Python Tutorials ·Web Programming

Turn your code into any language with ourCode Converter. It's the ultimate tool for multi-language programming. Start converting now!

As systems scale and migrate, fighting fraudsters has become a paramount challenge which is best addressed using real-time stream processing technologies.

For the purpose of this tutorial, we will build from scratch a real-time fraud detection system where we will generate a stream of fictitious transactions and synchronously analyze them to detect which ones are fraudulent.

Prerequisites

As our requirements stand, we need a reliable, scalable and fault-tolerant event streaming platform that stores our input events or transactions and process results.Apache Kafka is an open source distributed streaming platform that responds exactly to this need.

Apache Kafka can be downloaded from its official site, installing and running Apache Kafka on your operating system is outside the scope of this tutorial. However, you can checkthis guide that shows how to install it on Ubuntu (or any Debian-based distribution).

After you have everything setup, you can check the following:

  • A Zookeeper instance is running onTCP port 2181.
  • A Kafka instance is running and bound toTCP port 9092.

If the above mentioned controls are fulfilled, then now you have a single-node Kafka cluster up and running.

Creating the Application Skeleton

We can start now building our real-time fraud detection application using Kafka's consumer and producer APIs.

Our application will consist of:

  • A transactions generator, on one end, which produce fictitious transactions to simulate a flow of events.
  • A fraud detector, on the other end, to filter out transactions which look suspicious.

The following process flowchart exhibits our design:

Flowchart of fraudulent transactions detector

Let's jump right into the setup. Of course, you needPython 3 installed on your system. I will be using a virtual environment where I install the needed libraries and this is undoubtedly the best approach to opt for:

  • Create a virtual environment and activate it:
    $ python -m venv fraud-detector$ source fraud-detector/bin/activate
  • Create the filerequirements.txt and add the following lines to it:
    Kafka-python==2.0.2Flask==1.1.2
  • Install the libraries:
    $ pip install -r requirements.txt

At the end of this tutorial, the folder structure will look like the following:

Final project structureWith that cleared out, let's now start writing the actual code.

First, let's initialize our parameters in oursettings.py file:

# URL for our broker used for connecting to the Kafka clusterKAFKA_BROKER_URL   = "localhost:9092"# name of the topic hosting the transactions to be processed and requiring processingTRANSACTIONS_TOPIC = "queuing.transactions"# these 2 variables will control the amount of transactions automatically generatedTRANSACTIONS_PER_SECOND = float("2.0")SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND# name of the topic hosting the legitimate transactionsLEGIT_TOPIC = "queuing.legit"# name of the topic hosting the suspicious transactionsFRAUD_TOPIC = "queuing.fraud"

Note: For the sake of brevity, I hardcoded the configuration parameters insettings.py, but it's recommended to store these parameters in a separate file (e.g .env)

Second, let's create a Python file calledtransactions.py for creating randomized transactions on the fly:

from random import choices, randintfrom string import ascii_letters, digitsaccount_chars: str = digits + ascii_lettersdef _random_account_id() -> str:    """Return a random account number made of 12 characters"""    return "".join(choices(account_chars,k=12))def _random_amount() -> float:    """Return a random amount between 1.00 and 1000.00"""    return randint(100,1000000)/100def create_random_transaction() -> dict:    """Create a fake randomised transaction."""    return {        "source":_random_account_id()       ,"target":_random_account_id()       ,"amount":_random_amount()       ,"currency":"EUR"    }

Third, let's build our transactions generator which will use to create a stream of transactions. The Python fileproducer.py will play the role of the transactions generator and will store the published transactions within the topic calledqueuing.transactions, below is the code ofproducer.py:

import osimport jsonfrom time import sleepfrom kafka import KafkaProducer# import initialization parametersfrom settings import *from transactions import create_random_transactionif __name__ == "__main__":   producer = KafkaProducer(bootstrap_servers = KAFKA_BROKER_URL                            #Encode all values as JSON                           ,value_serializer = lambda value: json.dumps(value).encode()                           ,)   while True:       transaction: dict = create_random_transaction()       producer.send(TRANSACTIONS_TOPIC, value= transaction)       print(transaction) #DEBUG       sleep(SLEEP_TIME)

To make sure that you're on the right track, let's test theproducer.py program. To do so, open up a terminal window and type the following:

$ python producer.py

Note: Make sure that Kafka server is running before executing the test.

You should see an output similar to the following:

Generated Fake TransactionsFourth, after we guaranteed that our producer program is up and running, let's move now into building a fraud detection mechanism to process the stream of transactions and to pinpoint the fraudulent ones.

We will develop two versions of this program:

  • Version 1detector.py: This program will filter out the queued transactions based on a specific criteria or set of criteria and outputs the results into two separate topics: one for legitimate transactionsLEGIT_TOPIC and the otherFRAUD_TOPIC for the fraudulent ones which do cater for the criteria we selected.

This program is based onKafka Python consumer API, this API allows the consumers to subscribe to specific Kafka topics and Kafka will broadcast messages automatically to them as long as these messages are being published.

Below is the code fordetector.py:

import osimport jsonfrom kafka import KafkaConsumer, KafkaProducerfrom settings import *def is_suspicious(transaction: dict) -> bool:    """Simple condition to determine whether a transaction is suspicious."""    return transaction["amount"] >= 900if __name__ == "__main__":   consumer = KafkaConsumer(       TRANSACTIONS_TOPIC      ,bootstrap_servers=KAFKA_BROKER_URL      ,value_deserializer = lambda value: json.loads(value)      ,   )   for message in consumer:       transaction: dict = message.value       topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC       print(topic,transaction) #DEBUG

For simplicity, I selected a basic conditionis_suspicious() function that is based on a simple predicate (that is, if the transaction amount is greater than or equals 900, then it is suspicious). However, in real life scenarios, many parameters might be involved, among which:

  • The initiator account for the transaction whether active, inactive or dormant.
  • The location of the transaction if initiated from an entity supposedly closed during a lockdown period.

These scenarios will constitute the core of a fraud management solution and should be carefully designed to ensure the flexibility and the responsiveness of this solution.

Let's now test bothproducer.py anddetector.py, open up a new terminal window and type the following:

$ python producer.py

Concurrently, open up another window and type:

$ python detector.py

Note: Make sure that Kafka server is running before executing the test.

You'll see a similar output to this ondetector.py:

Detected fraudulent transactionsThe debug print included within the program will output the transactions to the console and will indicate the target queue based on the condition we specified and related the amount of the transaction.

  • Version 2appdetector.py: This is an advanced version of the detector.py script, which will enable the streaming of the transactions to the web using Flask micro framework.

Technologies used in this code are the following:

  • Flask: a micro web framework in Python.
  • Server-Sent events (SSE): a type of server push mechanism, where a client subscribes to a stream of updates, generated by a server, and whenever a new event occurs, a notification is sent to the client.
  • Jinja2: a modern template engine widely used in the Python ecosystem. Flask supports Jinja2 by default.

Below isappdetector.py:

from flask import Flask, Response, stream_with_context, render_template, json, url_forfrom kafka import KafkaConsumerfrom settings import *# create the flask object appapp = Flask(__name__)def stream_template(template_name, **context):    print('template name =',template_name)    app.update_template_context(context)    t = app.jinja_env.get_template(template_name)    rv = t.stream(context)    rv.enable_buffering(5)    return rvdef is_suspicious(transaction: dict) -> bool:    """Determine whether a transaction is suspicious."""    return transaction["amount"] >= 900# this router will render the template named index.html and will pass the following parameters to it:# title and Kafka stream@app.route('/')def index():    def g():        consumer = KafkaConsumer(            TRANSACTIONS_TOPIC            , bootstrap_servers=KAFKA_BROKER_URL            , value_deserializer=lambda value: json.loads(value)            ,        )        for message in consumer:            transaction: dict = message.value            topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC            print(topic, transaction)  # DEBUG            yield topic, transaction    return Response(stream_template('index.html', title='Fraud Detector / Kafka',data=g()))if __name__ == "__main__":   app.run(host="localhost" , debug=True)

Next, we'll define the templateindex.html HTML file that was used byindex() route function and is undertemplates folder:

<!doctype html><title> Send Javascript with template demo </title><html><head></head><body>    <div>        <h1>{{title}}</h1>    </div>    <div></div>    {% for topic, transaction in data: %}    <script>        var topic = "{{ topic }}";        var transaction = "{{ transaction }}";        if (topic.search("fraud") > 0) {            topic = topic.fontcolor("red")        } else {            topic = topic.fontcolor("green")        }        document.getElementById('data').innerHTML += "<br>" + topic + " " + transaction;    </script>    {% endfor %}</body></html>

Theindex.html contains a Javascript enabling to iterate throughout the received stream and to display the transactions as they're received.

Now to run it, make sureproducer.py is running, and then:

$ python appdetector.py

It should start a local server on port 5000, go to your browser and accesshttp://localhost:5000 where the Flask instance is running, you'll see a continuous streaming of legitimate and fraudulent transactions as shown in the following screen:

Filtering continuous transactionsYou can check the full codehere.

This tutorial has illustrated how you can apply the stream processing paradigm in fraud detection applications.

Learn also:Recommender Systems using Association Rules Mining in Python.

Happy coding ♥

Save time and energy with ourPython Code Generator. Why start from scratch when you can generate? Give it a try!

View Full Code Assist My Coding
Sharing is caring!



Read Also


How to Use MySQL Database in Python
How to Generate and Read QR Code in Python
Introduction to Finance and Technical Indicators with Python

Comment panel

    Got a coding query or need some guidance before you comment? Check out thisPython Code Assistant for expert advice and handy tips. It's like having a coding tutor right in your fingertips!





    Ethical Hacking with Python EBook - Topic - Top


    Join 50,000+ Python Programmers & Enthusiasts like you!



    Tags


    New Tutorials

    Popular Tutorials


    Ethical Hacking with Python EBook - Topic - Bottom






    Claim your Free Chapter!

    Download a Completely Free Practical Python PDF Processing Chapter.

    See how the book can help you build handy PDF Processing tools!



    [8]ページ先頭

    ©2009-2025 Movatter.jp