Temporal Python SDK

Temporal is a distributed, scalable, durable, and highly available orchestration engine used to execute asynchronous, long-running business logic in a scalable and resilient way.

"Temporal Python SDK" is the framework for authoring workflows and activities using the Python programming language.

Also see:

  • Code Samples
  • API Documentation
  • Application Development Guide

In addition to features common across all Temporal SDKs, the Python SDK also has the following interesting features:

Type Safe

This library uses the latest typing and MyPy support with generics to ensure all calls can be typed. For example, starting a workflow with an int parameter when it accepts a str parameter would cause MyPy to fail.

Different Activity Types

The activity worker has been developed to work with async def, threaded, and multiprocess activities. Threaded activities are the initial recommendation, and further guidance can be found in the docs.

Custom asyncio Event Loop

The workflow implementation essentially turns async def functions into workflows backed by a distributed, fault-tolerant event loop. This means task management, sleep, cancellation, etc. have all been developed to seamlessly integrate with asyncio concepts.

See the blog post introducing the Python SDK for an informal introduction to the features and their implementation.


Quick Start

We will guide you through the Temporal basics by creating a "hello, world!" script on your machine. This is a deliberately simplified example and decidedly not the only way to use Temporal. For more information, check out the docs referenced in "Next Steps" below the quick start.

Installation

Install the temporalio package from PyPI.

These steps can be followed to use a virtual environment and pip:

  • Create a virtual environment
  • Update pip - python -m pip install -U pip
    • Needed because older versions of pip may not pick the right wheel
  • Install the Temporal SDK - python -m pip install temporalio

The SDK is now ready for use. To build from source, see "Building" near the end of this documentation.

NOTE: This README is for the current branch and not necessarily what's released on PyPI.

Implementing a Workflow

Create the following in activities.py:

from temporalio import activity

@activity.defn
def say_hello(name: str) -> str:
    return f"Hello, {name}!"

Create the following in workflows.py:

from datetime import timedelta

from temporalio import workflow

# Import our activity, passing it through the sandbox
with workflow.unsafe.imports_passed_through():
    from .activities import say_hello

@workflow.defn
class SayHello:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)
        )

Create the following in run_worker.py:

import asyncio
import concurrent.futures

from temporalio.client import Client
from temporalio.worker import Worker

# Import the activity and workflow from our other files
from .activities import say_hello
from .workflows import SayHello

async def main():
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")

    # Run the worker
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            workflows=[SayHello],
            activities=[say_hello],
            activity_executor=activity_executor,
        )
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Assuming you have a Temporal server running on localhost, this will run the worker:

python run_worker.py

Running a Workflow

Create the following script at run_workflow.py:

import asyncio

from temporalio.client import Client

# Import the workflow from the previous code
from .workflows import SayHello

async def main():
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")

    # Execute a workflow
    result = await client.execute_workflow(
        SayHello.run, "my name", id="my-workflow-id", task_queue="my-task-queue"
    )

    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Assuming you have run_worker.py running from before, this will run the workflow:

python run_workflow.py

The output will be:

Result: Hello, my name!

Next Steps

Temporal can be implemented in your code in many different ways, to suit your application's needs. The links below will give you much more information about how Temporal works with Python:

  • Code Samples - If you want to start with some code, we have provided some pre-built samples.
  • Application Development Guide - Our Python-specific Developer's Guide will give you much more information on how to build with Temporal in your Python applications than our SDK README ever could (or should).
  • API Documentation - Full Temporal Python SDK package documentation.

Usage

From here, you will find reference documentation about specific pieces of the Temporal Python SDK, built around Temporal concepts. This section is not intended as a how-to guide; for more how-to oriented information, check out the links in the Next Steps section above.

Client

A client can be created and used to start a workflow like so:

from temporalio.client import Client

async def main():
    # Create client connected to server at the given address and namespace
    client = await Client.connect("localhost:7233", namespace="my-namespace")

    # Start a workflow
    handle = await client.start_workflow(
        MyWorkflow.run, "some arg", id="my-workflow-id", task_queue="my-task-queue"
    )

    # Wait for result
    result = await handle.result()
    print(f"Result: {result}")

Some things to note about the above code:

  • A Client does not have an explicit "close"
  • To enable TLS, the tls argument to connect can be set to True or a TLSConfig object
  • A single positional argument can be passed to start_workflow. If there are multiple arguments, only the non-type-safe form of start_workflow can be used (i.e. the one accepting a string workflow name) and they must be in the args keyword argument.
  • The handle represents the workflow that was started and can be used for more than just getting the result
  • Since we are just getting the handle and waiting on the result, we could have called client.execute_workflow, which does the same thing
  • Clients can have many more options not shown here (e.g. data converters and interceptors)
  • A string can be used instead of the method reference to call a workflow by name (e.g. if defined in another language)
  • Clients do not work across forks

Clients also provide a shallow copy of their config for use in making slightly different clients backed by the same connection. For instance, given the client above, this is how to have a client in another namespace:

config = client.config()
config["namespace"] = "my-other-namespace"
other_ns_client = Client(**config)

Data Conversion

Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type temporalio.converter.DataConverter can be set via the data_converter parameter of the Client constructor. Data converters are a combination of payload converters, payload codecs, and failure converters. Payload converters convert Python values to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption). Failure converters convert exceptions to/from serialized failures.

The default data converter supports converting multiple types including:

  • None
  • bytes
  • google.protobuf.message.Message - As JSON when encoding, but has the ability to decode binary proto from other languages
  • Anything that can be converted to JSON including:
    • Anything json.dump supports natively
    • Dataclasses
    • Iterables, including ones JSON dump may not support by default, e.g. set
    • IntEnum/StrEnum based enumerates
    • UUID

To use Pydantic model instances, see Pydantic Support.

datetime.date and datetime.time can only be used with the Pydantic data converter.

Although workflows, updates, signals, and queries can all be defined with multiple input parameters, users are strongly encouraged to use a single dataclass or Pydantic model parameter, so that fields with defaults can be easily added without breaking compatibility. Similar advice applies to return values.
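
For example, a minimal sketch of this pattern (GreetParams and GreetWorkflow are hypothetical names):

from dataclasses import dataclass

from temporalio import workflow

@dataclass
class GreetParams:
    name: str
    # A defaulted field added later won't break already-started workflows
    salutation: str = "Hello"

@workflow.defn
class GreetWorkflow:
    @workflow.run
    async def run(self, params: GreetParams) -> str:
        return f"{params.salutation}, {params.name}!"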

Classes with generics may not have the generics properly resolved; the current implementation does not perform generic type resolution, so users should use concrete types.

Pydantic Support

To use Pydantic model instances, install Pydantic and set the Pydantic data converter when creating client instances:

from temporalio.contrib.pydantic import pydantic_data_converter

client = Client(data_converter=pydantic_data_converter, ...)

This data converter supports conversion of all types supported by Pydantic to and from JSON.

In addition to Pydantic models, these include all json.dump-able types, various non-json.dump-able standard library types such as dataclasses, types from the datetime module, sets, UUID, etc., and custom types composed of any of these.

Pydantic v1 is not supported by this data converter. If you are not yet able to upgrade from Pydantic v1, see https://github.com/temporalio/samples-python/tree/main/pydantic_converter/v1 for limited v1 support.

Custom Type Data Conversion

For converting from JSON, the workflow/activity type hint is taken into account to convert to the proper type. Care has been taken to support all common typings including Optional, Union, all forms of iterables and mappings, NewType, etc. in addition to the regular JSON values mentioned before.

Data converters contain a reference to a payload converter class that is used to convert to/from payloads/values. This is a class and not an instance because it is instantiated on every workflow run inside the sandbox. The payload converter is usually a CompositePayloadConverter, which contains multiple EncodingPayloadConverters it uses to try to serialize/deserialize payloads. Upon serialization, each EncodingPayloadConverter is tried until one succeeds. The EncodingPayloadConverter provides an "encoding" string serialized onto the payload so that, upon deserialization, the specific EncodingPayloadConverter for the given "encoding" is used.

The default data converter uses the DefaultPayloadConverter, which is simply a CompositePayloadConverter with a known set of default EncodingPayloadConverters. To implement a custom encoding for a custom type, a new EncodingPayloadConverter can be created for the new type. For example, to support IPv4Address types:

class IPv4AddressEncodingPayloadConverter(EncodingPayloadConverter):
    @property
    def encoding(self) -> str:
        return "text/ipv4-address"

    def to_payload(self, value: Any) -> Optional[Payload]:
        if isinstance(value, ipaddress.IPv4Address):
            return Payload(
                metadata={"encoding": self.encoding.encode()},
                data=str(value).encode(),
            )
        else:
            return None

    def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
        assert not type_hint or type_hint is ipaddress.IPv4Address
        return ipaddress.IPv4Address(payload.data.decode())

class IPv4AddressPayloadConverter(CompositePayloadConverter):
    def __init__(self) -> None:
        # Just add ours as first before the defaults
        super().__init__(
            IPv4AddressEncodingPayloadConverter(),
            *DefaultPayloadConverter.default_encoding_payload_converters,
        )

my_data_converter = dataclasses.replace(
    DataConverter.default,
    payload_converter_class=IPv4AddressPayloadConverter,
)

Imports are left off for brevity.

This is good for many custom types. However, sometimes you want to override the behavior of just the existing JSON encoding payload converter to support a new type. It is already the last encoding data converter in the list, so it's the fall-through behavior for any otherwise unknown type. Customizing the existing JSON converter has the benefit of making the type work in lists, unions, etc.

The JSONPlainPayloadConverter uses the Python json library with an advanced JSON encoder by default and a custom value conversion method to turn json.loaded values into their type hints. The conversion can be customized for serialization with a custom json.JSONEncoder and for deserialization with a custom JSONTypeConverter. For example, to support IPv4Address types in existing JSON conversion:

class IPv4AddressJSONEncoder(AdvancedJSONEncoder):
    def default(self, o: Any) -> Any:
        if isinstance(o, ipaddress.IPv4Address):
            return str(o)
        return super().default(o)

class IPv4AddressJSONTypeConverter(JSONTypeConverter):
    def to_typed_value(
        self, hint: Type, value: Any
    ) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
        if issubclass(hint, ipaddress.IPv4Address):
            return ipaddress.IPv4Address(value)
        return JSONTypeConverter.Unhandled

class IPv4AddressPayloadConverter(CompositePayloadConverter):
    def __init__(self) -> None:
        # Replace default JSON plain with our own that has our encoder and type
        # converter
        json_converter = JSONPlainPayloadConverter(
            encoder=IPv4AddressJSONEncoder,
            custom_type_converters=[IPv4AddressJSONTypeConverter()],
        )
        super().__init__(
            *[
                c if not isinstance(c, JSONPlainPayloadConverter) else json_converter
                for c in DefaultPayloadConverter.default_encoding_payload_converters
            ]
        )

my_data_converter = dataclasses.replace(
    DataConverter.default,
    payload_converter_class=IPv4AddressPayloadConverter,
)

Now IPv4Address can be used in type hints including collections, optionals, etc.

Workers

Workers host workflows and/or activities. Here's how to run a worker:

import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

# Import your own workflows and activities
from my_workflow_package import MyWorkflow, my_activity

async def run_worker(stop_event: asyncio.Event):
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233", namespace="my-namespace")

    # Run the worker until the event is set
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    )
    async with worker:
        await stop_event.wait()

Some things to note about the above code:

  • This creates/uses the same client that is used for starting workflows
  • While this example accepts a stop event and uses async with, run() and shutdown() may be used instead
  • Workers can have many more options not shown here (e.g. data converters and interceptors)

Workflows

Definition

Workflows are defined as classes decorated with @workflow.defn. The method invoked for the workflow is decorated with @workflow.run. Methods for signals, queries, and updates are decorated with @workflow.signal, @workflow.query, and @workflow.update respectively. Here's an example of a workflow:

import asyncio
from datetime import timedelta

from temporalio import workflow

# Pass the activities through the sandbox
with workflow.unsafe.imports_passed_through():
    from .my_activities import GreetingInfo, create_greeting_activity

@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        self._current_greeting = "<unset>"
        self._greeting_info = GreetingInfo()
        self._greeting_info_update = asyncio.Event()
        self._complete = asyncio.Event()

    @workflow.run
    async def run(self, name: str) -> str:
        self._greeting_info.name = name
        while True:
            # Store greeting
            self._current_greeting = await workflow.execute_activity(
                create_greeting_activity,
                self._greeting_info,
                start_to_close_timeout=timedelta(seconds=5),
            )
            workflow.logger.debug("Greeting set to %s", self._current_greeting)

            # Wait for salutation update or complete signal (this can be
            # cancelled)
            await asyncio.wait(
                [
                    asyncio.create_task(self._greeting_info_update.wait()),
                    asyncio.create_task(self._complete.wait()),
                ],
                return_when=asyncio.FIRST_COMPLETED,
            )
            if self._complete.is_set():
                return self._current_greeting
            self._greeting_info_update.clear()

    @workflow.signal
    async def update_salutation(self, salutation: str) -> None:
        self._greeting_info.salutation = salutation
        self._greeting_info_update.set()

    @workflow.signal
    async def complete_with_greeting(self) -> None:
        self._complete.set()

    @workflow.query
    def current_greeting(self) -> str:
        return self._current_greeting

    @workflow.update
    def set_and_get_greeting(self, greeting: str) -> str:
        old = self._current_greeting
        self._current_greeting = greeting
        return old

This assumes there's an activity in my_activities.py like:

from dataclasses import dataclass

from temporalio import activity

@dataclass
class GreetingInfo:
    salutation: str = "Hello"
    name: str = "<unknown>"

@activity.defn
def create_greeting_activity(info: GreetingInfo) -> str:
    return f"{info.salutation}, {info.name}!"

Some things to note about the above workflow code:

  • Workflows run in a sandbox by default.
    • Users are encouraged to define workflows in files with no side effects, other complicated code, or unnecessary imports of third party libraries.
    • Non-standard-library, non-temporalio imports should usually be "passed through" the sandbox. See the Workflow Sandbox section for more details.
  • This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on a different signal
  • Workflows are always classes and must have a single @workflow.run which is an async def function
  • Workflow code must be deterministic. This means no set iteration, no threading, no randomness, no external calls to processes, no network IO, and no global state mutation. All code must run in the implicit asyncio event loop and be deterministic. Also see the Asyncio and Determinism section later.
  • @activity.defn is explained in a later section. For normal simple string concatenation, this would just be done in the workflow. The activity is for demonstration purposes only.
  • workflow.execute_activity(create_greeting_activity, ...) is actually a typed signature, and MyPy will fail if the self._greeting_info parameter is not a GreetingInfo

Here are the decorators that can be applied:

  • @workflow.defn - Defines a workflow class
    • Must be defined on the class given to the worker (ignored if present on a base class)
    • Can have a name param to customize the workflow name, otherwise it defaults to the unqualified class name
    • Can have dynamic=True which means all otherwise unhandled workflows fall through to this. If present, cannot have the name argument, and the run method must accept a single parameter of Sequence[temporalio.common.RawValue] type. The payload of the raw value can be converted via workflow.payload_converter().from_payload. See the sketch after this list.
  • @workflow.run - Defines the primary workflow run method
    • Must be defined on the same class as @workflow.defn, not a base class (but can also be defined on the same method of a base class)
    • Exactly one method must have this decorator, no more or less
    • Must be defined on an async def method
    • The method's arguments are the workflow's arguments
    • The first parameter must be self, followed by positional arguments. Best practice is to only take a single argument that is an object/dataclass of fields that can be added to as needed.
  • @workflow.init - Specifies that the __init__ method accepts the workflow's arguments.
    • If present, may only be applied to the __init__ method, the parameters of which must then be identical to those of the @workflow.run method.
    • The purpose of this decorator is to allow operations involving workflow arguments to be performed in the __init__ method, before any signal or update handler has a chance to execute.
  • @workflow.signal - Defines a method as a signal
    • Can be defined on an async or non-async method at any point in the class hierarchy, but if the decorated method is overridden, then the override must also be decorated.
    • The method's arguments are the signal's arguments.
    • Return value is ignored.
    • May mutate workflow state, and make calls to other workflow APIs like starting activities, etc.
    • Can have a name param to customize the signal name, otherwise it defaults to the unqualified method name.
    • Can have dynamic=True which means all otherwise unhandled signals fall through to this. If present, cannot have the name argument, and method parameters must be self, a string signal name, and a Sequence[temporalio.common.RawValue].
    • Non-dynamic methods can only have positional arguments. Best practice is to only take a single argument that is an object/dataclass of fields that can be added to as needed.
    • See Signal and update handlers below
  • @workflow.update - Defines a method as an update
    • Can be defined on an async or non-async method at any point in the class hierarchy, but if the decorated method is overridden, then the override must also be decorated.
    • May accept input and return a value
    • The method's arguments are the update's arguments.
    • May be async or non-async
    • May mutate workflow state, and make calls to other workflow APIs like starting activities, etc.
    • Also accepts the name and dynamic parameters like signal, with the same semantics.
    • Update handlers may optionally define a validator method by decorating it with @update_handler_method.validator. To reject an update before any events are written to history, throw an exception in a validator. Validators cannot be async, cannot mutate workflow state, and return nothing.
    • See Signal and update handlers below
  • @workflow.query - Defines a method as a query
    • Should return a value
    • Should not be async
    • Temporal queries should never mutate anything in the workflow or make any calls that would mutate the workflow
    • Also accepts the name and dynamic parameters like signal and update, with the same semantics.
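
A sketch of the dynamic=True workflow behavior described above (the greeting logic is illustrative):

from typing import Sequence

from temporalio import workflow
from temporalio.common import RawValue

@workflow.defn(dynamic=True)
class DynamicWorkflow:
    @workflow.run
    async def run(self, args: Sequence[RawValue]) -> str:
        # Handles any workflow type not otherwise registered on the worker
        name = workflow.payload_converter().from_payload(args[0].payload, str)
        return f"Hello, {name}!"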

Running

To start a locally-defined workflow from a client, you can simply reference its method like so:

from temporalio.client import Client
from my_workflow_package import GreetingWorkflow

async def create_greeting(client: Client) -> str:
    # Start the workflow
    handle = await client.start_workflow(
        GreetingWorkflow.run, "my name", id="my-workflow-id", task_queue="my-task-queue"
    )

    # Change the salutation
    await handle.signal(GreetingWorkflow.update_salutation, "Aloha")

    # Tell it to complete
    await handle.signal(GreetingWorkflow.complete_with_greeting)

    # Wait and return result
    return await handle.result()

Some things to note about the above code:

  • This uses the GreetingWorkflow from the previous section
  • The result of calling this function is "Aloha, my name!"
  • id and task_queue are required for running a workflow
  • client.start_workflow is typed, so MyPy would fail if "my name" were something besides a string
  • handle.signal is typed, so MyPy would fail if "Aloha" were something besides a string or if we provided a parameter to the parameterless complete_with_greeting
  • handle.result is typed to the workflow itself, so MyPy would fail if we said this create_greeting returned something besides a string

Invoking Activities

  • Activities are started with non-async workflow.start_activity(), which accepts either an activity function reference or a string name.
  • A single argument to the activity is positional. Multiple arguments are not supported in the type-safe form of start/execute activity and must be supplied via the args keyword argument.
  • Activity options are set as keyword arguments after the activity arguments. At least one of start_to_close_timeout or schedule_to_close_timeout must be provided.
  • The result is an activity handle which is an asyncio.Task and supports basic task features (see the sketch after this list)
  • An async workflow.execute_activity() helper is provided which takes the same arguments as workflow.start_activity() and awaits on the result. This should be used in most cases unless advanced task capabilities are needed.
  • Local activities work very similarly except the functions are workflow.start_local_activity() and workflow.execute_local_activity()
    • ⚠️ Local activities are currently experimental
  • Activities can be methods of a class. Invokers should use workflow.start_activity_method(), workflow.execute_activity_method(), workflow.start_local_activity_method(), and workflow.execute_local_activity_method() instead.
  • Activities can be callable classes (i.e. that define __call__). Invokers should use workflow.start_activity_class(), workflow.execute_activity_class(), workflow.start_local_activity_class(), and workflow.execute_local_activity_class() instead.
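
Putting these together, a minimal sketch inside a @workflow.run method, reusing the say_hello activity from the quick start:

from datetime import timedelta

from temporalio import workflow

# Inside a @workflow.run method:
handle = workflow.start_activity(
    say_hello, "Temporal", start_to_close_timeout=timedelta(seconds=10)
)
# The handle is an asyncio.Task: it can be awaited, cancelled, etc.
result = await handle

# Or, when only the result is needed:
result = await workflow.execute_activity(
    say_hello, "Temporal", start_to_close_timeout=timedelta(seconds=10)
)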

Invoking Child Workflows

  • Child workflows are started with async workflow.start_child_workflow(), which accepts either a workflow run method reference or a string name.
  • A single argument to the child workflow is positional. Multiple arguments are not supported in the type-safe form of start/execute child workflow and must be supplied via the args keyword argument.
  • Child workflow options are set as keyword arguments after the arguments. At least id must be provided.
  • The await of the start does not complete until the start has been accepted by the server
  • The result is a child workflow handle which is an asyncio.Task and supports basic task features. The handle also has some child info and supports signalling the child workflow (see the sketch after this list)
  • An async workflow.execute_child_workflow() helper is provided which takes the same arguments as workflow.start_child_workflow() and awaits on the result. This should be used in most cases unless advanced task capabilities are needed.
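
A minimal sketch of the above inside a parent @workflow.run method, using the SayHello workflow from the quick start:

# Inside a parent @workflow.run method:
handle = await workflow.start_child_workflow(
    SayHello.run, "my child name", id="my-child-workflow-id"
)
# The handle is an asyncio.Task that also carries child info
workflow.logger.info("Child first run ID: %s", handle.first_execution_run_id)
result = await handle

# Or, when only the result is needed:
result = await workflow.execute_child_workflow(
    SayHello.run, "another name", id="my-other-child-workflow-id"
)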

Timers

  • A timer is represented by a normal asyncio.sleep() or a workflow.sleep() call
  • Timers are also implicitly started on any asyncio calls with timeouts (e.g. asyncio.wait_for)
  • Timers are Temporal server timers, not local ones, so sub-second resolution rarely has value
  • Calls that use a specific point in time, e.g. call_at or timeout_at, should be based on the current loop time (i.e. workflow.time()) and not an actual point in time. This is because fixed times are translated to relative ones by subtracting the current loop time, which may not be the actual current time. See the sketch after this list.
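
A short sketch of these forms inside a @workflow.run method (each creates a server-side timer):

import asyncio

from temporalio import workflow

# Inside a @workflow.run method:
await asyncio.sleep(60)   # a one-minute Temporal timer
await workflow.sleep(60)  # the equivalent workflow helper

# Timeout-based calls implicitly create a timer too (some_coro is hypothetical)
await asyncio.wait_for(some_coro(), timeout=30)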

Conditions

  • workflow.wait_condition is an async function that doesn't return until a provided callback returns true
  • A timeout can optionally be provided which will throw an asyncio.TimeoutError if reached (internally backed by asyncio.wait_for which uses a timer); see the sketch below
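
For example, a short sketch inside a workflow that waits up to a minute for a flag set by a signal handler (self._complete is assumed to be an asyncio.Event):

import asyncio

from temporalio import workflow

# Inside a @workflow.run method:
try:
    await workflow.wait_condition(lambda: self._complete.is_set(), timeout=60)
except asyncio.TimeoutError:
    workflow.logger.info("Timed out waiting for the completion signal")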

Asyncio and Determinism

Workflows must be deterministic. Workflows are backed by a custom asyncio event loop. This means many of the common asyncio calls work as normal. Some asyncio features are disabled, such as:

  • Thread related calls such as to_thread(), run_coroutine_threadsafe(), loop.run_in_executor(), etc.
  • Calls that alter the event loop such as loop.close(), loop.stop(), loop.run_forever(), loop.set_task_factory(), etc.
  • Calls that use anything external such as networking, subprocesses, disk IO, etc.

Also, there are some asyncio utilities that internally use set(), which can make them non-deterministic from one worker to the next. Therefore, the following asyncio functions have workflow-module alternatives that are deterministic:

  • asyncio.as_completed() - use workflow.as_completed()
  • asyncio.wait() - use workflow.wait()

Asyncio Cancellation

Cancellation is done using asyncio task cancellation. This means that tasks are requested to be cancelled but can catch the asyncio.CancelledError, thus allowing them to perform some cleanup before allowing the cancellation to proceed (i.e. re-raising the error), or to deny the cancellation entirely. It also means that asyncio.shield() can be used to protect tasks against cancellation.

The following tasks, when cancelled, perform a Temporal cancellation:

  • Activities - when the task executing an activity is cancelled, a cancellation request is sent to the activity
  • Child workflows - when the task starting or executing a child workflow is cancelled, a cancellation request is sent to cancel the child workflow
  • Timers - when the task executing a timer is cancelled (whether started via sleep or timeout), the timer is cancelled

When the workflow itself is requested to cancel, Task.cancel is called on the main workflow task. Therefore, asyncio.CancelledError can be caught in order to handle the cancel gracefully.

Workflows follow asyncio cancellation rules exactly, which can cause confusion among Python developers. Cancelling a task doesn't always cancel the thing it created. For example, given task = asyncio.create_task(workflow.start_child_workflow(...)), calling task.cancel does not cancel the child workflow; it only cancels the starting of it, which has no effect if it has already started. However, cancelling the result of handle = await workflow.start_child_workflow(...) or task = asyncio.create_task(workflow.execute_child_workflow(...)) does cancel the child workflow.

Also, due to Temporal rules, a cancellation request is a state, not an event. Therefore, repeated cancellation requests are not delivered, only the first. If the workflow chooses to swallow a cancellation, it cannot be requested again.

Workflow Utilities

While running in a workflow, in addition to features documented elsewhere, the following items are available from the temporalio.workflow package:

  • continue_as_new() - Async function to stop the workflow immediately and continue as new
  • info() - Returns information about the current workflow
  • logger - A logger for use in a workflow (properly skips logging on replay)
  • now() - Returns the "current time" from the workflow's perspective
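
For instance, a small sketch combining several of these utilities (the workflow itself is illustrative):

from temporalio import workflow

@workflow.defn
class BatchWorkflow:
    @workflow.run
    async def run(self, iteration: int) -> None:
        # logger is replay-safe; info() and now() are workflow-perspective
        workflow.logger.info(
            "Iteration %d of %s at %s",
            iteration, workflow.info().workflow_id, workflow.now(),
        )
        # ... do one batch of work here ...
        # Stop this run immediately and continue as new with fresh history
        await workflow.continue_as_new(iteration + 1)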

Exceptions

  • Workflows/updates can raise exceptions to fail the workflow or the "workflow task" (i.e. suspend the workflow in a retrying state).
  • Exceptions that are instances of temporalio.exceptions.FailureError will fail the workflow with that exception
    • For failing the workflow explicitly with a user exception, use temporalio.exceptions.ApplicationError. This can be marked non-retryable or include details as needed.
    • Other exceptions that come from activity execution, child execution, cancellation, etc. are already instances of FailureError and will fail the workflow when uncaught.
  • Update handlers are special: an instance of temporalio.exceptions.FailureError raised in an update handler will fail the update instead of failing the workflow.
  • All other exceptions fail the "workflow task", which means the workflow will continually retry until the workflow is fixed. This is helpful for bad code or other non-predictable exceptions. To actually fail the workflow, use an ApplicationError as mentioned above (see the sketch after this section).

This default can be changed by providing a list of exception types to workflow_failure_exception_types when creating a Worker, or failure_exception_types on the @workflow.defn decorator. If a workflow-thrown exception is an instance of any type in either list, it will fail the workflow (or update) instead of the workflow task. This means a value of [Exception] will cause every exception to fail the workflow instead of the workflow task. Also, as a special case, if temporalio.workflow.NondeterminismError (or any superclass of it) is set, non-deterministic exceptions will fail the workflow. WARNING: These settings are experimental.
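
A short sketch of explicitly failing a workflow with ApplicationError (the validation rule is illustrative):

from temporalio import workflow
from temporalio.exceptions import ApplicationError

@workflow.defn
class ValidatingWorkflow:
    @workflow.run
    async def run(self, value: int) -> int:
        if value < 0:
            # Fails the workflow itself; an uncaught plain ValueError would
            # instead fail the workflow task and retry indefinitely
            raise ApplicationError("value must be non-negative", non_retryable=True)
        return value * 2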

Signal and update handlers

Signal and update handlers are defined using decorated methods as shown in the example above. Client code sends signals and updates using workflow_handle.signal, workflow_handle.execute_update, or workflow_handle.start_update. When the workflow receives one of these requests, it starts an asyncio.Task executing the corresponding handler method with the argument(s) from the request.

The handler methods may be async def and can do all the async operations described above (e.g. invoking activities and child workflows, and waiting on timers and conditions). Notice that this means that handler tasks will be executing concurrently with respect to each other and the main workflow task. Use asyncio.Lock and asyncio.Semaphore if necessary.

Your main workflow task may finish as a result of successful completion, cancellation, continue-as-new, or failure. You should ensure that all in-progress signal and update handler tasks have finished before this happens; if you do not, you will see a warning (the warning can be disabled via the workflow.signal/workflow.update decorators). One way to ensure that handler tasks have finished is to wait on the workflow.all_handlers_finished condition:

await workflow.wait_condition(workflow.all_handlers_finished)

External Workflows

  • workflow.get_external_workflow_handle() inside a workflow returns a handle to interact with another workflow (see the sketch after this list)
  • workflow.get_external_workflow_handle_for() can be used instead for a type-safe handle
  • await handle.signal() can be called on the handle to signal the external workflow
  • await handle.cancel() can be called on the handle to send a cancel to the external workflow
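
A minimal sketch of these calls (the IDs and signal name are illustrative):

from temporalio import workflow

@workflow.defn
class CancellerWorkflow:
    @workflow.run
    async def run(self, other_workflow_id: str) -> None:
        handle = workflow.get_external_workflow_handle(other_workflow_id)
        # Signal the external workflow by name, then request its cancellation
        await handle.signal("some_signal", "some arg")
        await handle.cancel()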

Testing

Workflow testing can be done in an integration-test fashion against a real server; however, it is hard to simulate timeouts and other long time-based code. Using the time-skipping workflow test environment can help there.

The time-skipping temporalio.testing.WorkflowEnvironment can be created via the static async start_time_skipping(). This internally downloads the Temporal time-skipping test server to a temporary directory if it doesn't already exist, then starts the test server, which has special APIs for skipping time.

NOTE: The time-skipping test environment does not work on ARM. The SDK will try to download the x64 binary on macOS for use with the Intel emulator, but for Linux or Windows ARM there is no proper time-skipping test server at this time.

Automatic Time Skipping

Any time a workflow result is waited on, the time-skipping server automatically advances to the next event it can. To manually advance time before waiting on the result of a workflow, the WorkflowEnvironment.sleep method can be used.

Here's a simple example of a workflow that sleeps for 24 hours:

import asyncio

from temporalio import workflow

@workflow.defn
class WaitADayWorkflow:
    @workflow.run
    async def run(self) -> str:
        await asyncio.sleep(24 * 60 * 60)
        return "all done"

An integration test of this workflow would be way too slow. However, the time-skipping server automatically skips to the next event when we wait on the result. Here's a test for that workflow:

from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_wait_a_day_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="tq1", workflows=[WaitADayWorkflow]):
            assert "all done" == await env.client.execute_workflow(
                WaitADayWorkflow.run, id="wf1", task_queue="tq1"
            )

That test will run almost instantly. This is because by calling execute_workflow on our client, we have asked the environment to automatically skip time as much as it can (basically until the end of the workflow or until an activity is run).

To disable automatic time-skipping while waiting for a workflow result, run code inside a with env.auto_time_skipping_disabled(): block.

Manual Time Skipping

Until a workflow is waited on, all time skipping in the time-skipping environment is done manually via WorkflowEnvironment.sleep.

Here's a workflow that waits for a signal or times out:

import asyncio

from temporalio import workflow

@workflow.defn
class SignalWorkflow:
    def __init__(self) -> None:
        self.signal_received = False

    @workflow.run
    async def run(self) -> str:
        # Wait for signal or timeout in 45 seconds
        try:
            await workflow.wait_condition(lambda: self.signal_received, timeout=45)
            return "got signal"
        except asyncio.TimeoutError:
            return "got timeout"

    @workflow.signal
    def some_signal(self) -> None:
        self.signal_received = True

To test a normal signal, you might:

from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_signal_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
            # Start workflow, send signal, check result
            handle = await env.client.start_workflow(
                SignalWorkflow.run, id="wf1", task_queue="tq1"
            )
            await handle.signal(SignalWorkflow.some_signal)
            assert "got signal" == await handle.result()

But how would you test the timeout part? Like so:

from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_signal_workflow_timeout():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
            # Start workflow, advance time past timeout, check result
            handle = await env.client.start_workflow(
                SignalWorkflow.run, id="wf1", task_queue="tq1"
            )
            await env.sleep(50)
            assert "got timeout" == await handle.result()

Also, the current time of the workflow environment can be obtained via the async WorkflowEnvironment.get_current_time method.

Mocking Activities

Activities are just functions decorated with @activity.defn. Simply write different ones and pass those to the worker to have different activities called during the test.
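
For example, a sketch mocking create_greeting_activity from the earlier workflow example (the mock body is illustrative). Registering the mock under the real activity's name means the workflow under test needs no changes:

from temporalio import activity

@activity.defn(name="create_greeting_activity")
def create_greeting_activity_mock(info: GreetingInfo) -> str:
    return f"{info.salutation}, {info.name} (mocked)!"

# In the test, register the mock with the worker instead of the real activity:
# Worker(env.client, task_queue="tq1", workflows=[GreetingWorkflow],
#        activities=[create_greeting_activity_mock])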

Workflow Sandbox

By default, workflows are run in a sandbox to help avoid non-deterministic code. If a call that is known to be non-deterministic is performed, an exception will be thrown in the workflow, which will "fail the task", meaning the workflow will not progress until fixed.

The sandbox is not foolproof and non-determinism can still occur. It is simply a best-effort way to catch bad codeearly. Users are encouraged to define their workflows in files with no other side effects.

The sandbox offers a mechanism to "pass through" modules from outside the sandbox. By default this already includes all standard library modules and Temporal modules. For performance and behavior reasons, users are encouraged to pass through all modules whose calls will be deterministic. In particular, this advice extends to modules containing the activities to be referenced in workflows, and modules containing dataclasses and Pydantic models, which can be particularly expensive to import. See "Passthrough Modules" below on how to do this.

How the Sandbox Works

The sandbox is made up of two components that work closely together:

  • Global state isolation
  • Restrictions preventing known non-deterministic library calls

Global state isolation is performed by using exec. Upon workflow start, and every time that the workflow is replayed, the file that the workflow is defined in is re-imported into a new sandbox created for that workflow run. In order to keep the sandbox performant, not all modules are re-imported in this way: instead, a known set of "passthrough modules" are obtained as references to the already-imported module outside the sandbox. These modules should be side-effect free on import and, if they make any non-deterministic calls, then these should be restricted by sandbox restriction rules. By default the entire Python standard library, temporalio, and a couple of other modules are "passed through" in this way from outside of the sandbox. To update this list, see "Customizing the Sandbox".

Restrictions preventing known non-deterministic library calls are achieved using proxy objects on modules wrapped around the custom importer set in the sandbox. Many restrictions apply at both workflow import time and workflow run time, while some restrictions only apply at workflow run time. A default set of restrictions is included that prevents most dangerous standard library calls. However, it is known in Python that some otherwise-non-deterministic invocations, like reading a file from disk via open or using os.environ, are done as part of importing modules. To customize what is and isn't restricted, see "Customizing the Sandbox".

Avoiding the Sandbox

There are three increasingly-scoped ways to avoid the sandbox. Users are discouraged from avoiding the sandbox if possible, except for passing through safe modules, which is recommended.

To remove restrictions around a particular block of code, use with temporalio.workflow.unsafe.sandbox_unrestricted():. The workflow will still be running in the sandbox, but no restrictions for invalid library calls will be applied.

To run an entire workflow outside of a sandbox, set sandboxed=False on the @workflow.defn decorator when defining it. This will run the entire workflow outside of the sandbox, which means it can share global state and other bad things.

To disable the sandbox entirely for a worker, set the Worker init's workflow_runner keyword argument to temporalio.worker.UnsandboxedWorkflowRunner(). This value defaults to temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner(), so by changing it to the unsandboxed runner, the sandbox will not be used at all.
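
A combined sketch of the three scopes (the client, workflow, and module names are illustrative):

from temporalio import workflow
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

# 1. Block scope: lift restrictions for a specific block of workflow code
with workflow.unsafe.sandbox_unrestricted():
    ...  # otherwise-restricted library calls are allowed here

# 2. Workflow scope: run one workflow entirely outside the sandbox
@workflow.defn(sandboxed=False)
class MyUnsandboxedWorkflow:
    ...

# 3. Worker scope: disable the sandbox for every workflow on this worker
worker = Worker(
    client,  # an existing temporalio.client.Client
    task_queue="my-task-queue",
    workflows=[MyUnsandboxedWorkflow],
    workflow_runner=UnsandboxedWorkflowRunner(),
)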

Customizing the Sandbox

⚠️ WARNING: APIs in the temporalio.worker.workflow_sandbox module are not yet considered stable and may change in future releases.

When creating the Worker, the workflow_runner defaults to temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner(). The SandboxedWorkflowRunner's init accepts a restrictions keyword argument that defaults to SandboxRestrictions.default. The SandboxRestrictions dataclass is immutable and contains three fields that can be customized, but only two have notable value. See below.

Passthrough Modules

By default, the sandbox completely reloads non-standard-library and non-Temporal modules for every workflow run. To make the sandbox quicker and use less memory when importing known-side-effect-free modules, they can be marked as passthrough modules.

For performance and behavior reasons, users are encouraged to pass through all third party modules whose calls will be deterministic. In particular, this advice extends to modules containing the activities to be referenced in workflows, and modules containing dataclasses and Pydantic models, which can be particularly expensive to import.

One way to pass through a module is at import time in the workflow file using the imports_passed_through context manager like so:

# my_workflow_file.py

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    import pydantic

@workflow.defn
class MyWorkflow:
    ...

Alternatively, this can be done at worker creation time by customizing the runner's restrictions. For example:

my_worker = Worker(
    ...,
    workflow_runner=SandboxedWorkflowRunner(
        restrictions=SandboxRestrictions.default.with_passthrough_modules("pydantic")
    ),
)

In both of these cases, the pydantic module will now be passed through from outside of the sandbox instead of being reloaded for every workflow run.

If users are sure that no imports they use in workflow files will ever need to be sandboxed (meaning all calls within are deterministic and never mutate shared, global state), the passthrough_all_modules option can be set on the restrictions, or the with_passthrough_all_modules helper can be used, for example:

my_worker = Worker(
    ...,
    workflow_runner=SandboxedWorkflowRunner(
        restrictions=SandboxRestrictions.default.with_passthrough_all_modules()
    ),
)

Note that some calls from passed-through modules may still be checked for invalid calls at runtime for certain builtins.

Invalid Module Members

SandboxRestrictions.invalid_module_members contains a root matcher that applies to all module members. This already has a default set which includes things like datetime.date.today(), which should never be called from a workflow. To remove this restriction:

my_restrictions = dataclasses.replace(
    SandboxRestrictions.default,
    invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(
        "datetime", "date", "today",
    ),
)
my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))

Restrictions can also be added by |'ing together matchers, for example to restrict the datetime.date class from being used altogether:

my_restrictions = dataclasses.replace(
    SandboxRestrictions.default,
    invalid_module_members=SandboxRestrictions.invalid_module_members_default
    | SandboxMatcher(
        children={"datetime": SandboxMatcher(use={"date"})},
    ),
)
my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))

See the API for more details on exact fields and their meaning.

Known Sandbox Issues

Below are known sandbox issues. As the sandbox is developed and matures, some may be resolved.

Global Import/Builtins

Currently the sandbox references/alters the global sys.modules and builtins fields while running workflow code. In order to prevent affecting other sandboxed code, thread locals are leveraged to only intercept these values while the workflow thread is running. Therefore, technically if top-level import code starts a thread, it may lose sandbox protection.

Sandbox is not Secure

The sandbox is built to catch many non-deterministic and state sharing issues, but it is not secure. Some known bad calls are intercepted, but for performance reasons, every single attribute get/set cannot be checked. Therefore a simple call like setattr(temporalio.common, "__my_key", "my value") will leak across sandbox runs.

The sandbox is only a helper; it does not provide full protection.

Sandbox Performance

The sandbox does not add significant CPU or memory overhead for workflows that are in files which only import standard library modules. This is because they are passed through from outside of the sandbox. However, every non-standard-library import that is performed at the top of the same file the workflow is in will add CPU overhead (the module is re-imported every workflow run) and memory overhead (each module is independently cached as part of the workflow run for isolation reasons). This becomes more apparent for large numbers of workflow runs.

To mitigate this, users should:

  • Define workflows in files that have as few non-standard-library imports as possible
  • Alter the max workflow cache and/or max concurrent workflows settings if memory grows too large
  • Set third-party libraries as passthrough modules if they are known to be side-effect free

Extending Restricted Classes

Extending a restricted class causes Python to instantiate the restricted metaclass, which is unsupported. Therefore, if you attempt to use a class in the sandbox that extends a restricted class, it will fail. For example, if you have a class MyZipFile(zipfile.ZipFile) and try to use that class inside a workflow, it will fail.

Classes used inside the workflow should not extend restricted classes. For situations where third-party modules need to do this at import time, they should be marked as passthrough modules.

Certain Standard Library Calls on Restricted Objects

If an object is restricted, internal CPython validation may fail in some cases. For example, running dict.items(os.__dict__) will fail with:

descriptor 'items' for 'dict' objects doesn't apply to a '_RestrictedProxy' object

This is a low-level check that cannot be subverted. The solution is to not use restricted objects inside the sandbox. For situations where third-party modules need to do so at import time, they should be marked as passthrough modules.

issubclass of ABC-based Restricted Classes

Due to https://bugs.python.org/issue44847, classes that are wrapped and then checked to see if they are subclasses of another via issubclass may fail (see also this wrapt issue).

Activities

Definition

Activities are decorated with @activity.defn like so:

from temporalio import activity

@activity.defn
def say_hello_activity(name: str) -> str:
    return f"Hello, {name}!"

Some things to note about activity definitions:

  • The say_hello_activity is synchronous, which is the recommended activity type (see "Types of Activities" below), but it can be async
  • A custom name for the activity can be set with a decorator argument, e.g. @activity.defn(name="my activity")
  • Long running activities should regularly heartbeat and handle cancellation
  • Activities can only have positional arguments. Best practice is to only take a single argument that is an object/dataclass of fields that can be added to as needed.
  • Activities can be defined on methods instead of top-level functions. This allows the instance to carry state that an activity may need (e.g. a DB connection). The instance method should be what is registered with the worker.
  • Activities can also be defined on callable classes (i.e. classes with __call__). An instance of the class should be what is registered with the worker.
  • The @activity.defn can have dynamic=True set, which means all otherwise unhandled activities fall through to this. If present, cannot have the name argument, and the activity function must accept a single parameter of Sequence[temporalio.common.RawValue]. The payload of the raw value can be converted via activity.payload_converter().from_payload. See the sketch after this list.
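
A sketch of a dynamic activity as described in the last bullet (the greeting logic is illustrative):

from typing import Sequence

from temporalio import activity
from temporalio.common import RawValue

@activity.defn(dynamic=True)
def dynamic_activity(args: Sequence[RawValue]) -> str:
    # Handles any activity type not otherwise registered on the worker
    name = activity.payload_converter().from_payload(args[0].payload, str)
    return f"Hello, {name}! (handled as {activity.info().activity_type})"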

Types of Activities

There are 3 types of activity callables accepted and described below: synchronous multithreaded, synchronous multiprocess/other, and asynchronous. Only positional parameters are allowed in activity callables.

Synchronous Activities

Synchronous activities, i.e. functions that do not have async def, can be used with workers, but the activity_executor worker parameter must be set with a concurrent.futures.Executor instance to use for executing the activities.

All long running, non-local activities should heartbeat so they can be cancelled. Cancellation in threaded activities throws an exception, but in multiprocess/other activities it does not. The sections below on each synchronous type explain further. There are also calls on the context that can check for cancellation. For more information, see the "Activity Context" and "Heartbeating and Cancellation" sections later.

Note, all calls from an activity to functions in the temporalio.activity package are powered by contextvars. Therefore, new threads started inside of activities must copy_context() and then .run() manually to ensure temporalio.activity calls like heartbeat still function in the new threads.

If any activity ever throws a concurrent.futures.BrokenExecutor, the failure is considered unrecoverable and the worker will fail and shut down.

Synchronous Multithreaded Activities

If activity_executor is set to an instance of concurrent.futures.ThreadPoolExecutor, then the synchronous activities are considered multithreaded activities. If max_workers is not set to at least the worker's max_concurrent_activities setting, a warning will be issued. Besides activity_executor, no other worker parameters are required for synchronous multithreaded activities.

By default, cancellation of a synchronous multithreaded activity is done via a temporalio.exceptions.CancelledError thrown into the activity thread. Activities that do not wish to have cancellation thrown can set no_thread_cancel_exception=True in the @activity.defn decorator.

Code that wishes to be temporarily shielded from the cancellation exception can run inside with activity.shield_thread_cancel_exception():. But once the last nested form of that block is finished, even if there is a return statement within, it will throw the cancellation if there was one. A try + except temporalio.exceptions.CancelledError would have to surround the with to handle the cancellation explicitly.
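
A sketch of that shielding pattern (the work inside the block is illustrative):

from temporalio import activity
from temporalio.exceptions import CancelledError

@activity.defn
def critical_activity() -> None:
    try:
        with activity.shield_thread_cancel_exception():
            ...  # critical section; cancellation is deferred while inside
    except CancelledError:
        ...  # a deferred cancellation surfaces here once the block exits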

Synchronous Multiprocess/Other Activities

If activity_executor is set to an instance of concurrent.futures.Executor that is not concurrent.futures.ThreadPoolExecutor, then the synchronous activities are considered multiprocess/other activities. Users should prefer threaded activities over multiprocess ones since, among other reasons, threaded activities can raise on cancellation.

These require special primitives for heartbeating and cancellation. The shared_state_manager worker parameter must be set to an instance of temporalio.worker.SharedStateManager. The most common implementation can be created by passing a multiprocessing.managers.SyncManager (i.e. the result of multiprocessing.Manager()) to temporalio.worker.SharedStateManager.create_from_multiprocessing().

Also, all of these activity functions must be "picklable".
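
Putting that together, a sketch of a worker configured for multiprocess activities (client and my_picklable_activity are assumed to exist):

import concurrent.futures
import multiprocessing

from temporalio.worker import SharedStateManager, Worker

worker = Worker(
    client,
    task_queue="my-task-queue",
    activities=[my_picklable_activity],
    activity_executor=concurrent.futures.ProcessPoolExecutor(),
    # Enables heartbeating/cancellation primitives across processes
    shared_state_manager=SharedStateManager.create_from_multiprocessing(
        multiprocessing.Manager()
    ),
)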

Asynchronous Activities

Asynchronous activities are functions defined with async def. Asynchronous activities are often much more performant than synchronous ones. When using asynchronous activities, no special worker parameters are needed.

⚠️ WARNING: Do not block the thread in async def Python functions. This can stop the processing of the rest of the Temporal tasks on the worker.

Cancellation for asynchronous activities is done via asyncio.Task.cancel. This means that asyncio.CancelledError will be raised (and can be caught, but it is not recommended). A non-local activity must heartbeat to receive cancellation, and there are other ways to be notified about cancellation (see "Activity Context" and "Heartbeating and Cancellation" later).

Activity Context

During activity execution, an implicit activity context is set as a context variable. The context variable itself is not visible, but calls in the temporalio.activity package make use of it. Specifically:

  • in_activity() - Whether an activity context is present
  • info() - Returns the immutable info of the currently running activity
  • heartbeat(*details) - Record a heartbeat
  • is_cancelled() - Whether a cancellation has been requested on this activity
  • wait_for_cancelled() - async call to wait for a cancellation request
  • wait_for_cancelled_sync(timeout) - Synchronous blocking call to wait for a cancellation request
  • shield_thread_cancel_exception() - Context manager for use in with clauses by synchronous multithreaded activities to prevent the cancel exception from being thrown during the block of code
  • is_worker_shutdown() - Whether the worker has started graceful shutdown
  • wait_for_worker_shutdown() - async call to wait for start of graceful worker shutdown
  • wait_for_worker_shutdown_sync(timeout) - Synchronous blocking call to wait for start of graceful worker shutdown
  • raise_complete_async() - Raise an error that this activity will be completed asynchronously (i.e. after return of the activity function in a separate client call)

With the exception of in_activity(), if any of these functions are called outside of an activity context, an error occurs. Synchronous activities cannot call any of the async functions.

Heartbeating and Cancellation

In order for a non-local activity to be notified of cancellation requests, it must be given a heartbeat_timeout at invocation time and invoke temporalio.activity.heartbeat() inside the activity. It is strongly recommended that all but the fastest executing activities call this function regularly. "Types of Activities" has specifics on cancellation for synchronous and asynchronous activities.

In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server for retrieval during activity retry. If an activity calls temporalio.activity.heartbeat(123, 456) and then fails and is retried, temporalio.activity.info().heartbeat_details will return an iterable containing 123 and 456 on the next run.

Heartbeating has no effect on local activities.
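
For example, a sketch of an activity that records progress via heartbeat details and resumes from them on retry (the item processing itself is illustrative):

from temporalio import activity

@activity.defn
def process_items(items: list) -> None:
    # On retry, resume from the last heartbeated index, if any
    details = activity.info().heartbeat_details
    start = details[0] if details else 0
    for i in range(start, len(items)):
        ...  # process items[i]
        activity.heartbeat(i + 1)  # persist progress for a potential retry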

Worker Shutdown

An activity can react to a worker shutdown using is_worker_shutdown or one of the wait_for_worker_shutdown functions.

When the graceful_shutdown_timeout worker parameter is given a datetime.timedelta, on shutdown the worker will notify activities of the graceful shutdown. Once that timeout has passed (or if it wasn't set), the worker will perform cancellation of all outstanding activities.

The shutdown() invocation will wait on all activities to complete, so if a long-running activity does not at least respect cancellation, the shutdown may never complete.
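
A sketch of an activity reacting to graceful shutdown (the timeout and return values are illustrative):

import asyncio

from temporalio import activity

@activity.defn
async def shutdown_aware_activity() -> str:
    try:
        # Completes when graceful worker shutdown begins
        await asyncio.wait_for(activity.wait_for_worker_shutdown(), timeout=60)
        return "stopped early for worker shutdown"
    except asyncio.TimeoutError:
        return "completed normally"

# Worker side (sketch): give activities 30 seconds to react before cancellation
# Worker(..., graceful_shutdown_timeout=timedelta(seconds=30))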

Testing

Unit testing an activity or any code that could run in an activity is done via the temporalio.testing.ActivityEnvironment class. Simply instantiate this, and any callable + params passed to run will be invoked inside the activity context. The following are attributes/methods on the environment that can be used to affect calls activity code might make to functions in the temporalio.activity package.

  • info property can be set to customize what is returned fromactivity.info()
  • on_heartbeat property can be set to handleactivity.heartbeat() calls
  • cancel() can be invoked to simulate a cancellation of the activity
  • worker_shutdown() can be invoked to simulate a worker shutdown during execution of the activity
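
For example, a sketch testing the process_items activity from the heartbeat section above:

from temporalio.testing import ActivityEnvironment

def test_process_items():
    env = ActivityEnvironment()
    heartbeats = []
    env.on_heartbeat = lambda *details: heartbeats.append(details)
    env.run(process_items, ["a", "b"])
    assert heartbeats == [(1,), (2,)]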

Workflow Replay

Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example, assuming history_str is populated with a JSON string history either exported from the web UI or from tctl, the following function will replay it:

from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

async def run_replayer(history_str: str):
    replayer = Replayer(workflows=[SayHello])
    await replayer.replay_workflow(WorkflowHistory.from_json(history_str))

This will throw an error if any non-determinism is detected.

Replaying from workflow history is a powerful concept that many use to test that workflow alterations won't cause non-determinisms with past-complete workflows. The following code will make sure that all workflow histories for a certain workflow type (i.e. workflow class) are safe with the current code:

from temporalio.client import Client, WorkflowHistory
from temporalio.worker import Replayer

async def check_past_histories(my_client: Client):
    replayer = Replayer(workflows=[SayHello])
    await replayer.replay_workflows(
        await my_client.list_workflows("WorkflowType = 'SayHello'").map_histories(),
    )

Observability

See https://github.com/temporalio/samples-python/tree/main/open_telemetry for a sample demonstrating collection of metrics and tracing data emitted by the SDK.

Metrics

The SDK emits various metrics by default: see https://docs.temporal.io/references/sdk-metrics. To configure additional attributes to be emitted with all metrics, pass global_tags when creating the TelemetryConfig.

For emitting custom metrics, the SDK makes a metric meter available: use temporalio.workflow.metric_meter() in workflow code and temporalio.activity.metric_meter() in activity code.

The attributes emitted by these default to namespace, task_queue, and workflow_type/activity_type; use with_additional_attributes to create a meter emitting additional attributes.
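
A sketch of emitting a custom counter from workflow code; the create_counter/add calls reflect the metric meter API as best understood here and should be treated as an assumption:

from temporalio import workflow

# Inside workflow code (sketch):
meter = workflow.metric_meter()
counter = meter.create_counter(
    "orders_processed", description="Total orders processed"  # hypothetical metric
)
counter.add(1)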

OpenTelemetry Tracing

Tracing support requires the optional opentelemetry dependencies, which are part of the opentelemetry extra. When using pip, running

pip install 'temporalio[opentelemetry]'

will install the needed dependencies. Then the temporalio.contrib.opentelemetry.TracingInterceptor can be created and set as an interceptor on the interceptors argument of Client.connect. When set, spans will be created for all client calls and for all activity and workflow invocations on the worker, and they are properly serialized through the server to give one proper trace for a workflow execution.
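
For example, a sketch of connecting a client with the tracing interceptor installed (a worker created from this client will pick it up as well):

from temporalio.client import Client
from temporalio.contrib.opentelemetry import TracingInterceptor

async def connect_traced_client() -> Client:
    return await Client.connect(
        "localhost:7233",
        interceptors=[TracingInterceptor()],
    )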

Protobuf 3.x vs 4.x

Python currently has two somewhat-incompatible protobuf library versions: the 3.x series and the 4.x series. Python currently recommends 4.x and that is the primary supported version. Some libraries like Pulumi require 4.x. Other libraries such as ONNX and Streamlit, for one reason or another, have not or will not leave 3.x.

To support these, the Temporal Python SDK allows any protobuf library >= 3.19. However, the C extension in older Python versions can cause issues with the sandbox due to global state sharing. Temporal strongly recommends using the latest protobuf 4.x library unless you absolutely cannot, at which point some proto libraries may have to be marked as Passthrough Modules.

Known Compatibility Issues

Below are known compatibility issues with the Python SDK.

gevent Patching

When using gevent.monkey.patch_all(), asyncio event loops can get messed up, especially those using custom event loops like Temporal's. See this gevent issue. This is a known incompatibility, and users are encouraged to not use gevent in asyncio applications (including Temporal). But if you must, there is a sample showing how it is possible.

Development

The Python SDK is built to work with Python 3.9 and newer. It is built using SDK Core, which is written in Rust.

Building

Prepare

To build the SDK from source for use as a dependency, the following prerequisites are required:

  • Python 3.9+
  • uv
  • Rust (the build compiles the Rust core)

Use uv to install poe:

uv tool install poethepoet

Now clone the SDK repository recursively:

git clone --recursive https://github.com/temporalio/sdk-python.git
cd sdk-python

Install the dependencies:

uv sync --all-extras

Build

Now perform the release build:

This will take a while because Rust will compile the core project in release mode (see the Local SDK development environment section for the quicker approach to local development).

uv build

The .whl wheel file in dist/ is now ready to use.

Use

The wheel can now be installed into any virtual environment.

For example, create a virtual environment somewhere and then run the following inside the virtual environment:

pip install wheel
pip install /path/to/cloned/sdk-python/dist/*.whl

Create this Python file at example.py:

import asyncio

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

@workflow.defn
class SayHello:
    @workflow.run
    async def run(self, name: str) -> str:
        return f"Hello, {name}!"

async def main():
    client = await Client.connect("localhost:7233")
    async with Worker(client, task_queue="my-task-queue", workflows=[SayHello]):
        result = await client.execute_workflow(
            SayHello.run, "Temporal", id="my-workflow-id", task_queue="my-task-queue"
        )
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Assuming there is a local Temporal server running, execute the file with python (or python3 if necessary):

python example.py

It should output:

Result: Hello, Temporal!

Local SDK development environment

For local development, it is quicker to use a debug build.

Perform the same steps as the "Prepare" section above by installing the prerequisites, cloning the project, and installing dependencies:

git clone --recursive https://github.com/temporalio/sdk-python.git
cd sdk-python
uv sync --all-extras

Now compile the Rust extension in develop mode which is quicker than release mode:

poe build-develop

That step can be repeated for any Rust changes made.

The environment is now ready to develop in.

Testing

To execute tests:

poe test

This runs against Temporalite. To run against the time-skipping test server, pass --workflow-environment time-skipping. To run against the default namespace of an already-running server, pass the host:port to --workflow-environment. Regular pytest arguments can also be used. For example, here's how to run a single test with debug logs on the console:

poe test -s --log-cli-level=DEBUG -k test_sync_activity_thread_cancel_caught

Proto Generation and Testing

To allow for backwards compatibility, protobuf code is generated on the 3.x series of the protobuf library. To generate protobuf code, you must be on Python <= 3.10, and then run uv add "protobuf<4" + uv sync --all-extras. Then the protobuf files can be generated via poe gen-protos. Tests can be run for protobuf version 3 by setting the TEMPORAL_TEST_PROTO3 env var to 1 prior to running tests.

Do not commit uv.lock or pyproject.toml changes. To go back from this downgrade, restore both of those files and run uv sync --all-extras. Make sure you poe format the results.

For a less system-intrusive approach, you can:

docker build -f scripts/_proto/Dockerfile .
docker run --rm -v "${PWD}/temporalio/api:/api_new" -v "${PWD}/temporalio/bridge/proto:/bridge_new" <just built image sha>
poe format

Style

  • Mostly the Google Style Guide. Notable exceptions:
    • We use ruff for formatting, so that takes precedence
    • In tests and example code, individual classes/functions can be imported to make the code more readable. This can also be done rarely in library code for some common Python items (e.g. dataclass or partial), but is not allowed for any temporalio packages (except temporalio.types) or any classes/functions that aren't clear when unqualified.
    • We allow relative imports for private packages
    • We allow @staticmethod
