This repository was archived by the owner on Oct 21, 2021. It is now read-only.

Python DSL for Argo Workflows

If you're new to Argo, we recommend checking out the examples in pure YAML. The language is descriptive and the Argo examples provide an exhaustive explanation.

For a more experienced audience, this DSL grants you the ability to programmatically define Argo Workflows in Python, which are then translated to the Argo YAML specification.

The DSL makes use of the Argo models defined in the Argo Python client repository. Combining the two approaches gives us full low-level control over Argo Workflows.

Installation

pip install argo-workflows-dsl
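To verify the installation, a minimal sanity check (a sketch; the import path is the one used throughout the examples below):

```python
# Post-install sanity check: the package provides the `argo.workflows.dsl`
# namespace used in every example in this README.
from argo.workflows.dsl import Workflow, CronWorkflow, template

print(Workflow, CronWorkflow, template)  # no ImportError means the DSL is ready to use
```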

Getting Started

Hello World

This example demonstrates the simplest functionality: defining a Workflow by subclassing the Workflow class and a single template with the @template decorator.

The entrypoint to the workflow is defined as an entrypoint class property.

Argo YAML:

```yaml
# @file: hello-world.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: hello-world
  generateName: hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      name: whalesay
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["hello world"]
```

Argo Python:

```python
from argo.workflows.dsl import Workflow
from argo.workflows.dsl import template
from argo.workflows.dsl.templates import V1Container


class HelloWorld(Workflow):
    entrypoint = "whalesay"

    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"]
        )

        return container
```
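Since whalesay returns a plain V1Container model, any field that model exposes can be set the same way. A minimal sketch (the V1EnvVar re-export from argo.workflows.dsl.templates and the GREETING variable are illustrative assumptions):

```python
from argo.workflows.dsl import Workflow, template
from argo.workflows.dsl.templates import V1Container, V1EnvVar  # V1EnvVar re-export is an assumption


class HelloWorldWithEnv(Workflow):
    entrypoint = "whalesay"

    @template
    def whalesay(self) -> V1Container:
        # The template is just a V1Container, so container-level fields such
        # as `env` can be set the same way as image, command and args.
        return V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"],
            env=[V1EnvVar(name="GREETING", value="hello world")],  # hypothetical variable
        )
```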

DAG: Tasks

This example demonstrates tasks defined via dependencies forming a diamond structure. Tasks are defined using the @task decorator and they must return a valid template.

The entrypoint is automatically created as main for the top-level tasks of the Workflow.

Argo YAML:

```yaml
# @file: dag-diamond.yaml
# The following workflow executes a diamond workflow
#
#   A
#  / \
# B   C
#  \ /
#   D
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: dag-diamond
  generateName: dag-diamond-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]

  # @task: [A, B, C, D]
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      name: echo
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]
```

Argo Python:

```python
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container
```

Artifacts

Artifacts can be passed similarly to parameters in three forms: arguments, inputs and outputs, where arguments is the default one (simply @artifact or @parameter).

I.e.: inputs.artifact(...)

Both artifacts and parameters are passed one by one, which means that for multiple artifacts (parameters), one should call:

```python
@inputs.artifact(name="artifact", ...)
@inputs.parameter(name="parameter_a", ...)
@inputs.parameter(...)
def foo(self, artifact: V1alpha1Artifact, parameter_b: V1alpha1Parameter, ...):
    pass
```

A complete example:

Argo YAML:

```yaml
# @file: artifacts.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: artifact-passing
  generateName: artifact-passing-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: generate-artifact
        template: whalesay
      - name: consume-artifact
        template: print-message
        arguments:
          artifacts:
          # bind message to the hello-art artifact
          # generated by the generate-artifact step
          - name: message
            from: "{{tasks.generate-artifact.outputs.artifacts.hello-art}}"

  - name: whalesay
    container:
      name: "whalesay"
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay hello world | tee /tmp/hello_world.txt"]
    outputs:
      artifacts:
      # generate hello-art artifact from /tmp/hello_world.txt
      # artifacts can be directories as well as files
      - name: hello-art
        path: /tmp/hello_world.txt

  - name: print-message
    inputs:
      artifacts:
      # unpack the message input artifact
      # and put it at /tmp/message
      - name: message
        path: /tmp/message
    container:
      name: "print-message"
      image: alpine:latest
      command: [sh, -c]
      args: ["cat", "/tmp/message"]
```

Argo Python:

```python
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class ArtifactPassing(Workflow):

    @task
    def generate_artifact(self) -> V1alpha1Template:
        return self.whalesay()

    @task
    @artifact(
        name="message",
        _from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}"
    )
    def consume_artifact(self, message: V1alpha1Artifact) -> V1alpha1Template:
        return self.print_message(message=message)

    @template
    @outputs.artifact(name="hello-art", path="/tmp/hello_world.txt")
    def whalesay(self) -> V1Container:
        container = V1Container(
            name="whalesay",
            image="docker/whalesay:latest",
            command=["sh", "-c"],
            args=["cowsay hello world | tee /tmp/hello_world.txt"]
        )

        return container

    @template
    @inputs.artifact(name="message", path="/tmp/message")
    def print_message(self, message: V1alpha1Artifact) -> V1Container:
        container = V1Container(
            name="print-message",
            image="alpine:latest",
            command=["sh", "-c"],
            args=["cat", "/tmp/message"],
        )

        return container
```


Going further: closure and scope

This is where it gets quite interesting. So far, we've only scratched the surface of the benefits that the Python implementation provides.

What if we want to use native Python code and execute it as a step in the Workflow? What are our options?

Option A) is to reuse the existing mindset: dump the code in a string, pass it as the source to the V1alpha1ScriptTemplate model and wrap it with the template decorator. This is illustrated in the following code block:

```python
import textwrap


class ScriptsPython(Workflow):

    ...

    @template
    def gen_random_int(self) -> V1alpha1ScriptTemplate:
        source = textwrap.dedent("""\
          import random
          i = random.randint(1, 100)
          print(i)
        """)

        template = V1alpha1ScriptTemplate(
            image="python:alpine3.6",
            name="gen-random-int",
            command=["python"],
            source=source
        )

        return template
```

Which results in:

```yaml
api_version: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generate_name: scripts-python-
  name: scripts-python
spec:
  entrypoint: main

  ...

  templates:
  - name: gen-random-int
    script:
      command:
      - python
      image: python:alpine3.6
      name: gen-random-int
      source: 'import random\ni = random.randint(1, 100)\nprint(i)\n'
```

Not bad, but also not living up to the full potential. Since we're already writing Python, why would we wrap the code in a string? This is where we introduce closures.

closures

The logic of closures is quite simple: just wrap the function you want to execute in a container in the @closure decorator. The closure then takes care of the rest and returns a template (just as the @template decorator does).

The only thing we need to take care of is providing an image that has the necessary Python dependencies installed and is present in the cluster.

There is a plan to eliminate even this step in the future, but currently it is unavoidable.

Following the previous example:

```python
class ScriptsPython(Workflow):

    ...

    @closure(
        image="python:alpine3.6"
    )
    def gen_random_int() -> V1alpha1ScriptTemplate:
        import random

        i = random.randint(1, 100)
        print(i)
```

The closure implements the V1alpha1ScriptTemplate, which means that you can pass in things like resources, env, etc.

Also, make sure that you import whatever library you are using; the context is not preserved: closure behaves as a staticmethod and is sandboxed from the module scope.
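A sketch combining both points: extra V1alpha1ScriptTemplate fields such as env and resources are passed through the @closure decorator (assuming the decorator forwards them as keyword arguments; the V1EnvVar and V1ResourceRequirements re-exports are also assumptions), and every import lives inside the function body:

```python
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *  # assumed to export closure, V1EnvVar, V1ResourceRequirements


class SeededScripts(Workflow):

    ...

    @closure(
        image="python:alpine3.6",
        # Assumption: extra V1alpha1ScriptTemplate fields are forwarded by @closure.
        env=[V1EnvVar(name="SEED", value="42")],
        resources=V1ResourceRequirements(limits={"memory": "128Mi"}),
    )
    def gen_seeded_int() -> V1alpha1ScriptTemplate:
        # Imports live inside the closure: the module scope is not preserved.
        import os
        import random

        random.seed(int(os.environ["SEED"]))
        print(random.randint(1, 100))
```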

scope

Now, what if we had a function (or a whole script) which is quite big? Wrapping it in a single Python function is not very Pythonic and it gets tedious. This is where we can make use of scopes.

Say that we, for example, wanted to initialize logging before running our gen_random_int function.

```python
    ...

    @closure(
        scope="main",
        image="python:alpine3.6"
    )
    def gen_random_int(main) -> V1alpha1ScriptTemplate:
        import random

        main.init_logging()

        i = random.randint(1, 100)
        print(i)

    @scope(name="main")
    def init_logging(level="DEBUG"):
        import logging

        logging_level = getattr(logging, level, "INFO")
        logging.getLogger("__main__").setLevel(logging_level)
```

Notice the 3 changes that we've made:

```python
    @closure(
        scope="main",              # <--- provide the closure a scope
        image="python:alpine3.6"
    )
    def gen_random_int(main):      # <--- use the scope name
```

```python
    @scope(name="main")            # <--- add the function to a scope
    def init_logging(level="DEBUG"):
```

Each function in the given scope is then namespaced by the scope name and injected into the closure.

I.e. the resulting YAML looks like this:

```yaml
...
spec:
  ...
  templates:
    - name: gen-random-int
      script:
        command:
        - python
        image: python:alpine3.6
        name: gen-random-int
        source: |-
          import logging
          import random

          class main:
            """Scoped objects injected from scope 'main'."""

            @staticmethod
            def init_logging(level="DEBUG"):
              logging_level = getattr(logging, level, "INFO")
              logging.getLogger("__main__").setLevel(logging_level)

          main.init_logging()

          i = random.randint(1, 100)
          print(i)
```
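Because every function in a scope is injected, a scope can hold more than one helper. A sketch (the banner helper is hypothetical, and the wildcard imports are assumed to export closure and scope as in the examples above):

```python
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *  # assumed to export closure and scope


class ScopedScripts(Workflow):

    ...

    @closure(
        scope="main",
        image="python:alpine3.6"
    )
    def gen_random_int(main) -> V1alpha1ScriptTemplate:
        import random

        # Both scoped helpers are reachable as staticmethods of `main`.
        main.init_logging()
        main.banner("generating a random int")

        print(random.randint(1, 100))

    @scope(name="main")
    def init_logging(level="DEBUG"):
        import logging

        logging.getLogger("__main__").setLevel(getattr(logging, level, "INFO"))

    @scope(name="main")
    def banner(text):
        # Hypothetical helper, added only to show several functions sharing a scope.
        print("=== {} ===".format(text))


if __name__ == "__main__":
    # Printing the workflow lets you inspect the compiled manifest locally,
    # as the CronWorkflow example later in this README does with print(wf).
    print(ScopedScripts())
```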

Submitting with the DSL

Assume we are running kubectl -n argo port-forward deployment/argo-server 2746:2746.

Workflow

```python
from argo.workflows.client import (
    ApiClient,
    Configuration,
    WorkflowServiceApi,
    V1alpha1WorkflowCreateRequest)

from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container


if __name__ == "__main__":
    wf = DagDiamond()
    config = Configuration(host="http://localhost:2746")
    client = ApiClient(configuration=config)
    wf.submit(client, "argo")
```

CronWorkflow

```python
from argo.workflows.client import Configuration, ApiClient

from argo.workflows.dsl import template
from argo.workflows.dsl import CronWorkflow
from argo.workflows.dsl.templates import V1Container


class HelloCron(CronWorkflow):
    entrypoint = "whalesay"
    schedule = "0 0 1 1 *"

    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"],
        )

        return container


if __name__ == "__main__":
    wf = HelloCron()
    print(wf)
    config = Configuration(host="http://localhost:2746")
    client = ApiClient(configuration=config)
    wf.submit(client, "argo")
```

The compilation also moves all imports to the front and removes duplicates, for convenience and a more natural look, so that you don't feel like poking your eyes out when you look at the resulting YAML.


For more examples see the examples folder.



Authors:
