Use `pd.flow.suspend` and `pd.flow.rerun` to pause a workflow and resume it later.

This is useful when you want to:

- Pause a workflow until it's manually approved or cancelled
- Start a job in an external API and poll for its completion before proceeding
- Have an external service call back into the workflow when its work is complete

## pd.flow.suspend
Use `pd.flow.suspend` when you want to pause a workflow and proceed with the remaining steps only when manually approved or cancelled.

For example, you can suspend a workflow and send yourself a link to manually resume or cancel the rest of the workflow:

```python
def handler(pd: "pipedream"):
    urls = pd.flow.suspend()
    pd.send.email(
        subject="Please approve this important workflow",
        text=f"Click here to approve the workflow: {urls['resume_url']}, and cancel here: {urls['cancel_url']}"
    )
    # Pipedream suspends your workflow at the end of the step
```
### resume_url and cancel_url

`pd.flow.suspend` returns a `cancel_url` and a `resume_url` that let you cancel or resume paused executions. Since Pipedream pauses your workflow at the end of the step, you can pass these URLs to any external service before the workflow pauses. If that service accepts a callback URL, it can trigger the `resume_url` when its work is complete.

These URLs are specific to a single execution of your workflow. While the workflow is paused, you can load them in your browser or send any HTTP request to them:

- Sending a request to the `cancel_url` will cancel that execution
- Sending a request to the `resume_url` will resume that execution
If you resume a workflow, any data you send in the HTTP request is passed to the workflow and returned in the `$resume_data` step export of the suspended step. For example, if you call `pd.flow.suspend` within a step named `code`, the `$resume_data` export should contain the data sent in the `resume_url` request.
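As a rough sketch of that flow, the caller might send a JSON payload to the `resume_url` (the URL and payload below are placeholders for illustration):

```python
import requests

# Placeholder for the resume_url returned by pd.flow.suspend() in the "code" step
resume_url = "https://api.pipedream.com/v1/resume/abc123"

# Any JSON body sent to the resume_url is passed back to the workflow and
# surfaced in the suspended step's $resume_data export
requests.post(resume_url, json={"approved": True, "approver": "dana@example.com"})
```

Once the workflow resumes, later steps should be able to read that payload from the suspended step's `$resume_data` export, for example via `pd.steps["code"]["$resume_data"]` in a downstream Python step.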
`pd.flow.suspend` will automatically cancel the workflow after 24 hours. You can set your own timeout (in milliseconds) as the first argument:

```python
def handler(pd: "pipedream"):
    # 7 days
    TIMEOUT = 1000 * 60 * 60 * 24 * 7
    pd.flow.suspend(TIMEOUT)
```
## pd.flow.rerun

Use `pd.flow.rerun` when you want to run a specific step of a workflow multiple times. This is useful when you need to start a job in an external API and poll for its completion, or have the service call back to the step and let you process the HTTP request within the step.

`pd.flow.rerun` lets you rerun a specific step multiple times:

```python
import requests


def handler(pd: "pipedream"):
    MAX_RETRIES = 3
    # 10 seconds
    DELAY = 1000 * 10
    run = pd.context["run"]
    print(pd.context)
    # pd.context["run"]["runs"] starts at 1 and increments each time the step is rerun
    if run["runs"] == 1:
        # pd.flow.rerun(delay, context (discussed below), max retries)
        pd.flow.rerun(DELAY, None, MAX_RETRIES)
    elif run["runs"] == MAX_RETRIES + 1:
        raise Exception("Max retries exceeded")
    else:
        # Poll the external API for the job status
        response = requests.get("https://example.com/status")
        # If we're done, continue with the rest of the workflow
        if response.json()["status"] == "DONE":
            return response.json()
        # Else retry later
        pd.flow.rerun(DELAY, None, MAX_RETRIES)
```
`pd.flow.rerun` accepts the following arguments:

```python
pd.flow.rerun(
    delay,      # The number of milliseconds until the step will be rerun
    context,    # JSON-serializable data you need to pass between runs
    maxRetries, # The total number of times the step will rerun. Defaults to 10
)
```
When you trigger a job in an external service and that service can send an HTTP callback when the job is done, you can receive that callback within the same step with `pd.flow.rerun`:

```python
import requests


def handler(pd: "pipedream"):
    # 24 hours
    TIMEOUT = 86400 * 1000
    run = pd.context["run"]
    # pd.context["run"]["runs"] starts at 1 and increments each time the step is rerun
    if run["runs"] == 1:
        links = pd.flow.rerun(TIMEOUT, None, 1)
        # links contains a dictionary with two entries: resume_url and cancel_url
        # Send the resume_url to the external service as its callback URL
        requests.post("your callback URL", json=links)
    # When the external service calls back into the resume_url, you have access to
    # the callback data within pd.context["run"]["callback_request"]
    elif "callback_request" in run:
        return run["callback_request"]
```
### Passing context to pd.flow.rerun

Within a Python code step, `pd.context["run"]["context"]` contains the `context` passed from the prior call to `rerun`. This lets you pass data from one run to another. For example, if you call:

```python
pd.flow.rerun(1000, {"hello": "world"})
```

then on the next run, `pd.context["run"]["context"]` will contain `{"hello": "world"}`.
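As a minimal sketch of this pattern, you might store an external job ID in `context` on the first run and read it back on later runs. The jobs API at `example.com` below is made up for illustration:

```python
import requests


def handler(pd: "pipedream"):
    # 10 seconds between polls
    DELAY = 1000 * 10
    run = pd.context["run"]
    if run["runs"] == 1:
        # Hypothetical endpoint that starts a job and returns its ID
        job_id = requests.post("https://example.com/jobs").json()["id"]
        # Stash the job ID in context so later runs can read it
        pd.flow.rerun(DELAY, {"job_id": job_id}, 10)
    else:
        # Read the job ID passed from the prior run
        job_id = run["context"]["job_id"]
        status = requests.get(f"https://example.com/jobs/{job_id}").json()
        if status["state"] == "DONE":
            return status
        # Not done yet; pass the same context to the next run
        pd.flow.rerun(DELAY, {"job_id": job_id}, 10)
```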
### maxRetries

By default, `maxRetries` is 10.

When you exceed `maxRetries`, the workflow proceeds to the next step. If you need to handle this case with an exception, `raise` an Exception from the step:

```python
def handler(pd: "pipedream"):
    MAX_RETRIES = 3
    run = pd.context["run"]
    if run["runs"] == 1:
        pd.flow.rerun(1000, None, MAX_RETRIES)
    elif run["runs"] == MAX_RETRIES + 1:
        raise Exception("Max retries exceeded")
```
### Behavior when testing

When you're building a workflow and test a step that runs `pd.flow.suspend` or `pd.flow.rerun`, it will not suspend the workflow, and you'll see a message like the following:

> Workflow execution canceled — this may be due to `pd.flow.suspend()` usage (not supported in test)
These functions will only suspend and resume when run in production.

### Credits when using pd.flow.suspend / pd.flow.rerun

You are not charged for the time your workflow is suspended during a `pd.flow.suspend` or `pd.flow.rerun`. Only when workflows are resumed will compute time count toward credit usage.