|
6 | 6 | importcollections |
7 | 7 | importcontextvars |
8 | 8 | importinspect |
| 9 | +importjson |
9 | 10 | importlogging |
10 | 11 | importrandom |
11 | 12 | importsys |
12 | 13 | importtraceback |
13 | | -importtyping |
14 | 14 | importwarnings |
15 | 15 | fromabcimportABC,abstractmethod |
16 | 16 | fromcontextlibimportcontextmanager |
|
25 | 25 | Dict, |
26 | 26 | Generator, |
27 | 27 | Generic, |
| 28 | +Iterable, |
28 | 29 | Iterator, |
29 | 30 | List, |
30 | 31 | Mapping, |
@@ -240,6 +241,14 @@ def __init__(self, det: WorkflowInstanceDetails) -> None: |
240 | 241 | self._queries=dict(self._defn.queries) |
241 | 242 | self._updates=dict(self._defn.updates) |
242 | 243 |
|
| 244 | +# We record in-progress signals and updates in order to support waiting for handlers to |
| 245 | +# finish, and issuing warnings when the workflow exits with unfinished handlers. Since |
| 246 | +# signals lack a unique per-invocation identifier, we introduce a sequence number for the |
| 247 | +# purpose. |
| 248 | +self._handled_signals_seq=0 |
| 249 | +self._in_progress_signals:Dict[int,HandlerExecution]= {} |
| 250 | +self._in_progress_updates:Dict[str,HandlerExecution]= {} |
| 251 | + |
243 | 252 | # Add stack trace handler |
244 | 253 | # TODO(cretz): Is it ok that this can be forcefully overridden by the |
245 | 254 | # workflow author? They could technically override in interceptor |
@@ -406,12 +415,15 @@ def activate( |
406 | 415 | command.HasField("complete_workflow_execution") |
407 | 416 | orcommand.HasField("continue_as_new_workflow_execution") |
408 | 417 | orcommand.HasField("fail_workflow_execution") |
| 418 | +orcommand.HasField("cancel_workflow_execution") |
409 | 419 | ) |
410 | 420 | elifnotcommand.HasField("respond_to_query"): |
411 | 421 | delself._current_completion.successful.commands[i] |
412 | 422 | continue |
413 | 423 | i+=1 |
414 | 424 |
|
| 425 | +ifseen_completion: |
| 426 | +self._warn_if_unfinished_handlers() |
415 | 427 | returnself._current_completion |
416 | 428 |
|
417 | 429 | def_apply( |
@@ -490,6 +502,9 @@ async def run_update() -> None: |
490 | 502 | f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. " |
491 | 503 | f"known updates: [{' '.join(known_updates)}]" |
492 | 504 | ) |
| 505 | +self._in_progress_updates[job.id]=HandlerExecution( |
| 506 | +job.name,defn.unfinished_policy,job.id |
| 507 | + ) |
493 | 508 | args=self._process_handler_args( |
494 | 509 | job.name, |
495 | 510 | job.input, |
@@ -572,6 +587,8 @@ async def run_update() -> None: |
572 | 587 | ) |
573 | 588 | return |
574 | 589 | raise |
| 590 | +finally: |
| 591 | +self._in_progress_updates.pop(job.id,None) |
575 | 592 |
|
576 | 593 | self.create_task( |
577 | 594 | run_update(), |
@@ -869,6 +886,9 @@ def _apply_update_random_seed( |
869 | 886 | #### _Runtime direct workflow call overrides #### |
870 | 887 | # These are in alphabetical order and all start with "workflow_". |
871 | 888 |
|
| 889 | +defworkflow_all_handlers_finished(self)->bool: |
| 890 | +returnnotself._in_progress_updatesandnotself._in_progress_signals |
| 891 | + |
872 | 892 | defworkflow_continue_as_new( |
873 | 893 | self, |
874 | 894 | *args:Any, |
@@ -1596,6 +1616,31 @@ def _is_workflow_failure_exception(self, err: BaseException) -> bool: |
1596 | 1616 | ) |
1597 | 1617 | ) |
1598 | 1618 |
|
| 1619 | +def_warn_if_unfinished_handlers(self)->None: |
| 1620 | +defwarnable(handler_executions:Iterable[HandlerExecution]): |
| 1621 | +return [ |
| 1622 | +ex |
| 1623 | +forexinhandler_executions |
| 1624 | +ifex.unfinished_policy |
| 1625 | +==temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON |
| 1626 | + ] |
| 1627 | + |
| 1628 | +warnable_updates=warnable(self._in_progress_updates.values()) |
| 1629 | +ifwarnable_updates: |
| 1630 | +warnings.warn( |
| 1631 | +temporalio.workflow.UnfinishedUpdateHandlersWarning( |
| 1632 | +_make_unfinished_update_handler_message(warnable_updates) |
| 1633 | + ) |
| 1634 | + ) |
| 1635 | + |
| 1636 | +warnable_signals=warnable(self._in_progress_signals.values()) |
| 1637 | +ifwarnable_signals: |
| 1638 | +warnings.warn( |
| 1639 | +temporalio.workflow.UnfinishedSignalHandlersWarning( |
| 1640 | +_make_unfinished_signal_handler_message(warnable_signals) |
| 1641 | + ) |
| 1642 | + ) |
| 1643 | + |
1599 | 1644 | def_next_seq(self,type:str)->int: |
1600 | 1645 | seq=self._curr_seqs.get(type,0)+1 |
1601 | 1646 | self._curr_seqs[type]=seq |
@@ -1646,10 +1691,21 @@ def _process_signal_job( |
1646 | 1691 | input=HandleSignalInput( |
1647 | 1692 | signal=job.signal_name,args=args,headers=job.headers |
1648 | 1693 | ) |
1649 | | -self.create_task( |
| 1694 | + |
| 1695 | +self._handled_signals_seq+=1 |
| 1696 | +id=self._handled_signals_seq |
| 1697 | +self._in_progress_signals[id]=HandlerExecution( |
| 1698 | +job.signal_name,defn.unfinished_policy |
| 1699 | + ) |
| 1700 | + |
| 1701 | +defdone_callback(f): |
| 1702 | +self._in_progress_signals.pop(id,None) |
| 1703 | + |
| 1704 | +task=self.create_task( |
1650 | 1705 | self._run_top_level_workflow_function(self._inbound.handle_signal(input)), |
1651 | 1706 | name=f"signal:{job.signal_name}", |
1652 | 1707 | ) |
| 1708 | +task.add_done_callback(done_callback) |
1653 | 1709 |
|
1654 | 1710 | def_register_task( |
1655 | 1711 | self, |
@@ -2686,3 +2742,55 @@ def set( |
2686 | 2742 |
|
class _WorkflowBeingEvictedError(BaseException):
    """Raised internally when this workflow is being evicted (inferred from name).

    NOTE(review): subclasses BaseException rather than Exception, presumably so
    that broad `except Exception` handlers in user workflow code cannot swallow
    it — confirm against the eviction code path, which is outside this view.
    """

    pass
| 2745 | + |
| 2746 | + |
@dataclass
class HandlerExecution:
    """Information about an execution of a signal or update handler."""

    # Name of the signal or update being handled.
    name: str
    # Controls whether a warning is issued if the workflow exits while this
    # handler is still in progress.
    unfinished_policy: temporalio.workflow.HandlerUnfinishedPolicy
    # The update ID for update handlers; None for signal handlers, which lack
    # a per-invocation identifier and are instead tracked by an internal
    # sequence number.
    id: Optional[str] = None
| 2754 | + |
| 2755 | + |
def _make_unfinished_update_handler_message(
    handler_executions: List[HandlerExecution],
) -> str:
    """Build the warning text for update handlers still running at workflow exit.

    The returned message explains the consequences, how to wait for handlers,
    how to opt out of the warning, and ends with a JSON list of the unfinished
    updates' names and IDs.
    """
    raw = """
Workflow finished while update handlers are still running. This may have interrupted work that the
update handler was doing, and the client that sent the update will receive a 'workflow execution
already completed' RPCError instead of the update result. You can wait for all update and signal
handlers to complete by using `await workflow.wait_condition(lambda:
workflow.all_handlers_finished())`. Alternatively, if both you and the clients sending the update
are okay with interrupting running handlers when the workflow finishes, and causing clients to
receive errors, then you can disable this warning via the update handler decorator:
`@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`.
"""
    # Collapse the literal onto one line; strip() removes the spaces that the
    # surrounding newlines become.
    prose = raw.replace("\n", " ").strip()
    entries = [{"name": ex.name, "id": ex.id} for ex in handler_executions]
    suffix = "The following updates were unfinished (and warnings were not disabled for their handler): "
    return f"{prose} {suffix}{json.dumps(entries)}"
| 2775 | + |
| 2776 | + |
def _make_unfinished_signal_handler_message(
    handler_executions: List[HandlerExecution],
) -> str:
    """Build the warning text for signal handlers still running at workflow exit.

    Signals have no per-invocation ID, so instead of listing each execution the
    message ends with a JSON list of signal names with a count per name.
    """
    raw = """
Workflow finished while signal handlers are still running. This may have interrupted work that the
signal handler was doing. You can wait for all update and signal handlers to complete by using
`await workflow.wait_condition(lambda: workflow.all_handlers_finished())`. Alternatively, if both
you and the clients sending the signal are okay with interrupting running handlers when the workflow
finishes, and causing clients to receive errors, then you can disable this warning via the signal
handler decorator: `@workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`.
"""
    # Collapse the literal onto one line; strip() removes the spaces that the
    # surrounding newlines become.
    prose = raw.replace("\n", " ").strip()
    # Aggregate duplicate signal names into per-name counts, most common first.
    per_name = collections.Counter(ex.name for ex in handler_executions)
    entries = [
        {"name": name, "count": count} for name, count in per_name.most_common()
    ]
    suffix = "The following signals were unfinished (and warnings were not disabled for their handler): "
    return f"{prose} {suffix}{json.dumps(entries)}"