voutilad/rp-connect-python

A Python interpreter embedded in Redpanda Connect
Adds an embedded Python interpreter to Redpanda Connect, so you can write your integration and transformation logic in pure Python:
```yaml
# rot13.yaml
input:
  stdin: {}

pipeline:
  processors:
    - python:
        exe: python3
        script: |
          import codecs
          msg = content().decode()
          root.original = msg
          root.encoded = codecs.encode(msg, "rot_13")

output:
  stdout: {}

logger:
  level: OFF
```
```sh
$ echo My voice is my passport | ./rp-connect-python run examples/rot13.yaml
{"original": "My voice is my passport", "encoded": "Zl ibvpr vf zl cnffcbeg"}
```
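To check the transformation logic outside of Redpanda Connect, the same `codecs` call can be exercised in plain Python. This is a minimal sketch that assumes a literal `bytes` value standing in for `content()`, since the `content()` and `root` hooks only exist inside the embedded interpreter:

```python
import codecs

# Stand-in for content(): the raw bytes of one incoming message (an assumption;
# the real hook is only available inside rp-connect-python).
raw = b"My voice is my passport"

msg = raw.decode()
result = {"original": msg, "encoded": codecs.encode(msg, "rot_13")}
print(result)
```

Because ROT13 is its own inverse, encoding `result["encoded"]` a second time returns the original text.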
- Python 3.12 (hard requirement, currently!)
- `setuptools` (makes it so much easier to find `libpython`, just `pip install` it.)
  - On macOS, if you used `brew` to install Python, it can fall back to using `otool` to find the dynamic library.
  - On Linux...sorry! You must use `setuptools`.
- Go 1.22 or newer
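If the `libpython` lookup fails, it can help to see what your interpreter reports about its own build. The following is a small diagnostic sketch using the standard library's `sysconfig` module; it is not necessarily how `rp-connect-python` (or `setuptools`) performs the lookup, and some of these config variables may be `None` on certain platforms:

```python
import sys
import sysconfig


def python_build_info() -> dict:
    """Collect details that help locate the interpreter's shared library."""
    return {
        "version": f"{sys.version_info.major}.{sys.version_info.minor}",
        # rp-connect-python currently requires exactly Python 3.12.
        "is_312": sys.version_info[:2] == (3, 12),
        # Library the interpreter was built as (.so/.dylib/.a, depending on platform and build).
        "ldlibrary": sysconfig.get_config_var("LDLIBRARY"),
        # Directory where that library is normally installed.
        "libdir": sysconfig.get_config_var("LIBDIR"),
        # 1 if Python was built with --enable-shared, otherwise 0 or None.
        "shared": sysconfig.get_config_var("Py_ENABLE_SHARED"),
    }


if __name__ == "__main__":
    for key, value in python_build_info().items():
        print(f"{key}: {value}")
```

Run it with the same `python3` you plan to pass as `exe`, since each interpreter (system, `brew`, virtual environment) may report different values.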
Building `rp-connect-python` is simple, as it's using pure Go code:

```sh
CGO_ENABLED=0 go build
```

That's it! A variety of tests are provided, and looking at the current GitHub action file shows some examples.
This project provides the following new Python component types:
- Input -- for generating data using Python
- Processor -- for transforming data with Python
- Output -- for sinking data with Python
The `python` input allows you to generate or acquire data using Python. Your script can provide one of the following data generation approaches, based on the type of object you target when setting the `name` configuration property:

- `object`
  - If you provide a single Python object, it can be passed as a single input.
- `list` or `tuple`
  - A list or tuple will have each item extracted and provided to the pipeline.
- `generator`
  - Items will be produced from the generator until it's exhausted.
- `function`
  - Any function provided will be called repeatedly until it returns `None`.
  - Functions may take an optional kwarg `state`, a `dict`, and use it to keep state between invocations.
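The rules above can be emulated in plain Python to see what a given target would produce. This is a hypothetical sketch (the `drain()` helper is not part of `rp-connect-python`), assuming the semantics exactly as listed: a generator is exhausted, a list or tuple is split into items, a function is called until it returns `None` (receiving the same `state` dict each time if it accepts one), and anything else is emitted once:

```python
import inspect
import types
from typing import Any, Iterator


def drain(target: Any) -> Iterator[Any]:
    """Hypothetical emulation of how the python input treats the object named by `name`."""
    if isinstance(target, types.GeneratorType):
        # Generator: produce items until it is exhausted.
        yield from target
    elif isinstance(target, (list, tuple)):
        # List or tuple: each item becomes its own message.
        yield from target
    elif callable(target):
        # Function: call repeatedly until it returns None. If it accepts a
        # `state` kwarg, the same dict is passed every time.
        takes_state = "state" in inspect.signature(target).parameters
        state: dict = {}
        while True:
            item = target(state=state) if takes_state else target()
            if item is None:
                return
            yield item
    else:
        # Any other single object is emitted once.
        yield target


def counter(state):
    """Example producer that uses `state` to stop after three calls."""
    n = state.get("n", 0)
    if n >= 3:
        return None
    state["n"] = n + 1
    return {"i": n}


print(list(drain(counter)))                   # [{'i': 0}, {'i': 1}, {'i': 2}]
print(list(drain((1, 2, 3))))                 # [1, 2, 3]
print(list(drain(x * 2 for x in range(3))))   # [0, 2, 4]
print(list(drain({"hello": "world"})))        # [{'hello': 'world'}]
```

Note that a `dict` is treated as a single object here, not as a container to split, since only lists and tuples are listed as being split into items.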
By default, the input will serialize data either as native Go values (in the case of `string`, `number`, and `bytes`) or as JSON (in the case of the Python container types `dict`, `list`, and `tuple`).
Serialization via `pickle` can be done manually, but if you set `pickle: true`, the input will convert the produced Python object using `pickle.dumps()` automatically, storing the output as raw bytes on the Redpanda Connect `Message`.
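A hypothetical sketch of these default rules, useful for predicting what ends up in a message. The `serialize()` helper is illustrative only: the real conversion happens on the Go side, and how other Python types are handled isn't described here, so this sketch simply raises `TypeError` for them:

```python
import json
import pickle
from typing import Any, Union


def serialize(obj: Any, use_pickle: bool = False) -> Union[str, int, float, bytes]:
    """Hypothetical sketch of the python input's default serialization rules."""
    if use_pickle:
        # pickle: true -> pickle.dumps() the object, stored as raw bytes.
        return pickle.dumps(obj)
    if isinstance(obj, (str, int, float, bytes)):
        # Strings, numbers, and bytes pass through as native values.
        return obj
    if isinstance(obj, (dict, list, tuple)):
        # Container types are converted to JSON (tuples become JSON arrays).
        return json.dumps(obj)
    # Assumption of this sketch: other types are not covered by the default rules.
    raise TypeError(f"no default serialization for {type(obj).__name__}")
```

With `use_pickle=True`, the processor side would recover the object via `pickle.loads()`, which is what the `unpickle()` helper does.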
Common configuration with defaults for a Python `input`:
```yaml
input:
  label: ""
  python:
    pickle: false     # Enable pickle serializer
    batch_size: 1     # How many messages to include in a single message batch.
    mode: global      # Interpreter mode (one of "global", "isolated", "isolated_legacy")
    exe: "python3"    # Name of python binary to use.
    name:             # No default (required), name of generating local object.
    script:           # No default (required), Python code to execute.
```
An example that uses a Python generator to emit 10 records, one every second:
```yaml
input:
  python:
    name: g
    script: |
      import time

      def producer():
          for i in range(10):
              time.sleep(1)
              yield { "now": time.ctime(), "i": i }

      g = producer()
```
Currently, a single interpreter is used for executing the input script. If you change the `mode`, it will use different interpreter settings, which could affect the Python compatibility of your script. Keep this in mind.
The `python` processor provides a similar experience to the `mapping` bloblang processor, but in pure Python. The interpreter that runs your code provides lazy hooks back into Redpanda Connect, to mimic bloblang behavior:
- `content()` -- similar to the bloblang function, it returns the `bytes` of a message. This performs a lazy copy of raw bytes into the interpreter.
- `metadata(key)` -- similar to the bloblang function, it provides access to the metadata of a message using the provided `key`.
- `root` -- this is a `dict`-like object in scope by default, providing three operating modes simultaneously:
  - Assign key/values like a Python `dict`, e.g. `root["name"] = "Dave"`
  - Use bloblang-like assignment by attribute, e.g. `root.name.first = "Dave"`
  - Reassign it to a new object, e.g. `root = (1, 2)`. (Note: if you reassign `root`, it loses its magic properties!)
Heads up!

If using the bloblang-like assignment, it will create the hierarchy of keys similar to bloblang. `root.name.first = "Dave"` will work even if `"name"` hasn't been assigned yet, producing a dict like:

```python
root = {"name": {"first": "Dave"}}
```
For the details of how `root` works, see the `Root` Python class.
Additionally, the following helper functions and objects improve interoperability:

- `unpickle()` -- will use `pickle.loads()` to deserialize the Redpanda Connect `Message` into a Python object.
- `meta` -- a `dict` that allows you to assign new metadata values to a message or delete values (if you set the value to `None` for a given key).
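The delete-on-`None` behavior of `meta` can be pictured as a merge step. This is a hypothetical sketch (`apply_meta()` is not part of `rp-connect-python`) of how assignments made in `meta` would be applied to a message's existing metadata:

```python
from typing import Any, Dict


def apply_meta(existing: Dict[str, Any], meta: Dict[str, Any]) -> Dict[str, Any]:
    """Merge `meta` into a copy of `existing`: None deletes a key,
    any other value sets or overwrites it."""
    result = dict(existing)
    for key, value in meta.items():
        if value is None:
            result.pop(key, None)  # in this sketch, deleting a missing key is a no-op
        else:
            result[key] = value
    return result


updated = apply_meta({"path": "a.jpg", "tmp": "x"}, {"tmp": None, "kind": "jpeg"})
print(updated)  # {'path': 'a.jpg', 'kind': 'jpeg'}
```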
An example using `unpickle()`:
```yaml
pipeline:
  processors:
    - python:
        script: |
          # these are logically equivalent
          import pickle
          this = pickle.loads(content())
          this = unpickle()

          root = this.call_some_method()

          # if relying on Redpanda Connect structured data, use JSON.
          import json
          this = json.loads(content().decode())
          root = this["a_key"]
```
The processor does not currently support automatic deserialization of incoming data, in an effort to keep as many of the expensive hooks back into Go as lazy as possible, so you only pay for what you use.
Common configuration with defaults for a Python `processor`:
```yaml
pipeline:
  processors:
    - python:
        exe: "python3"    # Name of python binary to use.
        mode: "global"    # Interpreter mode (one of "global", "isolated", "isolated_legacy")
        script:           # No default (required), Python script to execute
```
A simple demo using `requests`, which enriches a message with a callout to an external web service, illustrates many of the prior concepts of using a Python processor:
```yaml
input:
  generate:
    count: 3
    interval: 1s
    mapping: |
      root.title = "this is a test"
      root.uuid = uuid_v4()
      root.i = counter()

pipeline:
  processors:
    - python:
        exe: ./venv/bin/python3
        script: |
          import json
          import requests
          import time

          data = content()
          try:
              msg = json.loads(data)["title"]
          except (ValueError, KeyError, TypeError):
              # not JSON, or no "title" field
              msg = "nothing :("

          root.msg = f"You said: '{msg}'"
          root.at = time.ctime()
          try:
              # a timeout keeps a slow network from stalling the pipeline
              root.ip = requests.get("https://api.ipify.org", timeout=5).text
          except requests.RequestException:
              root.ip = "no internet?"

output:
  stdout: {}
```
To run the demo, you need a Python environment with the `requests` module installed. This is easy to do with a virtual environment:
```sh
# Create a new virtual environment.
python3 -m venv venv
# Update pip, install setuptools, and install requests into the virtual env.
./venv/bin/pip install --quiet -U pip setuptools requests
# Run the example.
./rp-connect-python run --log.level=off examples/requests.yaml
```
You should get output similar to:
```json
{"msg": "You said: 'this is a test'", "at": "Fri Aug 9 19:07:29 2024", "ip": "192.0.1.210"}
{"msg": "You said: 'this is a test'", "at": "Fri Aug 9 19:07:30 2024", "ip": "192.0.1.210"}
{"msg": "You said: 'this is a test'", "at": "Fri Aug 9 19:07:31 2024", "ip": "192.0.1.210"}
```
Presently, the Python `output` is a bit of a hack: it's really just a Python `processor` configured to use a single interpreter instance. This means all the configuration and behavior is the same as in the processor configuration.

When the `output` improves and warrants further discussion, check this space!
For now, a simple example that writes the provided message to `stdout`:
```yaml
input:
  generate:
    count: 5
    interval: ""
    mapping: |
      root = "hello world"

output:
  python:
    script: |
      msg = content().decode()
      print(f"you said: '{msg}'")

http:
  enabled: false
```
`rp-connect-python` now supports multiple interpreter modes that may be set separately on each `input`, `processor`, and `output` instance.

- `global` (the default)
  - Uses a global interpreter (i.e. no sub-interpreters) for all execution.
  - Allows passing pointers to Python objects between components, avoiding costly serialization/deserialization.
  - Provides the most compatibility at the expense of throughput, as your code will rely on the global main interpreter for memory management and the GIL.
- `isolated`
  - Uses multiple isolated sub-interpreters with their own memory allocators and GILs.
  - Provides the best throughput performance for pure-Python use cases that don't leverage Python modules that use native code (e.g. `numpy`).
  - Requires serializing/deserializing data as it leaves the context of the interpreter.
- `isolated_legacy`
  - Same as `isolated`, but instead of distinct GILs and memory allocators, uses a shared GIL and allocator.
  - Balances compatibility with performance. Some Python modules might not support full isolation, but will work in a shared GIL mode.
A more detailed discussion for the nerds follows.
Most pure Python code should "just work" with `isolated` mode and `isolated_legacy` mode. Some older Python extensions, written in C or the like, may not work in `isolated` mode and require `isolated_legacy` mode.

If you see issues using `isolated` (e.g. crashes), switch to `isolated_legacy`.
In general, crashes should not happen. The most common causes are bugs in `rp-connect-python` related to use-after-frees in the Python integration layer. If it's not that, it's an interpreter state issue, which is also most likely a bug in `rp-connect-python`. However, given the immaturity of multi-interpreter support in Python, if the issue "goes away" by switching modes (e.g. to `isolated_legacy`), it's possible it's deeper than just `rp-connect-python`.
In some cases, `isolated_legacy` can perform as well as or slightly better than `isolated`, even though it uses a shared GIL. It's very workload dependent, so it's worth experimenting.
Using `global` mode for a runtime will execute the Python code in the context of the "main" interpreter. (In `isolated` and `isolated_legacy` modes, sub-interpreters derive from the "main" interpreter.) This is the traditional method of embedding Python into an application.
While you may scale out your `global` mode components, only a single component instance may utilize the "main" interpreter at a time. This is irrespective of the GIL, as Python's C implementation relies heavily on thread-local storage for interpreter state.
Go was designed by people who think programmers can't handle managing threads. (Multi-threading is hard, but that's why we're paid the big bucks, right?) As a result, the Go runtime does its own scheduling of goroutines onto some number of OS threads to achieve parallelism and concurrency. Python does not jibe with this and the vibes are off, so a lot of the `rp-connect-python` internals are for managing how to couple Python's thread-oriented approach with Go's goroutine world.
A lot of scientific software that uses external non-Python native code may run best in `global` mode. This includes, but is not limited to:

- `numpy`
- `pandas`
- `pyarrow`
A benefit of `global` mode is that there's one interpreter state across all components, so you can create a Python object in one component (e.g. an `input`) and easily use it in a `processor` stage without mucking about with serialization. This is great for workloads that create large, in-memory objects, like Pandas DataFrames or PyArrow Tables. In these cases, avoiding serialization may mean `global` mode is more efficient even if there's fighting over the interpreter lock.
The current design assumes arbitrary goroutines will need to acquire ownership of the global ("main") interpreter and fight over a mutex. It's entirely possible the mutex is held at points where the GIL is actually released or releasable, meaning other Python code could run safely. It's future work to figure out how to orchestrate this efficiently.
This is an evolving list of notes/tips related to using certain popular Python modules:
Works best in `isolated_legacy` mode. Currently, it can panic in `isolated` mode on some systems.
While `requests` is pure Python, it does hook into some modules that are not. Still identifying a race condition causing memory corruption in `isolated` mode.
For `numpy`, `global` mode is recommended, as numpy explicitly does not support Python sub-interpreters. It may work in `isolated_legacy`, but be careful.
`pandas` depends on `numpy`, so it might be best used in `global` mode if stability is a concern. It works fine with the `pickle` support for passing DataFrames, but that might not be the most efficient way of passing data around a long pipeline, so `global` might be preferable to the isolated modes.
An example that builds a DataFrame and uses `pickle` to pass it from the `input` to the `processor`:
```yaml
input:
  python:
    mode: global
    name: df
    pickle: true
    script: |
      import pandas as pd
      df = pd.DataFrame.from_dict({"name": ["Maple", "Moxie"], "age": [8, 3]})

pipeline:
  processors:
    - python:
        mode: global
        script: |
          import pickle
          df = unpickle()
          root = df.to_dict("list")

output:
  stdout: {}
```
Note the use of `mode: global`!
`pyarrow` works fine in `global` mode.

An example follows using the `pyarrow.dataset` capabilities to read from GCS.
Note the use of `serializer: none`, as it prevents data copying/duplication.
```yaml
input:
  python:
    name: batches
    serializer: none
    script: |
      import pyarrow as pa
      from pyarrow import fs
      import pyarrow.dataset as ds

      gcs = fs.GcsFileSystem()
      dataset = ds.dataset("my-bucket/", format="parquet", filesystem=gcs)

      # need special handling to raw cython generators, so for now
      # wrap with a pure python generator
      def take_all():
          for batch in dataset.to_batches():
              yield batch

      batches = take_all()

pipeline:
  processors:
    - python:
        script: |
          # 'this' is now a PyArrow RecordBatch
          root.nbytes = this.nbytes
          root.num_rows = this.num_rows

output:
  stdout: {}
```
Pillow seems to work ok in `isolated_legacy` mode, but doesn't support sub-interpreters, so it's recommended to run it in `global` mode.
An example of a directory scanner that identifies types of JPEGs:
```yaml
input:
  file:
    paths: [ ./*.jpg ]
    scanner:
      to_the_end: {}

pipeline:
  processors:
    - python:
        exe: ./venv/bin/python3
        mode: global
        script: |
          from PIL import Image
          from io import BytesIO

          infile = BytesIO(content())
          try:
              with Image.open(infile) as im:
                  root.format = im.format
                  root.size = im.size
                  root.mode = im.mode
                  root.path = metadata("path")
          except OSError:
              pass

output:
  stdout: {}
```
Assuming you `pip install` the dependencies `setuptools` and `pillow`:
```sh
$ python3 -m venv venv
$ ./venv/bin/pip install --quiet -U pip setuptools pillow
$ ./rp-connect-python run --log.level=off examples/pillow.yaml
{"format":"JPEG","mode":"RGB","path":"rpcn_and_python.jpg","size":[1024,1024]}
```
- Tested on macOS/arm64 and Linux/{arm64,amd64}.
- Not expected to work on Windows. Requires `gogopython` updates.
- You can only use one Python binary across all Python processors.
- Still hardcoded for Python 3.12. Should be portable to 3.13 and, in the case of `global` mode, earlier versions. Requires changes to `gogopython` I haven't made yet.
Source code in this project is licensed under the Apache v2 license unless noted otherwise.

This software is provided without warranty or support. It is not part of Redpanda Data's enterprise offering and not supported by Redpanda Data.