diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 378cfc011..866d37f7d 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -322,6 +322,7 @@ def activate( activation_err: Optional[Exception] = None try: + asyncio._set_running_loop(self) # Split into job sets with patches, then signals + updates, then # non-queries, then queries job_sets: List[ @@ -368,6 +369,8 @@ def activate( "Ignoring exception while deleting workflow", exc_info=True ) activation_err = None + finally: + asyncio._set_running_loop(None) # If we're deleting, there better be no more tasks. It is important for # the integrity of the system that we check this. If there are tasks @@ -1732,37 +1735,32 @@ def _register_task( setattr(task, "_log_destroy_pending", False) def _run_once(self, *, check_conditions: bool) -> None: - try: - asyncio._set_running_loop(self) - - # We instantiate the workflow class _inside_ here because __init__ - # needs to run with this event loop set - if not self._object: - self._object = self._defn.cls() - - # Run while there is anything ready + # We instantiate the workflow class _inside_ here because __init__ + # needs to run with this event loop set + if not self._object: + self._object = self._defn.cls() + + # Run while there is anything ready + while self._ready: + # Run and remove all ready ones while self._ready: - # Run and remove all ready ones - while self._ready: - handle = self._ready.popleft() - handle._run() - - # Must throw here if not deleting. Only really set inside - # _run_top_level_workflow_function. - if self._current_activation_error and not self._deleting: - raise self._current_activation_error - - # Check conditions which may add to the ready list. Also remove - # conditions whose futures have already cancelled (e.g. when - # timed out). - if check_conditions: - self._conditions[:] = [ - t - for t in self._conditions - if not t[1].done() and not self._check_condition(*t) - ] - finally: - asyncio._set_running_loop(None) + handle = self._ready.popleft() + handle._run() + + # Must throw here if not deleting. Only really set inside + # _run_top_level_workflow_function. + if self._current_activation_error and not self._deleting: + raise self._current_activation_error + + # Check conditions which may add to the ready list. Also remove + # conditions whose futures have already cancelled (e.g. when + # timed out). + if check_conditions: + self._conditions[:] = [ + t + for t in self._conditions + if not t[1].done() and not self._check_condition(*t) + ] # This is used for the primary workflow function and signal handlers in # order to apply common exception handling to each