
pyper-dev/pyper


Pyper

Concurrent Python made simple



Pyper is a flexible framework for concurrent and parallel data processing, based on functional programming patterns. It is used for 🔀 ETL Systems, ⚙️ Data Microservices, and 🌐 Data Collection.

See the Documentation.

Key features:

  • 💡 Intuitive API: Easy to learn, easy to think about. Implements clean abstractions to seamlessly unify threaded, multiprocessed, and asynchronous work.
  • 🚀 Functional Paradigm: Python functions are the building blocks of data pipelines. Lets you write clean, reusable code naturally.
  • 🛡️ Safety: Hides the heavy lifting of underlying task execution and resource clean-up. No more worrying about race conditions, memory leaks, or thread-level error handling.
  • Efficiency: Designed from the ground up for lazy execution, using queues, workers, and generators.
  • Pure Python: Lightweight, with zero sub-dependencies.
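Plain generators are enough to illustrate what lazy execution means. The snippet below is a minimal stdlib sketch and not Pyper's own code. The hypothetical `source` and `double` functions record every step they perform. Pulling three items from a chain built over a million-element range therefore does only six steps of work.

```python
# Hypothetical illustration (not Pyper internals): chained generators do no
# work until a consumer asks for the next item.
calls = []


def source(limit: int):
    for i in range(limit):
        calls.append(("source", i))  # record each produced item
        yield i


def double(items):
    for x in items:
        calls.append(("double", x))  # record each transformed item
        yield x * 2


stream = double(source(1_000_000))  # nothing has executed yet
first_three = [next(stream) for _ in range(3)]
print(first_three)  # [0, 2, 4]
print(len(calls))   # 6: three source steps plus three double steps
```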

Installation

Install the latest version using pip:

```console
$ pip install python-pyper
```

Note that python-pyper is the PyPI-registered package name.

Usage

In Pyper, the task decorator is used to transform functions into composable pipelines.

Let's simulate a pipeline that performs a series of transformations on some data.

```python
import asyncio
import time

from pyper import task


def get_data(limit: int):
    for i in range(limit):
        yield i


async def step1(data: int):
    await asyncio.sleep(1)
    print("Finished async wait", data)
    return data


def step2(data: int):
    time.sleep(1)
    print("Finished sync wait", data)
    return data


def step3(data: int):
    for i in range(10_000_000):
        _ = i * i
    print("Finished heavy computation", data)
    return data


async def main():
    # Define a pipeline of tasks using `pyper.task`
    pipeline = task(get_data, branch=True) \
        | task(step1, workers=20) \
        | task(step2, workers=20) \
        | task(step3, workers=20, multiprocess=True)

    # Call the pipeline
    total = 0
    async for output in pipeline(limit=20):
        total += output
    print("Total:", total)


if __name__ == "__main__":
    asyncio.run(main())
```

Pyper provides an elegant abstraction of the execution of each task, allowing you to focus on building out the logical functions of your program. In the main function:

  • pipeline defines a function; this takes the parameters of its first task (get_data) and yields each output from its last task (step3).
  • Tasks are piped together using the | operator (motivated by Unix's pipe operator) as a syntactic representation of passing inputs/outputs between tasks.
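Python's `__or__` hook is what makes `|` composition like this possible. Below is a minimal, sequential sketch of the idea. `Stage` and `_apply` are hypothetical names and not Pyper's API. The sketch has no workers and no concurrency. It only shows how chaining with `|` can produce a callable that takes the first function's parameters and yields the last function's outputs. Like `branch=True` in the example above, a branching step's generator is flattened into many outputs.

```python
import copy
from typing import Any, Callable, Iterable, Iterator


def _apply(func: Callable[[Any], Any], branch: bool, items: Iterable[Any]) -> Iterator[Any]:
    """Feed each upstream item to func; flatten the result if the step branches."""
    for item in items:
        result = func(item)
        if branch:
            yield from result  # a branching step emits many outputs per input
        else:
            yield result


class Stage:
    """Hypothetical stand-in for a task: wraps one function and supports `|`."""

    def __init__(self, func: Callable[..., Any], branch: bool = False):
        self.steps = [(func, branch)]

    def __or__(self, other: "Stage") -> "Stage":
        # `a | b` returns a new Stage that runs a's functions, then b's.
        combined = copy.copy(self)
        combined.steps = self.steps + other.steps
        return combined

    def __call__(self, *args: Any, **kwargs: Any) -> Iterator[Any]:
        # The pipeline takes the first step's parameters...
        first, first_branch = self.steps[0]
        result = first(*args, **kwargs)
        items: Iterable[Any] = result if first_branch else [result]
        # ...and lazily chains every later step onto the previous one's outputs.
        for func, branch in self.steps[1:]:
            items = _apply(func, branch, items)
        yield from items


def get_data(limit: int):
    yield from range(limit)


def square(x: int) -> int:
    return x * x


pipeline = Stage(get_data, branch=True) | Stage(square)
print(list(pipeline(limit=4)))  # [0, 1, 4, 9]
```

Here `|` builds a new object and leaves its operands unchanged, so a partial pipeline can be reused in several compositions.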

In the pipeline, we are executing three different types of work:

  • task(step1, workers=20) spins up 20 asyncio.Tasks to handle asynchronous IO-bound work
  • task(step2, workers=20) spins up 20 threads to handle synchronous IO-bound work
  • task(step3, workers=20, multiprocess=True) spins up 20 processes to handle synchronous CPU-bound work
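To see why several asyncio.Tasks help with IO-bound async work, consider a pool of tasks draining one shared queue. The following is a stdlib-only sketch of that pattern and not Pyper's implementation. The `run_workers` helper is hypothetical, and the queue is pre-filled to keep the example short; in a real pipeline it would be fed while the workers run. With 20 workers, 20 waits of 0.1 s overlap, so the whole batch finishes in roughly 0.1 s instead of 2 s.

```python
import asyncio
import time


async def step1(data: int) -> int:
    await asyncio.sleep(0.1)  # stands in for asynchronous IO-bound work
    return data


async def run_workers(func, inputs, workers: int) -> list:
    """Hypothetical sketch: `workers` asyncio.Tasks drain a shared queue."""
    queue: asyncio.Queue = asyncio.Queue()
    for item in inputs:
        queue.put_nowait(item)  # pre-filled for brevity
    results = []

    async def worker() -> None:
        while True:
            try:
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return  # no work left for this worker
            results.append(await func(item))

    # Each worker runs as its own asyncio.Task; their awaits overlap.
    await asyncio.gather(*(asyncio.create_task(worker()) for _ in range(workers)))
    return results


start = time.perf_counter()
results = asyncio.run(run_workers(step1, range(20), workers=20))
elapsed = time.perf_counter() - start
print(sorted(results) == list(range(20)), f"{elapsed:.2f}s")
```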

task acts as one intuitive API for unifying the execution of each different type of function.
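A single entry point can tell these kinds of work apart by inspecting the function it receives. The helper below is a hypothetical sketch and not Pyper's code. It uses `inspect` to map a function plus a `multiprocess` flag to an execution strategy. Rejecting async functions combined with `multiprocess=True` is a choice made for this sketch only.

```python
import inspect
from typing import Callable


def execution_strategy(func: Callable, multiprocess: bool = False) -> str:
    """Hypothetical helper: decide how a task's workers would run `func`."""
    if inspect.iscoroutinefunction(func) or inspect.isasyncgenfunction(func):
        # This sketch refuses to ship coroutines to another process.
        if multiprocess:
            raise ValueError("async functions run on the event loop, not in processes")
        return "asyncio.Task"
    # Synchronous functions: processes for CPU-bound work, threads otherwise.
    return "process" if multiprocess else "thread"


async def step1(data: int) -> int:
    return data


def step2(data: int) -> int:
    return data


def step3(data: int) -> int:
    return data


print(execution_strategy(step1))                     # asyncio.Task
print(execution_strategy(step2))                     # thread
print(execution_strategy(step3, multiprocess=True))  # process
```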

Each task has workers that submit outputs to the next task within the pipeline via queue-based data structures; this is the mechanism underpinning how concurrency and parallelism are achieved. See the docs for a breakdown of what a pipeline looks like under the hood.
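Workers connected by queues can be sketched with the standard library alone. The example below is a simplified, thread-only illustration and not Pyper's actual design. `start_workers`, `run_pipeline` and the `_DONE` sentinel are hypothetical names. Each stage's threads read from an input queue and write to an output queue. The sentinel is passed downstream only after every worker of the upstream stage has exited, so each stage receives all of its inputs before it is told to stop.

```python
import queue
import threading
from typing import Any, Callable, Iterable, List

_DONE = object()  # sentinel marking the end of a stream


def start_workers(func: Callable[[Any], Any], in_q: queue.Queue, out_q: queue.Queue,
                  workers: int) -> List[threading.Thread]:
    """Start `workers` threads that read from in_q, apply func, and write to out_q."""

    def worker() -> None:
        while True:
            item = in_q.get()
            if item is _DONE:
                in_q.put(_DONE)  # re-post so sibling workers also shut down
                return
            out_q.put(func(item))

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    return threads


def run_pipeline(data: Iterable[Any], funcs: List[Callable[[Any], Any]],
                 workers: int = 4) -> List[Any]:
    """Hypothetical sketch: one queue between each pair of adjacent stages."""
    queues = [queue.Queue() for _ in range(len(funcs) + 1)]
    stages = [start_workers(f, queues[i], queues[i + 1], workers)
              for i, f in enumerate(funcs)]
    for item in data:
        queues[0].put(item)
    queues[0].put(_DONE)
    # Once every worker of stage i has exited, stage i + 1 has all its inputs.
    for i, threads in enumerate(stages):
        for t in threads:
            t.join()
        queues[i + 1].put(_DONE)
    results = []
    while (item := queues[-1].get()) is not _DONE:
        results.append(item)
    return results


out = run_pipeline(range(10), [lambda x: x + 1, lambda x: x * 2], workers=3)
print(sorted(out))  # [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
```

Output order is not guaranteed because several workers share each queue, so the usage line sorts the results.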


See a non-async example

Pyper pipelines are by default non-async, as long as their tasks are defined as synchronous functions. For example:

```python
import time

from pyper import task


def get_data(limit: int):
    for i in range(limit):
        yield i


def step1(data: int):
    time.sleep(1)
    print("Finished sync wait", data)
    return data


def step2(data: int):
    for i in range(10_000_000):
        _ = i * i
    print("Finished heavy computation", data)
    return data


def main():
    pipeline = task(get_data, branch=True) \
        | task(step1, workers=20) \
        | task(step2, workers=20, multiprocess=True)
    total = 0
    for output in pipeline(limit=20):
        total += output
    print("Total:", total)


if __name__ == "__main__":
    main()
```

A pipeline consisting of at least one asynchronous function becomes an AsyncPipeline, which exposes the same usage API, with async and await syntax in the obvious places. This makes it effortless to combine synchronously defined and asynchronously defined functions where needed.
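An async pipeline can accept ordinary blocking functions if each one is moved off the event loop. The snippet below sketches one stdlib way to do that and does not claim to be how AsyncPipeline works internally. The `run_step` helper is hypothetical. It awaits coroutine functions directly and runs synchronous ones through `asyncio.to_thread` (Python 3.9+), so both kinds can sit in the same chain.

```python
import asyncio
import inspect
import time


async def run_step(func, item):
    """Hypothetical helper: await coroutines directly, offload sync calls to a thread."""
    if inspect.iscoroutinefunction(func):
        return await func(item)
    # asyncio.to_thread keeps blocking code from stalling the event loop.
    return await asyncio.to_thread(func, item)


async def fetch(x: int) -> int:
    await asyncio.sleep(0.01)  # async IO-bound step
    return x


def blocking_double(x: int) -> int:
    time.sleep(0.01)  # sync IO-bound step
    return x * 2


async def main() -> list:
    results = []
    for x in range(5):
        y = await run_step(fetch, x)
        results.append(await run_step(blocking_double, y))
    return results


results = asyncio.run(main())
print(results)  # [0, 2, 4, 6, 8]
```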

Examples

To explore more of Pyper's features, see some further examples.

Dependencies

Pyper is implemented in pure Python, with no sub-dependencies. It is built on top of the well-established built-in Python modules:

License

This project is licensed under the terms of the MIT license.

