# Putting a PyTorch ML model into production with Redpanda Connect

*A redpanda & a python exploring a cave while carrying a torch.*

Who's that little guy in the background? No idea!

This is an example of rapidly deploying a tuned classification model by using Redpanda Connect with Python. It leverages Python modules from Hugging Face and PyTorch with a pre-tuned sentiment classifier for financial news derived from Meta's RoBERTa base model.

Two examples are provided:

- an API service that provides an HTTP API for scoring content while also caching and persisting classifier output in-memory and, optionally, to a Redpanda topic for others to consume
- a stream analytics pipeline that takes data from one Redpanda topic, classifies it, and routes output to a destination topic while reusing the same pipeline from the API approach

The model used is originally from Hugging Face user mrm8488 and provides a fine-tuned financial news implementation of Meta's RoBERTa transformer-based language model:

https://huggingface.co/mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis

It's included as a git submodule, but if you're viewing this README via GitHub's web UI and trying to click the submodule link, they sadly don't support links out to non-GitHub submodules!
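If you want to kick the tires on the model itself before wiring it into Redpanda Connect, here's a minimal sketch using the Hugging Face `transformers` pipeline API. It pulls the model by its Hub id rather than from the submodule, and assumes you've installed the project's Python dependencies:

```python
# Standalone smoke test of the sentiment model (outside Redpanda Connect).
# Assumes `transformers` and `torch` are installed, e.g. via requirements.txt.
from transformers import pipeline

classifier = pipeline(
    "sentiment-analysis",
    model="mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis",
)

print(classifier("Stocks rallied and the British pound gained."))
# e.g. [{'label': 'positive', 'score': 0.99...}]
```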

## Requirements

You must have:

- Python 3.12
- A Go toolchain (to build the Redpanda Connect fork)
- git
- rpk, the Redpanda CLI

Optionally, you should have:

- jq
- Docker

## Installation

Two methods are provided: local (recommended so you can follow all the examples) and Docker-based. If you don't have a local copy of Python 3.12, you should use the Docker approach.

### Local Installation

On macOS or Linux distros, you can copy and paste these commands to get up and running quickly:

1. Clone the project and its submodules:

```shell
git clone https://github.com/voutilad/redpanda-pytorch-demo
cd redpanda-pytorch-demo
git submodule update --init --recursive
```

2. Install a Python virtualenv and dependencies:

```shell
python3 -m venv venv
source venv/bin/activate
pip install -U pip
pip install -r requirements.txt
```

3. Build the Redpanda Connect w/ embedded Python fork:

```shell
CGO_ENABLED=0 go build -C rpcp
```

### Docker

A provided `Dockerfile` makes it easy to package up the model, Redpanda Connect, and the Python environment. This is great if you don't have Python 3.12 locally (like on Debian 12 distros) or want to actually deploy this thing somewhere in the cloud.

Use `docker` to build our image and tag it as `redpanda-torch`:

```shell
docker build . -t redpanda-torch
```

The built image is pre-set to run the HTTP server, so you just need to expose the TCP port:

```shell
docker run --rm -it -p 8080:8080 redpanda-torch
```

In the walkthrough below, you'll use environment variables to configure runtime settings in Redpanda Connect. Just use the Docker conventions, setting them via `--env` or `-e`.

For running the streaming enrichment example, override the command line args:

```shell
docker run --rm -it -p 8080:8080 redpanda-torch \
  run -r python.yaml enrichment.yaml
```

Those yaml files are inside the Docker image, by the way.

## Preparing Redpanda

We need a few topics created for our examples. Assuming you've installed `rpk` and have a profile that's authenticated to your Redpanda cluster, you can run the following commands:

```shell
rpk topic create \
  news positive-news negative-news neutral-news unknown-news -p 5
```

If using Redpanda Serverless, you should be able to use `rpk auth login` to create your profile.

If using a Redpanda instance that requires authentication, such as Redpanda Serverless, create a Kafka user and ACLs that allow the principal to both produce and consume from the above topics as well as create a consumer group:

```shell
rpk security user create demo --password demo
rpk security acl create \
  --allow-principal "User:demo" \
  --operation read,write,describe \
  --topic news,positive-news,negative-news,neutral-news,unknown-news \
  --group sentiment-analyzer
```

Feel free to use a different password!

## The HTTP API Server

The HTTP API server example demonstrates some awesome features of Redpanda Connect:

- Avoiding costly compute by caching results
- Distributing data to multiple outputs via fan out
- Providing synchronous responses to HTTP clients for an interactive API
- Reusing Redpanda Connect components via composable resources to reduce code
- Using runtime data inspection to route based on ML output

### Running the HTTP Service

This example relies on environment variables for some runtime configuration. You'll need to set a few depending on where you're running Redpanda:

- `REDPANDA_BROKERS`: list of seed brokers (defaults to "localhost:9092")
- `REDPANDA_TLS`: boolean flag for enabling TLS (defaults to "false")
- `REDPANDA_SASL_USERNAME`: Redpanda Kafka API principal name (no default)
- `REDPANDA_SASL_PASSWORD`: Redpanda Kafka API principal password (no default)
- `REDPANDA_SASL_MECHANISM`: SASL mechanism to use (defaults to "none")
- `REDPANDA_TOPIC`: base name of the topics (defaults to "news")

To run in a mode that accepts HTTP POSTs of content to classify, use the provided `http-server.yaml` and an HTTP client like `curl`.

1. Set any of your environment variables to make things easier:

```shell
export REDPANDA_BROKERS=tktktktktkt.any.us-east-1.mpx.prd.cloud.redpanda.com:9092
export REDPANDA_TLS=true
export REDPANDA_SASL_USERNAME=demo
export REDPANDA_SASL_PASSWORD=demo
export REDPANDA_SASL_MECHANISM=SCRAM-SHA-256
```

The above is a faux config for Redpanda Serverless and matches the details we created in Preparing Redpanda above.

2. With your virtualenv active, start up the HTTP service:

```shell
./rpcp/rp-connect-python run -r python.yaml http-server.yaml
```

3. From another terminal, fire off a request with `curl` (and pipe to `jq` if you have it):

```shell
curl -s -X POST \
  -d "The latest recall of Happy Fun Ball has sent ACME's stock plummeting." \
  'http://localhost:8080/sentiment' | jq
```

You should get something like this in response:

{"label":"negative","metadata": {"cache_hit":false,"sha1":"d7452c7cc882d1c690635cac92945e815947708d"  },"score":0.9984525442123413,"text":"The latest recall of Happy Fun Ball has sent ACME's stock plummeting."}

On the Redpanda side, you'll notice we don't get anything written to the topics! The next section will go into more detail, but for now restart the service with a new environment variable:

```shell
DEMO_OUTPUT_MODE=both ./rpcp/rp-connect-python \
  run -r python.yaml http-server.yaml
```

Now, submit the same data as before:

```shell
curl -s -X POST \
  -d "The latest recall of Happy Fun Ball has sent ACME's stock plummeting." \
  'http://localhost:8080/sentiment' | jq
```

You should get the same JSON reply back. *So what's different?*

Use `rpk` and consume from our topics:

```shell
rpk topic consume positive-news neutral-news negative-news --offset :end
```

You should see a result from our `negative-news` topic:

{"topic":"negative-news","key":"d7452c7cc882d1c690635cac92945e815947708d","value":"{\"label\":\"negative\",\"metadata\":{\"cache_hit\":false,\"sha1\":\"d7452c7cc882d1c690635cac92945e815947708d\"},\"score\":0.9984525442123413,\"text\":\"The latest recall of Happy Fun Ball has sent ACME's stock plummeting.\"}","timestamp":1725628216383,"partition":4,"offset":0}

### Under the Covers

Now, for a guided walkthrough of how it works! This section breaks down how the configuration in `http-server.yaml` does what it does.

#### Receiving HTTP POSTs

The pipeline starts off with an `http_server` input, which provides the API surface area for interacting with clients:

```yaml
input:
  http_server:
    address: ${HOST:127.0.0.1}:${PORT:8080}
    path: /sentiment
```

The `http_server` can do a lot more than this, including supporting TLS for secure communication as well as websocket connections. In this case, we keep it simple: clients need to POST a body of text to the `/sentiment` path on our local web server.

You'll also notice our first use of environment variable interpolation. More will be said about it in coming sections, but for now just view the `HOST` and `PORT` environment variables as a way to deviate from our default listen address of `127.0.0.1` and port `8080`. (This is how the provided Dockerfile changes the default `HOST` to `0.0.0.0`.)

#### Using Caching to Reduce Stress on the Model

Next, we have a `memory_cache` resource. In some situations you may want other cache backends, like Redis/Valkey, but this one simply uses local memory.

Caches are designed to be accessed from multiple components, so they start off defined in a `cache_resources` list:

```yaml
cache_resources:
  - label: memory_cache
    memory:
      default_ttl: 5m
      compaction_interval: 60s
```

Here we're defining a single cache, called `memory_cache`. You can call it (almost) anything you want. We'll use the `label` to refer to the cache instance.

#### Cache Lookups

If we now look at the first stage in the pipeline, we'll see the first step is to utilize the cache for a lookup:

```yaml
pipeline:
  processors:
    - cache:
        resource: memory_cache
        operator: get
        key: '${!content().string().hash("sha1").encode("hex")}'
```

Here the `cache` processor uses the cache resource we defined, referenced by its name/label.

It computes a key on the fly by decoding the content of the HTTP POST body into a string and hashing it with the SHA-1 algorithm, all done via bloblang interpolation. If we have a hit, the message is replaced with the value from the cache.
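For intuition, the interpolated key expression is roughly equivalent to this plain Python:

```python
# Plain-Python equivalent of the bloblang expression:
#   content().string().hash("sha1").encode("hex")
import hashlib

body = b"The latest recall of Happy Fun Ball has sent ACME's stock plummeting."
key = hashlib.sha1(body).hexdigest()
print(key)  # the same sha1 seen in the response metadata above
```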

But what if we *don't* have a cache hit?

#### Cache Misses

We use a conditional `branch` stage to handle cache misses. It checks if the error flag is set by the previous stage (in this case, a cache miss results in the error flag being set, so `errored()` evaluates to `true`). If we've errored, we create a temporary message from the `content()` of the incoming message. Otherwise, we use `deleted()` to emit nothing.

```yaml
- branch:
    request_map: |
      # on error, we had a cache miss.
      root = if errored() { content() } else { deleted() }
    processors:
      # these run only on the temporary messages from `request_map` evaluation
      # ...
```

This can be a tad confusing at first. Essentially, you're defining/creating a temporary message to pass to a totally different pipeline of processors. In practice, this message will be based on the actual incoming message... but it doesn't have to be!

This temporary message is then passed into the inner `processors`.
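If it helps, here's a rough Python model of the branch's control flow. This is a sketch of the semantics for intuition only, not how Redpanda Connect actually implements it:

```python
# Rough model of the branch processor's behavior on a single message.
def run_branch(message: dict, errored: bool, processors: list) -> dict:
    # request_map: on error (our cache miss), create a temporary message
    # from the incoming one; otherwise emit nothing and skip the branch.
    if not errored:
        return message

    temp = dict(message)
    for processor in processors:
        # The inner processors only ever see the temporary message.
        temp = processor(temp)

    # result_map: fold the temporary message back into the real one.
    temp.setdefault("metadata", {})["cache_hit"] = False
    return temp
```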

We'll talk about updating the cache momentarily.

#### Analyzing Sentiment with PyTorch / Hugging Face

The first inner processor is where our Python enrichment occurs. You'll notice it looks super boring!

```yaml
processors:
  - resource: python
```

Here we're referencing a processor resource that's defined elsewhere: in this case, the `python.yaml` you passed with the `-r` argument to Redpanda Connect.

If you look in that file, you'll see a resource definition in a similar format to how our cache resource was defined. The important parts are repeated below:

```yaml
python:
  script: |
    from classifier import get_pipeline

    device = environ.get("DEMO_PYTORCH_DEVICE", "cpu")
    text = content().decode()
    pipeline = get_pipeline(device=device)
    root.text = text
    scores = pipeline(text)
    if scores:
      root.label = scores[0]["label"]
      root.score = scores[0]["score"]
    else:
      root.label = "unlabeled"
      root.score = 0.0
```

Using the Python integration, we can leverage PyTorch and Hugging Face tools in just a few lines of inline code.

There's a runtime import of a local helper module, `classifier`, that wires up the pre-trained model and tokenizer.
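A plausible minimal version of that helper might look like the sketch below. This is illustrative only and the repo's actual `classifier.py` may differ; in particular, it loads the model by Hub id here, while the repo bundles the model as a git submodule:

```python
# classifier.py -- illustrative sketch of the helper module (not the
# repo's exact code). Assumes `transformers` and `torch` are installed.
from functools import lru_cache
from transformers import pipeline

MODEL = "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"

@lru_cache(maxsize=None)
def get_pipeline(device: str = "cpu"):
    # Build the tokenizer + model pipeline once per device and memoize it,
    # so repeated calls from the processor don't re-load the model.
    return pipeline("sentiment-analysis", model=MODEL, device=device)
```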

> **Q:** What about GPUs? Does this work with GPUs?
>
> **A:** Yes. The code currently defaults to the `"cpu"` device, but you can change the argument to `get_pipeline()` in the Python code and pass an appropriate value that PyTorch can use. For instance, if you're on macOS with Apple Silicon, use `"mps"`. See the `torch.device` docs for details on supported values. To do this in the demo, you can set the environment variable `DEMO_PYTORCH_DEVICE` to the device type you want to use.

For more details on how Python integrates with Redpanda Connect, see the https://github.com/voutilad/rp-connect-python project for the nuances of the bloblang-like features embedded in Python. It's beyond the scope of this demonstration.

At this point, we've taken what was our boring message of just text and created a structured message with multiple fields that looks like:

{"text":"The original text!","label":"positive","score":0.999 }

#### Updating the Cache

Now that we've done the computationally heavy part of applying the ML model, we want to update the cache with the results so we don't have to repeat ourselves for the same input.

In this case, we do it in a two-step process for reasons we'll see later:

```yaml
- mutation: |
    # compute a sha1 hash as a key
    root.metadata.sha1 = this.text.hash("sha1").encode("hex")
- cache:
    resource: memory_cache
    operator: set
    key: '${!this.metadata.sha1}'
    value: '${!content()}'
```

The first step above computes the SHA-1 hash of the text we saved from the original message. We tuck this into a nested field.

Then, we have another instance of a `cache` processor that references the same cache resource as before. (See how handy resources are?) In this case, however, we're using a `set` operation and providing the new `value` to store. The key to use is a simple bloblang interpolation that points to our just-computed SHA-1 hash.

The tricky thing is the value: we use `content()` to store the full payload of the message. It's not intuitive! The `cache` processor doesn't use the message itself... you need to interpolate the message content into a value to insert into the cache. Confusing!

#### Rejoining from our Branch

If we had a cache miss, we're now at the end of our branch operation and we need to convert that temporary message to something permanent. Did you forget we've been working with a *temporary* message? I bet you did.

The tail end of the `branch` config tells the processor how to convert that temporary message, if it exists, into a real message to pass onwards:

```yaml
result_map: |
  root = this
  root.metadata.cache_hit = false
```

In this case it's simple: we're copying `this` (the temporary message) to the new message (i.e. `root`) and also setting a new nested field at the same time, recording that we had a cache miss. This way we can see whether we're actually hitting the cache, so all your work won't be for naught.

#### Last Stop before Output

Lastly, there's a trivial `mutation` step to set the nested `cache_hit` field if it doesn't exist. Pretty simple. If it's non-existent, then we never went down the branch path... which means we must have had a cache hit:

```yaml
- mutation: |
    root.metadata.cache_hit = this.metadata.cache_hit | true
```
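The `|` here is bloblang's coalesce operator: take the left-hand value if it exists, otherwise fall back to the right. In rough Python terms:

```python
# Rough Python equivalent of the bloblang assignment:
#   root.metadata.cache_hit = this.metadata.cache_hit | true
def set_cache_hit(message: dict) -> dict:
    metadata = message.setdefault("metadata", {})
    # Keep an existing cache_hit value; default to True if it's absent,
    # since only the cache-miss branch ever sets it (to False).
    metadata["cache_hit"] = metadata.get("cache_hit", True)
    return message
```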

#### Getting Data to its Final Destination

Here we use more resource magic to make the outputs toggle-able via the environment variable `DEMO_OUTPUT_MODE`. We start off with a trivial output definition that just references our resource:

```yaml
output:
  resource: ${DEMO_OUTPUT_MODE:http}
```

Using interpolation, we pull the value from the environment. If it's not defined, we default to `"http"` as the value.

Now we can define our `output_resources`. You could put these in their own file, but that's an exercise left to the reader.

Let's take a look at them individually.

##### HTTP Response

Since this is an HTTP API, it's following what some call a request/reply protocol. The client sends some data (via a POST, in this case) and expects a response back. To do this, we use the `sync_response` component, which will do this automatically:

```yaml
output_resources:
  # Send the HTTP response back to the client.
  - label: http
    sync_response: {}
```

##### Sinking Data into Multiple Redpanda Topics

Other applications might benefit from our work enriching this data, so let's put the data in Redpanda. We can make everyone's lives easier by sorting the data based on the sentiment label: positive, negative, or neutral, with unknown as the fallback. This is where our multiple topics come into play!

```yaml
  # Send the data to Redpanda.
  - label: redpanda
    kafka_franz:
      seed_brokers:
        - ${REDPANDA_BROKERS:localhost}
      topic: "${!this.label | unknown}-${REDPANDA_TOPIC:news}"
      key: ${!this.metadata.sha1}
      batching:
        count: 1000
        period: 5s
      tls:
        enabled: ${REDPANDA_TLS:false}
      sasl:
        - mechanism: ${REDPANDA_SASL_MECHANISM:none}
          username: ${REDPANDA_SASL_USERNAME:}
          password: ${REDPANDA_SASL_PASSWORD:}
```

You can read the details on configuring the `kafka_franz` connector in the docs, so I won't go into detail here. The important part is the `topic` configuration.

You should notice this is a combination of bloblang and environment variable interpolation. This lets the output component programmatically define the target topic and lets us route messages.

Lastly, we're reusing that SHA-1 hash as the key to demonstrate how that, too, can be programmatic via interpolation.
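Conceptually, the topic interpolation boils down to something like this Python sketch (the helper name is made up for illustration):

```python
# Rough Python equivalent of the interpolated topic:
#   "${!this.label | unknown}-${REDPANDA_TOPIC:news}"
import os

def target_topic(message: dict) -> str:
    base = os.environ.get("REDPANDA_TOPIC", "news")  # env var interpolation
    label = message.get("label") or "unknown"        # bloblang coalesce
    return f"{label}-{base}"

print(target_topic({"label": "negative"}))  # negative-news
print(target_topic({}))                     # unknown-news
```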

##### Why Not Both? Using Fan Out.

Let's say we want to *both* reply to the client (to be helpful and polite) as well as save the data in Redpanda for others. We can use a `broker` output that lets us define the pattern of routing messages across multiple outputs.

In this case, we use `fan_out` to duplicate messages to all defined outputs.

Since we already defined our two outputs above as part of our `output_resources`, this is super simple! We can just use `resource` outputs that take a named resource by label so we don't have to repeat ourselves.

```yaml
  # Do both: send to Redpanda and reply to the client.
  - label: both
    broker:
      pattern: fan_out
      outputs:
        - resource: http
        - resource: redpanda
```

In this current configuration, using `both` will cause an initial cold-start latency spike as the connection to the Redpanda cluster is made while processing the first request. This will appear as a delay to the HTTP client calling the service, but subsequent requests won't have this penalty.

## The Data Enrichment Approach

Using what you learned above, we can easily build a data enrichment pipeline, sourcing data from an input Redpanda topic, performing the same sentiment analysis we configured in `python.yaml`, and routing the output to different topics just like before.

In this case, we use both `kafka_franz` input and output. Most importantly, we can *reuse* the same Python pipeline component as it's already defined in a separate resource file.

Running this example is similar to the previous one. Just change the pipeline file:

```shell
./rpcp/rp-connect-python run -r python.yaml enrichment.yaml
```

For testing, you can produce data to your input topic using `rpk`:

```shell
echo 'The Dow closed at a record high today on news that aliens are real' \
  | rpk topic produce news
```

And consume the output:

```shell
rpk topic consume \
  positive-news negative-news neutral-news unknown-news \
  --offset :end
```

### Sourcing Data

This is pretty simple using a `kafka_franz` input. You'll notice that the real difference here is the `consumer_group` setting. This will let us properly scale up if needed and help with tracking committed offsets in the stream.

```yaml
input:
  kafka_franz:
    seed_brokers:
      - ${REDPANDA_BROKERS:localhost:9092}
    topics:
      - ${REDPANDA_TOPIC:news}
    consumer_group: ${REDPANDA_CONSUMER_GROUP:sentiment-analyzer}
    batching:
      count: 1000
      period: 5s
    tls:
      enabled: ${REDPANDA_TLS:false}
    sasl:
      - mechanism: ${REDPANDA_SASL_MECHANISM:none}
        username: ${REDPANDA_SASL_USERNAME:}
        password: ${REDPANDA_SASL_PASSWORD:}
```

It's worth pointing out the `batching` section. The `python` component can process batches more efficiently than single messages, so it's recommended to batch when you can.

### The Enrichment Pipeline

Our pipeline logic becomes trivial thanks to resources:

```yaml
pipeline:
  processors:
    - resource: python
```

That's it! It's *that* easy.

### Sinking Data

We use the same interpolation approaches as before with one exception. See if you can spot it:

```yaml
output:
  kafka_franz:
    seed_brokers:
      - ${REDPANDA_BROKERS:localhost}
    topic: "${!this.label | unknown}-${REDPANDA_TOPIC:news}"
    key: ${!meta("kafka_key")}
    batching:
      count: 1000
      period: 5s
    tls:
      enabled: ${REDPANDA_TLS:false}
    sasl:
      - mechanism: ${REDPANDA_SASL_MECHANISM:none}
        username: ${REDPANDA_SASL_USERNAME:}
        password: ${REDPANDA_SASL_PASSWORD:}
```

Instead of a SHA-1 hash, which we don't really need or care about here, we reuse the original key (if any) from the incoming message. If data is produced to our input topic with a key, we'll reuse that key.

## Wrapping Up

Hopefully this is helpful in both explaining the intricacies of Redpanda Connect end-to-end as well as illustrating a useful example of using a low-code approach to building enrichment services and pipelines!

## About the Banner Image

The cute Redpanda and Python exploring a cave was created by Bing Image Creator.
