Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up

A Python interpreter embedded in Redpanda Connect

License

NotificationsYou must be signed in to change notification settings

voutilad/rp-connect-python

Repository files navigation

Build without CGO_ENABLED

A redpanda & a python sipping tea together as friends.

Adds an embedded Python interpreter to Redpanda Connect, so you canwrite your integration and transformation logic in pure Python:

# rot13.yamlinput:stdin:{}pipeline:processors:    -python:exe:python3script:|          import codecs          msg = content().decode()          root.original = msg          root.encoded = codecs.encode(msg, "rot_13")output:stdout:{}logger:level:OFF
$ echo My voice is my passport | ./rp-connect-python run examples/rot13.yaml{"original": "My voice is my passport", "encoded": "Zl ibvpr vf zl cnffcbeg"}

Requirements

  • Python 3.12 (hard requirement, currently!)
  • setuptools (makes it so much easier to findlibpython, justpip installit.)
    • On macOS, if you usedbrew to install Python, it can fall back to usingotool to find the dynamic library.
    • On Linux...sorry! You must usesetuptools.
  • Go 1.22 or newer

Building

Building `rp-connect-python is simple as it's using pure Go code:

CGO_ENABLED=0 go build

That's it! A variety of tests are provided and looking at the current GitHubactionfile shows some examples.

Component Types

This project provides the following new Python component types:

  1. Input -- for generating data using Python
  2. Processor -- for transforming data with Python
  3. Output -- for sinking data with Python

Input

Thepython input allows you to generate or acquire data using Python. Yourscript can provide one of the following data generation approaches based onthe type of object you target when setting thename configuration property:

  • object
    • If you provide a single Python object, it can be passed as a single input.
  • list ortuple
    • A list or tuple will have each item extracted and provided to the pipeline.
  • generator
    • Items will be produced from the generator until it's exhausted.
  • function
    • Any function provided will be called repeatedly until it returnsNone.
    • Functions may take an optional kwargstate, adict, and use itto keep state between invocations.

Input Serialization

By default, the input will serialize data either as native Go values (in thecase ofstring,number,bytes) and will convert to JSON in the case ofPython container typesdict,list, andtuple.

Serialization viapickle can be done manually, but if you setpickle: truethe input will convert the produced Python object usingpickle.dumps()automatically, storing the output as raw bytes on the Redpanda ConnectMessage.

Input Configuration

Common configuration with defaults for a Pythoninput:

input:label:""python:pickle:false# Enable pickle serializerbatch_size:1# How many messages to include in a single message batch.mode:global# Interpreter mode (one of "global", "isolated", "isolated_legacy")exe:"python3"# Name of python binary to use.name:# No default (required), name of generating local object.script:# No default (required), Python code to execute.

An example that uses a Python generator to emit 10 records, one every second:

input:python:name:gscript:|      import time      def producer():        for i in range(10):          time.sleep(1)          yield { "now": time.ctime(), "i": i }      g = producer()

Input Caveats

Currently, a single interpreter is used for executing the input script. If youchange themode, it will use different interpretersettings which could affectpython compatability ofyour script. Keep this in mind.

Processor

Thepython processor provides a similar experience to themapping bloblangprocessor, but in pure Python. The interpreter that runs your code provideslazy hooks back into Redpanda Connect, to mimic bloblang behavior:

  • content() -- similar to the bloblang function, it returns thebytes ofa message. This performs a lazy copy of raw bytes into the interpreter.

  • metadata(key) -- similar to the bloblang function, it provides access tothe metadata of a message using the providedkey.

  • root -- this is adict-like object in scope by default providing threeoperating modes simultaneously:

    • Assign key/values like a Pythondict, e.g.root["name"] = "Dave"
    • Use bloblang-like assignment by attribute, e.g.root.name.first = "Dave"
    • Reassign it to a new object, e.g.root = (1, 2). (Note: if you reassignroot, it loses its magic properties!)

Heads up!

If using the bloblang-like assignment, it will create the hierarchy of keyssimilar to in bloblang. `root.name.first = "Dave" will work even if "name"hasn't been assigned yet, producing a dict like:

root= {"name": {"first":"Dave" } }

For the details of howroot works, see theRoot Pythonclass.

Additionally, the following helper functions and objects improveinteroperability:

  • unpickle() -- will usepickle.loads() to deserialize the Redpanda ConnectMessage into a Python object.

  • meta -- adict that allows you to assign new metadata values to a messageor delete values (if you set the value toNone for a given key).

An example usingunpickle():

pipeline:processors:    -python:script:|          # these are logically equivalent          import pickle          this = pickle.loads(content())          this = unpickle()          root = this.call_some_method()          # if relying on Redpanda Connect structured data, use JSON.          import json          this = json.loads(content().decode())          root = this["a_key"]

The processor does not currently support automatic deserialization ofincoming data in an effort to keep as much of the expensive hooks back intoGo as lazy as possible so you only pay for what you use.

Processor Configuration

Common configuration with defaults for a Pythonprocessor:

pipeline:processors:    -python:exe:"python3"# Name of python binary to use.mode:"global"# Interpreter mode (one of "global", "isolated", "isolated_legacy")script:# No default (required), Python script to execute

Processor Demo

A simple demo usingrequests which will enrich amessage with a callout to an external web service illustrates many of the priorconcepts of using a Python processor:

input:generate:count:3interval:1smapping:|      root.title = "this is a test"      root.uuid = uuid_v4()      root.i = counter()pipeline:processors:    -python:exe:./venv/bin/python3script:|          import json          import requests          import time          data = content()          try:            msg = json.loads(data)["title"]          except:            msg = "nothing :("          root.msg = f"You said: '{msg}'"          root.at = time.ctime()          try:            root.ip = requests.get("https://api.ipify.org").text          except:            root.ip = "no internet?"output:stdout:{}

To run the demo, you need a Python environment with therequests moduleinstalled. This is easy to do with a virtual environment:

# Create a new virtual environment.python3 -m venv venv# Update pip, install setuptools, and install requests into the virtual env../venv/bin/pip install --quiet -U pip setuptools requests# Run the example../rp-connect-python run --log.level=off examples/requests.yaml

You should get output similar to:

{"msg": "You said: 'this is a test'", "at": "Fri Aug  9 19:07:29 2024", "ip": "192.0.1.210"}{"msg": "You said: 'this is a test'", "at": "Fri Aug  9 19:07:30 2024", "ip": "192.0.1.210"}{"msg": "You said: 'this is a test'", "at": "Fri Aug  9 19:07:31 2024", "ip": "192.0.1.210"}

Output

Presently, the Pythonoutput is a bit of a hack and really just a Pythonprocessor configured to use a single interpreter instance.

This means all the configuration and behavior is the same as in theprocessor configuration.

When theoutput improves and warrants further discussion, check this space!

For now, a simpleexample that simply writes theprovided message tostdout:

input:generate:count:5interval:mapping:|      root = "hello world"output:python:script:|      msg = content().decode()      print(f"you said: '{msg}'")http:enabled:false

Interpreter Modes

rp-connect-python now supports multiple interpreter modes that may be setseparately on eachinput,processor, andoutput instance.

  • global (the default)

    • Uses a global interpreter (i.e. no sub-interpreters) for all execution.
    • Allows passing pointers to Python objects between components, avoidingcostly serialization/deserialization.
    • Provides the most compatability at expense of throughput as your code willrely on the global main interpreter for memory management and the GIL.
  • isolated

    • Uses multiple isolated sub-interpreters with their own memory allocatorsand GILs.
    • Provides the best throughput performance for pure-Python use cases thatdon't leverage Python modules that use native code (e.g.numpy).
    • Require serializing/deserializing data as it leaves the context of theinterpreter.
  • isolated_legacy

    • Same asisolated, but instead of distinct GIL and memory allocators, usesa shared GIL and allocator.
    • Balances compatability with performance. Some Python modules might notsupport full isolation, butwill work in a shared GIL mode.

A more detailed discussion for the nerds follows.

Isolated & Isolated Legacy Modes

Most pure Python code should "just work" withisolated mode andisolated_legacy mode. Some older Python extensions, written in C or thelike, may not work inisolated mode and requireisolated_legacy mode.

If you see issues usingisolated (e.g. crashes), switch toisolated_legacy.

In general, crashes shouldnot happen. The most common causes are bugsinrp-connect-python related touse-after-free's in the Pythonintegration layer. If it's not that, it's an interpreter state issue,which is also a bug most likely inrp-connect-python. However, given theimmaturity of multi-interpreter support in Python, if the issue "goes away"by switching modes (e.g. to "legacy"), it's possible it's deeper than justrp-connect-python.

In some cases,isolated_legacy can perform as well orslightly better thanisolated even though it uses a shared GIL. It's very workload dependent, soit's worth experimenting.

Global Mode

Usingglopbal mode for a runtime will execute the Python code in thecontext of the "main" interpreter. (Inisolated andisolated_legacy modes,sub-interpreters derive from the "main" interpreter.) This is the traditionalmethod of embedding Python into an application.

While you may scale out yourglobal mode components, only a singlecomponent instance may utilize the "main" interpreter at a time. This isirrespective of the GIL as Python's C implementation relies heavily onthread-local storage for interpreter state.

Go was design by people that think programmers can't handle managingthreads. (Multi-threading is hard, but that's why we're paid the bigbucks, right?) As a result, the Go runtime does its own scheduling of Goroutines to some number of OS threads to achieve parallelism andconcurrency. Python does not jibe with this and the vibes are off, so alot of therp-connect-python internals are for managing how tocouple Python's thread-oriented approach with Go's go-routine world.

A lot of scientific software that uses external non-Python native codemay run best inglobal mode. This includes, but is not limited to:

  • numpy
  • pandas
  • pyarrow

A benefit toglobal mode is it's one interpreter state across all components,so you can create a Python object in one component (e.g. aninput) andeasily use it in aprocessor stage without mucking about with serialization.This is great for workloads that create large, in-memory objects, like PandasDataFrames or PyArrow Tables. In these cases, avoiding serialization may meanglobal mode is more efficient even if there's fighting over the interpreterlock.

The current design assumes arbitrary Go routines will need to acquireownership of the global ("main") interpreter and fight over a mutex. It'sentirely possible the mutex is held at points where the GIL is actuallyreleased or releasable, meaning other Python codecould run safely. It'sfuture work to figure out how to orchestrate this efficiently.

Python Compatability

This is en evolving list of notes/tips related to using certainpopular Python modules:

requests

Works best inisolated_legacy mode. Currently, can panicisolated mode onsome systems.

Whilerequests is pure Python, it does hook into some modules thatare not. Still identifying a race condition causing memory corruptioninisolated mode.

numpy

Recommendsglobal mode as explicitly does not support Pythonsub-interpreters. May work inisolated_legacy, but be careful.

pandas

Depends onnumpy, so might be best used inglobal mode if stability is aconcern. Works fine with thepickle support for passing DataFrames, but mightnot be the most efficient way for passing data around a long pipeline, soglobal might be preferable to isolated modes.

Anexample that shows filtering a DataFrame and usingpickle to pass it from theinput to theprocessor:

input:python:mode:globalname:dfpickle:truescript:|      import pandas as pd      df = pd.DataFrame.from_dict({"name": ["Maple", "Moxie"], "age": [8, 3]})pipeline:processors:    -python:mode:globalscript:|          import pickle          df = unpickle()          root = df.to_dict("list")output:stdout:{}

Note the use ofmode: global!

pyarrow

Works fine inglobal mode.

An example follows using thepyarrow.dataset capabilities to read from GCS.

Note the use ofserializer: none as it prevents data copying/duplication.

input:python:name:batchesserializer:nonescript:|      import pyarrow as pa      from pyarrow import fs      import pyarrow.dataset as ds      gcs = fs.GcsFileSystem()      dataset = ds.dataset("my-bucket/", format="parquet", filesystem=gcs)      # need special handling to raw cython generators, so for now      # wrap with a pure python generator      def take_all():        for batch in dataset.to_batches():          yield batch      batches = take_all()pipeline:processors:    -python:script:|          # 'this' is now a PyArrow RecordBatch          root.nbytes = this.nbytes          root.num_rows = this.num_rowsoutput:stdout:{}

pillow

Seems to work ok inisolated_legacy mode, but doesn't supportsub-interpreters, so recommended to run inglobal mode.

An example of a directory scanner that identifies types of JPEGs:

input:file:paths:[ ./*.jpg ]scanner:to_the_end:{}pipeline:processors:    -python:exe:./venv/bin/python3mode:globalscript:|          from PIL import Image          from io import BytesIO          infile = BytesIO(content())          try:            with Image.open(infile) as im:              root.format = im.format              root.size = im.size              root.mode = im.mode              root.path = metadata("path")          except OSError:            passoutput:stdout:{}

Assuming youpip install the dependencies ofsetuptools andpillow:

$  python3 -m venv venv$  ./venv/bin/pip install --quiet -U pip setuptools pillow$  ./rp-connect-python run --log.level=off examples/pillow.yaml{"format":"JPEG","mode":"RGB","path":"rpcn_and_python.jpg","size":[1024,1024]}

Known Issues / Limitations

  • Tested on macOS/arm64 and Linux/{arm64,amd64}.
    • Not expected to work on Windows. Requiresgogopython updates.
  • You can only use one Python binary across all Python processors.
  • Hardcoded still for Python 3.12. Should be portable to 3.13 and,in cases ofglobal mode, earlier versions. Requires changes togogopython I haven't made yet.

License and Supportability

Source code in this project is licensed under the Apache v2 license unlessnoted otherwise.

This software is provided without warranty or support. It is not part ofRedpanda Data's enterprise offering and not supported by Redpanda Data.


[8]ページ先頭

©2009-2025 Movatter.jp