runabol/piper

piper - a distributed workflow engine

This repository was archived by the owner on Sep 18, 2023. It is now read-only.


I no longer maintain this project. You might want to check out tork as an alternative solution.

Introduction

Piper is an open-source, distributed workflow engine built on Spring Boot, designed to be dead simple.

Piper can run on one or a thousand machines depending on your scaling needs.

In Piper, work to be done is defined as a set of tasks called a Pipeline. Pipelines can be sourced from many locations, but typically they live in a Git repository where they can be versioned and tracked.

Piper was originally built to support the need to transcode massive amounts of video in parallel. Since transcoding video is a CPU- and time-intensive process, I had to scale horizontally. Moreover, I needed a way to monitor these long-running jobs, auto-retry them, and otherwise control their execution.

Tasks

Tasks are the basic building blocks of a pipeline. Each task has a type property which maps to a TaskHandler implementation, responsible for carrying out the task.

For example, here is the RandomInt TaskHandler implementation:

  public class RandomInt implements TaskHandler<Object> {

    @Override
    public Object handle(Task aTask) throws Exception {
      int startInclusive = aTask.getInteger("startInclusive", 0);
      int endInclusive = aTask.getInteger("endInclusive", 100);
      return RandomUtils.nextInt(startInclusive, endInclusive);
    }

  }

While it doesn't do much beyond generating a random integer, it does demonstrate how a TaskHandler works: a Task instance is passed as an argument to the TaskHandler and contains all the key-value pairs of that task.

The TaskHandler is then responsible for executing the task using this input, optionally returning an output which can be used by other pipeline tasks downstream.
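
As a quick sketch of how a handler is used from a pipeline (the random/int and io/print types are documented below), naming a task makes its return value available to the tasks that follow:

- name: randomNumber
  type: random/int
  startInclusive: 0
  endInclusive: 100
- type: io/print
  text: The result was ${randomNumber}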

Pipelines

Piper pipelines are authored in YAML, a JSON superset.

Here is an example of a basic pipeline definition.

name: Hello Demo
inputs:                  --+
  - name: yourName         |
    label: Your Name       | - This defines the inputs
    type: string           |   expected by the pipeline
    required: true       --+
outputs:                    --+
  - name: myMagicNumber      | - You can output any of the job's
    value: ${randomNumber}   |   variables as the job's output.
                            --+
tasks:
  - name: randomNumber               --+
    label: Generate a random number    |
    type: random/int                   | - This is a task
    startInclusive: 0                  |
    endInclusive: 10000              --+

  - label: Print a greeting
    type: io/print
    text: Hello ${yourName}

  - label: Sleep a little
    type: time/sleep            --+
    millis: ${randomNumber}       | - tasks may refer to the result
                                --+   of a previous task

  - label: Print a farewell
    type: io/print
    text: Goodbye ${yourName}

So tasks are nothing but a collection of key-value pairs. At a minimum, each task contains a type property which maps to the appropriate TaskHandler that needs to execute it.
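
And because YAML is a JSON superset, a task can equally be written in JSON-style flow syntax; the greeting task from the example above could be spelled:

- { "label": "Print a greeting", "type": "io/print", "text": "Hello ${yourName}" }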

Tasks may also specify a name property, which names the output of the task so it can be referenced later in the pipeline.

The label property is used to give a human-readable description of the task.

The node property can be used to route tasks to work queues other than the default tasks queue. This allows one to design a cluster of worker nodes of different types: of different capacity, with different 3rd-party software dependencies, and so on.

The retry property can be used to specify the number of times that a task is allowed to automatically retry in case of a failure.

The timeout property can be used to specify the number of seconds/minutes/hours that a task may execute before it is cancelled.

The output property can be used to modify the output of the task in some fashion, e.g. convert it to an integer.

All other key-value pairs are task-specific and may or may not be required depending on the specific task.
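
Putting several of these properties together, a single task might look like the following sketch (the encoders queue is illustrative, and the exact retry/timeout value formats may differ in your version of Piper):

- label: Transcode to 240p
  type: media/ffmpeg            # maps to the TaskHandler that executes the task
  node: encoders                # illustrative: route to a non-default work queue
  retry: 3                      # retry up to 3 times on failure
  timeout: 30m                  # illustrative format: cancel if still running after 30 minutes
  options: ["-y", "-i", "/path/to/input.mov", "/path/to/240p.mp4"]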

Architecture

Piper is composed of the following components:

Coordinator: The Coordinator is the central nervous system of Piper. It keeps track of jobs, dishes out work to be done by Worker machines, and keeps track of failures, retries, and other job-level details. Unlike Worker nodes, it does not execute actual work but delegates all task activities to Worker instances.

Worker: Workers are the workhorses of Piper. These are the nodes that actually execute the tasks requested by the Coordinator. Unlike the Coordinator, workers are stateless: they do not interact with a database or keep any in-memory state about the job or anything else. This makes it very easy to scale the number of workers up and down without fear of losing application state.

Message Broker: All communication between the Coordinator and the Worker nodes is done through a message broker. This has many advantages:

  1. If all workers are busy, the message broker simply queues the message until a worker can handle it.
  2. When workers boot up, they subscribe to the appropriate queues for the type of work they are intended to handle.
  3. If a worker crashes, the task automatically gets re-queued to be handled by another worker.
  4. Last but not least, workers and TaskHandler implementations can be written in any language, since they are completely decoupled through message passing.

Database: This piece holds all the job state in the system: which tasks completed, which failed, and so on. It is used by the Coordinator as its "mind".

Pipeline Repository: The component where pipelines (workflows) are created and edited by pipeline engineers.
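
Which of these roles a given process plays is purely a matter of configuration: a single Piper process can act as Coordinator, Worker, or both. A minimal sketch, using properties from the Configuration section below (the captions queue is illustrative):

# run this process as Coordinator, Worker, or both
piper.coordinator.enabled=true
piper.worker.enabled=true
# queues this worker consumes, and the number of concurrent consumers for each
piper.worker.subscriptions.tasks=5
piper.worker.subscriptions.captions=2   # illustrative: consumed by tasks that specify "node: captions"
# messaging provider between Coordinator and Workers (jms | amqp | kafka)
piper.message-broker.provider=amqp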

Control Flow

Piper supports the following constructs to control the flow of execution:

Each

Applies the function iteratee to each item in list, in parallel. Note that since iteratee is applied to each item in parallel, there is no guarantee that the iteratee functions will complete in order.

- type: each
  list: [1000,2000,3000]
  iteratee:
    type: time/sleep
    millis: ${item}

This will generate three parallel tasks, one for each item in the list, which will sleep for 1, 2, and 3 seconds respectively.

Parallel

Runs the tasks collection of functions in parallel, without waiting until the previous function has completed.

- type: parallel
  tasks:
    - type: io/print
      text: hello
    - type: io/print
      text: goodbye

Fork/Join

Executes each branch in the branches collection as a separate and isolated sub-flow. Tasks within each branch are executed in sequence.

- type: fork
  branches:
    - - name: randomNumber                 <-- branch 1 starts here
        label: Generate a random number
        type: random/int
        startInclusive: 0
        endInclusive: 5000
      - type: time/sleep
        millis: ${randomNumber}
    - - name: randomNumber                 <-- branch 2 starts here
        label: Generate a random number
        type: random/int
        startInclusive: 0
        endInclusive: 5000
      - type: time/sleep
        millis: ${randomNumber}

Switch

Executes one and only one branch of execution, based on the expression value.

- type: switch
  expression: ${selector}        <-- determines which case will be executed
  cases:
    - key: hello                 <-- case 1 starts here
      tasks:
        - type: io/print
          text: hello world
    - key: bye                   <-- case 2 starts here
      tasks:
        - type: io/print
          text: goodbye world
  default:
    - tasks:
        - type: io/print
          text: something else

Map

Produces a new collection of values by mapping each value in list through the iteratee function. The iteratee is called with an item from list, in parallel. When the iteratee has finished executing on all items, the map task returns a list of the execution results, in an order which corresponds to the order of the source list.

- name: fileSizes
  type: map
  list: ["/path/to/file1.txt","/path/to/file2.txt","/path/to/file3.txt"]
  iteratee:
    type: io/filesize
    file: ${item}

Subflow

Starts a new job as a sub-flow of the current job. The output of the sub-flow job becomes the output of the task.

- type: subflow
  pipelineId: copy_files
  inputs:
    - source: /path/to/source/dir
    - destination: /path/to/destination/dir

Pre/Post/Finalize

Each task can define a set of tasks that will be executed prior to its execution (pre), after its successful execution (post), and at the end of the task's lifecycle regardless of the outcome of its execution (finalize).

pre/post/finalize tasks always execute on the same node that executes the task itself and are considered an atomic part of the task. That is, a failure in any of the pre/post/finalize tasks is considered a failure of the entire task.

  - label: 240p
    type: media/ffmpeg
    options: [
      "-y",
      "-i", "/some/input/video.mov",
      "-vf", "scale=w=-2:h=240",
      "${workDir}/240p.mp4"
    ]
    pre:
      - name: workDir
        type: core/var
        value: "${tempDir()}/${uuid()}"
      - type: io/mkdir
        path: "${workDir}"
    post:
      - type: s3/putObject
        uri: s3://my-bucket/240p.mp4
    finalize:
      - type: io/rm
        path: ${workDir}

Webhooks

Piper provides the ability to register HTTP webhooks to receive notifications for certain events.

Registering webhooks is done when creating the job. E.g.:

{  "pipelineId": "demo/hello",  "inputs": {    ...  },  "webhooks": [{    "type": "job.status",    "url": "http://example.com",    "retry": {   # optional configuration for retry attempts in case of webhook failure      "initialInterval":"3s" # default 2s      "maxInterval":"10s" # default 30s      "maxAttempts": 4 # default 5      "multiplier": 2.5 # default 2.0    }  }]}

type is the type of event you would like to be notified of, and url is the URL that Piper will call when the event occurs.

Supported types are job.status and task.started.
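
For example, to be notified whenever a task starts (the callback URL here is illustrative):

{
  "pipelineId": "demo/hello",
  "inputs": {
    ...
  },
  "webhooks": [{
    "type": "task.started",
    "url": "http://example.com/task-started"
  }]
}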

Task Handlers

core/var

  name: pi
  type: core/var
  value: 3.14159

io/createTempDir

  name: tempDir
  type: io/create-temp-dir

io/filepath

  name: myFilePath
  type: io/filepath
  filename: /path/to/my/file.txt

io/ls

  name: listOfFiles
  type: io/ls
  recursive: true # default: false
  path: /path/to/directory

io/mkdir

  type: io/mkdir
  path: /path/to/directory

io/print

  type: io/print
  text: hello world

io/rm

  type: io/rm
  path: /some/directory

media/dar

  name: myDar
  type: media/dar
  input: /path/to/my/video.mp4

media/ffmpeg

  type: media/ffmpeg
  options: [
    -y,
    -i, "${input}",
    "-pix_fmt", "yuv420p",
    "-codec:v", "libx264",
    "-preset", "fast",
    "-b:v", "500k",
    "-maxrate", "500k",
    "-bufsize", "1000k",
    "-vf", "scale=-2:${targetHeight}",
    "-b:a", "128k",
    "${output}"
  ]

media/ffprobe

  name: ffprobeResults
  type: media/ffprobe
  input: /path/to/my/media/file.mov

media/framerate

  name: framerate
  type: media/framerate
  input: /path/to/my/video/file.mov

media/mediainfo

  name: mediainfoResult
  type: media/mediainfo
  input: /path/to/my/media/file.mov

media/vduration

  name: duration
  type: media/vduration
  input: /path/to/my/video/file.mov

media/vsplit

  name: chunks
  type: media/vsplit
  input: /path/to/my/video.mp4
  chunkSize: 30s

media/vstitch

  type: media/vstitch
  chunks:
    - /path/to/chunk_001.mp4
    - /path/to/chunk_002.mp4
    - /path/to/chunk_003.mp4
    - /path/to/chunk_004.mp4
  output: /path/to/stitched/file.mp4

random/int

  name: someRandomNumber
  type: random/int
  startInclusive: 1000 # default 0
  endInclusive: 9999 # default 100

random/rogue

  type: random/rogue
  probability: 0.25 # default 0.5

s3/getObject

  type: s3/getObject
  uri: s3://my-bucket/path/to/file.mp4
  filepath: /path/to/my/file.mp4

s3/listObjects

  type: s3/listObjects
  bucket: my-bucket
  prefix: some/path/

s3/getUrl

  type: s3/getUrl
  uri: s3://my-bucket/path/to/file.mp4

s3/presignGetObject

  name: url
  type: s3/presignGetObject
  uri: s3://my-bucket/path/to/file.mp4
  signatureDuration: 60s

s3/putObject

  type: s3/putObject
  uri: s3://my-bucket/path/to/file.mp4
  filepath: /path/to/my/file.mp4

shell/bash

  name: listOfFiles
  type: shell/bash
  script: |
    for f in /tmp/*
    do
      echo "$f"
    done

time/sleep

  type: time/sleep
  millis: 60000

Expression Functions

boolean

  type: core/var
  value: "${boolean('false')}"

byte

  type: core/var
  value: "${byte('42')}"

char

  type: core/var
  value: "${char('1')}"

short

  type: core/var
  value: "${short('42')}"

int

  type: core/var
  value: "${int('42')}"

long

  type: core/var
  value: "${long('42')}"

float

  type: core/var
  value: "${float('4.2')}"

double

  type: core/var
  value: "${double('4.2')}"

systemProperty

  type: core/var
  value: "${systemProperty('java.home')}"

range

  type: core/var
  value: "${range(0,100)}" # [0,1,...,100]

join

  type: core/var
  value: "${join('A','B','C')}" # ABC

concat

  type: core/var
  value: ${concat(['A','B'],['C'])} # ['A','B','C']

flatten

  type: core/var
  value: ${flatten([['A'],['B']])} # ['A','B']

sort

  type: core/var
  value: ${sort([3,1,2])} # [1,2,3]

tempDir

  type: core/var
  value: "${tempDir()}"  # e.g. /tmp

uuid

  name: workDir
  type: core/var
  value: "${tempDir()}/${uuid()}"

stringf

  type: core/var
  value: "${stringf('%03d',5)}"  # 005

now

  type: core/var
  value: "${dateFormat(now(),'yyyy')}"  # e.g. 2020

timestamp

  type: core/var
  value: "${timestamp()}"  # e.g. 1583268621423

dateFormat

  type: core/var
  value: "${dateFormat(now(),'yyyy')}"  # e.g. 2020

config

  type: core/var
  value: "${config('some.config.property')}"

Tutorials

Hello World

Start a local Postgres database:

./scripts/database.sh

Start a local RabbitMQ instance:

./scripts/rabbit.sh

Build Piper:

./scripts/build.sh

Start Piper:

./scripts/development.sh

Go to http://localhost:8080/jobs in your browser, which should give you something like:

{
  number: 0,
  totalItems: 0,
  size: 0,
  totalPages: 0,
  items: [ ]
}

The /jobs endpoint lists all jobs that are either running or were previously run on Piper.
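
The same listing is available from the command line:

curl -s http://localhost:8080/jobs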

Start a demo job:

curl -s \
     -X POST \
     -H Content-Type:application/json \
     -d '{"pipelineId":"demo/hello","inputs":{"yourName":"Joe Jones"}}' \
     http://localhost:8080/jobs

Which should give you something like this as a response:

{  "createTime": "2017-07-05T16:56:27.402+0000",  "webhooks": [],  "inputs": {    "yourName": "Joe Jones"  },  "id": "8221553af238431ab006cc178eb59129",  "label": "Hello Demo",  "priority": 0,  "pipelineId": "demo/hello",  "status": "CREATED",  "tags": []}

If you refresh your browser page now, you should see the executing job.

In case you are wondering, the demo/hello pipeline lives in the piper-pipelines demo repository.

Writing your first pipeline

Create the directory ~/piper/pipelines and create a file in there called mypipeline.yaml.

Edit the file and add the following text:

label: My Pipeline
inputs:
  - name: name
    type: string
    required: true
tasks:
  - label: Print a greeting
    type: io/print
    text: Hello ${name}
  - label: Print a farewell
    type: io/print
    text: Goodbye ${name}

Execute your workflow:

curl -s -X POST -H Content-Type:application/json -d '{"pipelineId":"mypipeline","inputs":{"name":"Arik"}}' http://localhost:8080/jobs

You can make changes to your pipeline and then run ./scripts/clear.sh to clear the cache and reload the pipeline.

Scaling Piper

Depending on your workload, you will probably exhaust the ability to run Piper on a single node fairly quickly. Good, because that's where the fun begins.

Start RabbitMQ:

./scripts/rabbit.sh

Start the Coordinator:

./scripts/coordinator.sh

From another terminal window, start a Worker:

./scripts/worker.sh

Execute the demo pipeline:

curl -s \
     -X POST \
     -H Content-Type:application/json \
     -d '{"pipelineId":"demo/hello","inputs":{"yourName":"Joe Jones"}}' \
     http://localhost:8080/jobs

Transcoding a Video

Note: you must have ffmpeg installed on your worker machine for this demo to work.

Transcode a source video to an SD (480p) output:

curl -s \
     -X POST \
     -H Content-Type:application/json \
     -d '{"pipelineId":"video/transcode","inputs":{"input":"/path/to/video/input.mov","output":"/path/to/video/output.mp4","profile":"sd"}}' \
     http://localhost:8080/jobs

Transcode a source video to an HD (1080p) output:

curl -s \
     -X POST \
     -H Content-Type:application/json \
     -d '{"pipelineId":"video/transcode","inputs":{"input":"/path/to/video/input.mov","output":"/path/to/video/output.mp4","profile":"hd"}}' \
     http://localhost:8080/jobs

Transcoding a Video (Split & Stitch)

See Transcoding video at scale with Piper.

Adaptive Streaming

See Adaptive Streaming with Piper.

Using Git as a Pipeline Repository backend

Rather than storing pipelines on your local file system, you can use Git to store them for you. This has great advantages, not least pipeline versioning, pull requests, and everything else Git has to offer.

To enable Git as a pipeline repository, set the piper.pipeline-repository.git.enabled flag to true in ./scripts/development.sh and restart Piper. By default, Piper will use the demo repository piper-pipelines.

You can change it by using the piper.pipeline-repository.git.url and piper.pipeline-repository.git.search-paths configuration parameters.
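
For example (the URL and search paths are illustrative; these are the same properties shown in the Configuration section below):

piper.pipeline-repository.git.enabled=true
piper.pipeline-repository.git.url=https://github.com/myusername/my-pipelines.git
piper.pipeline-repository.git.branch=master
piper.pipeline-repository.git.search-paths=demo/,video/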

Configuration

# messaging provider between Coordinator and Workers (jms | amqp | kafka) default: jms
piper.message-broker.provider=jms

# turn on the Coordinator process
piper.coordinator.enabled=true

# turn on the Worker process and listen to tasks
piper.worker.enabled=true

# when the worker is enabled, subscribe to the default "tasks" queue with 5 concurrent consumers.
# you may also route pipeline tasks to other arbitrarily named task queues by specifying the "node"
# property on any given task.
# e.g. node: captions will route to the captions queue, which a worker would subscribe to with
# piper.worker.subscriptions.captions
# note: the queue must be created before tasks can be routed to it. Piper will create the queue if
# it isn't already there when the worker bootstraps.
piper.worker.subscriptions.tasks=5

# enable a git-based pipeline repository
piper.pipeline-repository.git.enabled=true
# the URL of the Git repo
piper.pipeline-repository.git.url=https://github.com/myusername/my-pipelines.git
piper.pipeline-repository.git.branch=master
piper.pipeline-repository.git.username=me
piper.pipeline-repository.git.password=secret
# folders within the git repo that are scanned for pipelines
piper.pipeline-repository.git.search-paths=demo/,video/

# enable a file system based pipeline repository
piper.pipeline-repository.filesystem.enabled=true
# location of pipelines on the file system
piper.pipeline-repository.filesystem.location-pattern=$HOME/piper/**/*.yaml

# data source (only postgres is supported at the moment)
spring.datasource.platform=postgres
spring.datasource.url=jdbc:postgresql://localhost:5432/piper
spring.datasource.username=piper
spring.datasource.password=piper
# change to always when bootstrapping the database for the first time
spring.datasource.initialization-mode=never

Docker

The Docker image is available as creactiviti/piper.

Hello World in Docker:

Start a local Postgres database:

./scripts/database.sh

Create an empty directory:

mkdir pipelines
cd pipelines

Create a simple pipeline file -- hello.yaml -- and paste the following into it:

label: Hello World
inputs:
  - name: name
    label: Your Name
    type: core/var
    required: true
tasks:
  - label: Print Hello Message
    type: io/print
    text: "Hello ${name}!"
Start Piper:

docker run \
  --name=piper \
  --link postgres:postgres \
  --rm \
  -it \
  -e spring.datasource.url=jdbc:postgresql://postgres:5432/piper \
  -e spring.datasource.initialization-mode=always \
  -e piper.worker.enabled=true \
  -e piper.coordinator.enabled=true \
  -e piper.worker.subscriptions.tasks=1 \
  -e piper.pipeline-repository.filesystem.enabled=true \
  -e piper.pipeline-repository.filesystem.location-pattern=/pipelines/**/*.yaml \
  -v $PWD:/pipelines \
  -p 8080:8080 \
  creactiviti/piper
Execute the pipeline:

curl -s \
     -X POST \
     -H Content-Type:application/json \
     -d '{"pipelineId":"hello","inputs":{"name":"Joe Jones"}}' \
     http://localhost:8080/jobs

License

Piper is released under version 2.0 of the Apache License.
