Skip to content

Commit b27dd33

Browse files
authored
prevent already scheduled tasks from being added to opentasks list (#457)
1 parent 635c3cf commit b27dd33

File tree

2 files changed

+47
-0
lines changed

2 files changed

+47
-0
lines changed

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,9 @@ def history_to_string(event):
661661

662662
def _add_to_open_tasks(self, task: TaskBase):
663663

664+
if task._is_scheduled:
665+
return
666+
664667
if isinstance(task, AtomicTask):
665668
if task.id is None:
666669
task.id = self._sequence_number

tests/orchestrator/test_sequential_orchestrator.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,25 @@ def generator_function_reducing_when_all(context):
9595
yield context.call_activity("Hello", "London")
9696
return ""
9797

98+
99+
def generator_function_reuse_task_in_whenany(context):
100+
task1 = context.call_activity("Hello", "Tokyo")
101+
task2 = context.call_activity("Hello", "Seattle")
102+
pending_tasks = [task1, task2]
103+
104+
# Yield until first task is completed
105+
finished_task1 = yield context.task_any(pending_tasks)
106+
107+
# Remove completed task from pending tasks
108+
pending_tasks.remove(finished_task1)
109+
110+
task3 = context.call_activity("Hello", "London")
111+
tasks = pending_tasks + [task3]
112+
113+
# Yield remaining tasks
114+
yield context.task_any(tasks)
115+
return ""
116+
98117
def generator_function_compound_tasks(context):
99118
yield context.call_activity("Hello", "Tokyo")
100119

@@ -731,6 +750,31 @@ def test_reducing_when_any_pattern():
731750

732751
assert_orchestration_state_equals(expected, result)
733752

753+
def test_reducing_when_any_pattern():
754+
"""Tests that a user can call when_any on a progressively smaller list of already scheduled tasks"""
755+
context_builder = ContextBuilder('generator_function_reuse_task_in_whenany', replay_schema=ReplaySchema.V2)
756+
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
757+
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"")
758+
add_hello_completed_events(context_builder, 2, "\"Hello London!\"")
759+
760+
result = get_orchestration_state_result(
761+
context_builder, generator_function_reuse_task_in_whenany)
762+
763+
# this scenario is only supported for V2 replay
764+
expected_state = base_expected_state("",replay_schema=ReplaySchema.V2)
765+
expected_state._actions = [
766+
[WhenAnyAction(
767+
[CallActivityAction("Hello", "Seattle"), CallActivityAction("Hello", "Tokyo")]),
768+
WhenAnyAction(
769+
[CallActivityAction("Hello", "London")])
770+
]
771+
]
772+
773+
expected_state._is_done = True
774+
expected = expected_state.to_json()
775+
776+
assert_orchestration_state_equals(expected, result)
777+
734778
def test_compound_tasks_return_single_action_in_V2():
735779
"""Tests that compound tasks, in the v2 replay schema, are represented as a single "deep" action"""
736780
context_builder = ContextBuilder('test_v2_replay_schema', replay_schema=ReplaySchema.V2)

0 commit comments

Comments
 (0)