Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2c1ac54

Browse files
authored
New API to wait for handler executions to complete and warnings on unfinished handler executions (temporalio#556)
* Implement warning on unfinished signals and updates* Implement all_handlers_finished
1 parent2331aa4 commit2c1ac54

File tree

4 files changed

+570
-27
lines changed

4 files changed

+570
-27
lines changed

‎pyproject.toml‎

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,19 @@ env = { TEMPORAL_INTEGRATION_TEST = "1" }
9999
cmd ="pip uninstall temporalio -y"
100100

101101
[tool.pytest.ini_options]
102-
addopts ="-p no:warnings"
103102
asyncio_mode ="auto"
104103
log_cli =true
105104
log_cli_level ="INFO"
106105
log_cli_format ="%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
107106
testpaths = ["tests"]
108107
timeout =600
109108
timeout_func_only =true
109+
filterwarnings = [
110+
"error::temporalio.workflow.UnfinishedUpdateHandlersWarning",
111+
"error::temporalio.workflow.UnfinishedSignalHandlersWarning",
112+
"ignore::pytest.PytestDeprecationWarning",
113+
"ignore::DeprecationWarning",
114+
]
110115

111116
[tool.cibuildwheel]
112117
# We only want the 3.8 64-bit build of each type. However, due to

‎temporalio/worker/_workflow_instance.py‎

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
importcollections
77
importcontextvars
88
importinspect
9+
importjson
910
importlogging
1011
importrandom
1112
importsys
1213
importtraceback
13-
importtyping
1414
importwarnings
1515
fromabcimportABC,abstractmethod
1616
fromcontextlibimportcontextmanager
@@ -25,6 +25,7 @@
2525
Dict,
2626
Generator,
2727
Generic,
28+
Iterable,
2829
Iterator,
2930
List,
3031
Mapping,
@@ -240,6 +241,14 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
240241
self._queries=dict(self._defn.queries)
241242
self._updates=dict(self._defn.updates)
242243

244+
# We record in-progress signals and updates in order to support waiting for handlers to
245+
# finish, and issuing warnings when the workflow exits with unfinished handlers. Since
246+
# signals lack a unique per-invocation identifier, we introduce a sequence number for the
247+
# purpose.
248+
self._handled_signals_seq=0
249+
self._in_progress_signals:Dict[int,HandlerExecution]= {}
250+
self._in_progress_updates:Dict[str,HandlerExecution]= {}
251+
243252
# Add stack trace handler
244253
# TODO(cretz): Is it ok that this can be forcefully overridden by the
245254
# workflow author? They could technically override in interceptor
@@ -406,12 +415,15 @@ def activate(
406415
command.HasField("complete_workflow_execution")
407416
orcommand.HasField("continue_as_new_workflow_execution")
408417
orcommand.HasField("fail_workflow_execution")
418+
orcommand.HasField("cancel_workflow_execution")
409419
)
410420
elifnotcommand.HasField("respond_to_query"):
411421
delself._current_completion.successful.commands[i]
412422
continue
413423
i+=1
414424

425+
ifseen_completion:
426+
self._warn_if_unfinished_handlers()
415427
returnself._current_completion
416428

417429
def_apply(
@@ -490,6 +502,9 @@ async def run_update() -> None:
490502
f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. "
491503
f"known updates: [{' '.join(known_updates)}]"
492504
)
505+
self._in_progress_updates[job.id]=HandlerExecution(
506+
job.name,defn.unfinished_policy,job.id
507+
)
493508
args=self._process_handler_args(
494509
job.name,
495510
job.input,
@@ -572,6 +587,8 @@ async def run_update() -> None:
572587
)
573588
return
574589
raise
590+
finally:
591+
self._in_progress_updates.pop(job.id,None)
575592

576593
self.create_task(
577594
run_update(),
@@ -869,6 +886,9 @@ def _apply_update_random_seed(
869886
#### _Runtime direct workflow call overrides ####
870887
# These are in alphabetical order and all start with "workflow_".
871888

889+
defworkflow_all_handlers_finished(self)->bool:
890+
returnnotself._in_progress_updatesandnotself._in_progress_signals
891+
872892
defworkflow_continue_as_new(
873893
self,
874894
*args:Any,
@@ -1596,6 +1616,31 @@ def _is_workflow_failure_exception(self, err: BaseException) -> bool:
15961616
)
15971617
)
15981618

1619+
def_warn_if_unfinished_handlers(self)->None:
1620+
defwarnable(handler_executions:Iterable[HandlerExecution]):
1621+
return [
1622+
ex
1623+
forexinhandler_executions
1624+
ifex.unfinished_policy
1625+
==temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON
1626+
]
1627+
1628+
warnable_updates=warnable(self._in_progress_updates.values())
1629+
ifwarnable_updates:
1630+
warnings.warn(
1631+
temporalio.workflow.UnfinishedUpdateHandlersWarning(
1632+
_make_unfinished_update_handler_message(warnable_updates)
1633+
)
1634+
)
1635+
1636+
warnable_signals=warnable(self._in_progress_signals.values())
1637+
ifwarnable_signals:
1638+
warnings.warn(
1639+
temporalio.workflow.UnfinishedSignalHandlersWarning(
1640+
_make_unfinished_signal_handler_message(warnable_signals)
1641+
)
1642+
)
1643+
15991644
def_next_seq(self,type:str)->int:
16001645
seq=self._curr_seqs.get(type,0)+1
16011646
self._curr_seqs[type]=seq
@@ -1646,10 +1691,21 @@ def _process_signal_job(
16461691
input=HandleSignalInput(
16471692
signal=job.signal_name,args=args,headers=job.headers
16481693
)
1649-
self.create_task(
1694+
1695+
self._handled_signals_seq+=1
1696+
id=self._handled_signals_seq
1697+
self._in_progress_signals[id]=HandlerExecution(
1698+
job.signal_name,defn.unfinished_policy
1699+
)
1700+
1701+
defdone_callback(f):
1702+
self._in_progress_signals.pop(id,None)
1703+
1704+
task=self.create_task(
16501705
self._run_top_level_workflow_function(self._inbound.handle_signal(input)),
16511706
name=f"signal:{job.signal_name}",
16521707
)
1708+
task.add_done_callback(done_callback)
16531709

16541710
def_register_task(
16551711
self,
@@ -2686,3 +2742,55 @@ def set(
26862742

26872743
class_WorkflowBeingEvictedError(BaseException):
26882744
pass
2745+
2746+
2747+
@dataclass
2748+
classHandlerExecution:
2749+
"""Information about an execution of a signal or update handler."""
2750+
2751+
name:str
2752+
unfinished_policy:temporalio.workflow.HandlerUnfinishedPolicy
2753+
id:Optional[str]=None
2754+
2755+
2756+
def_make_unfinished_update_handler_message(
2757+
handler_executions:List[HandlerExecution],
2758+
)->str:
2759+
message="""
2760+
Workflow finished while update handlers are still running. This may have interrupted work that the
2761+
update handler was doing, and the client that sent the update will receive a 'workflow execution
2762+
already completed' RPCError instead of the update result. You can wait for all update and signal
2763+
handlers to complete by using `await workflow.wait_condition(lambda:
2764+
workflow.all_handlers_finished())`. Alternatively, if both you and the clients sending the update
2765+
are okay with interrupting running handlers when the workflow finishes, and causing clients to
2766+
receive errors, then you can disable this warning via the update handler decorator:
2767+
`@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`.
2768+
""".replace(
2769+
"\n"," "
2770+
).strip()
2771+
return (
2772+
f"{message} The following updates were unfinished (and warnings were not disabled for their handler): "
2773+
+json.dumps([{"name":ex.name,"id":ex.id}forexinhandler_executions])
2774+
)
2775+
2776+
2777+
def_make_unfinished_signal_handler_message(
2778+
handler_executions:List[HandlerExecution],
2779+
)->str:
2780+
message="""
2781+
Workflow finished while signal handlers are still running. This may have interrupted work that the
2782+
signal handler was doing. You can wait for all update and signal handlers to complete by using
2783+
`await workflow.wait_condition(lambda: workflow.all_handlers_finished())`. Alternatively, if both
2784+
you and the clients sending the signal are okay with interrupting running handlers when the workflow
2785+
finishes, and causing clients to receive errors, then you can disable this warning via the signal
2786+
handler decorator: `@workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`.
2787+
""".replace(
2788+
"\n"," "
2789+
).strip()
2790+
names=collections.Counter(ex.nameforexinhandler_executions)
2791+
return (
2792+
f"{message} The following signals were unfinished (and warnings were not disabled for their handler): "
2793+
+json.dumps(
2794+
[{"name":name,"count":count}forname,countinnames.most_common()]
2795+
)
2796+
)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp