Skip to content

Expose OrchestratorGeneratorWrapper in SDK #548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions azure-functions-durable-python.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.2.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{5D20AA90-6969-D8BD-9DCD-8634F4692FDA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "extensions", "samples\aml_monitoring\extensions.csproj", "{33E598B8-4178-679F-9B92-BE8D8A64F1A5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{33E598B8-4178-679F-9B92-BE8D8A64F1A5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{33E598B8-4178-679F-9B92-BE8D8A64F1A5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{33E598B8-4178-679F-9B92-BE8D8A64F1A5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{33E598B8-4178-679F-9B92-BE8D8A64F1A5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{33E598B8-4178-679F-9B92-BE8D8A64F1A5} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {AEA3AC93-4361-47CD-A8C7-CA280ABE1BDC}
EndGlobalSection
EndGlobal
4 changes: 3 additions & 1 deletion azure/durable_functions/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .DurableHttpRequest import DurableHttpRequest
from .TokenSource import ManagedIdentityTokenSource
from .DurableEntityContext import DurableEntityContext
from .Task import TaskBase

__all__ = [
'DurableOrchestrationBindings',
Expand All @@ -20,5 +21,6 @@
'OrchestratorState',
'OrchestrationRuntimeStatus',
'PurgeHistoryResult',
'RetryOptions'
'RetryOptions',
'TaskBase'
]
38 changes: 38 additions & 0 deletions azure/durable_functions/testing/OrchestratorGeneratorWrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Generator, Any, Union

from azure.durable_functions.models import TaskBase

def orchestrator_generator_wrapper(generator: Generator[TaskBase, Any, Any]) -> Generator[Union[TaskBase, Any], None, None]:
"""Wraps a user-defined orchestrator function to simulate the Durable replay logic.

Parameters
----------
generator: Generator[TaskBase, Any, Any]
Generator orchestrator as defined in the user function app. This generator is expected
to yield a series of TaskBase objects and receive the results of these tasks until
returning the result of the orchestrator.

Returns
-------
Generator[Union[TaskBase, Any], None, None]
A simplified version of the orchestrator which takes no inputs. This generator will
yield back the TaskBase objects that are yielded from the user orchestrator as well
as the final result of the orchestrator. Exception handling is also simulated here
in the same way as replay, where tasks returning exceptions are thrown back into the
orchestrator.
"""
previous = next(generator)
yield previous
while True:
try:
previous_result = None
try:
previous_result = previous.result
except Exception as e: # Simulated activity exceptions, timer interrupted exceptions, anytime a task would throw.
previous = generator.throw(e)
else:
previous = generator.send(previous_result)
yield previous
except StopIteration as e:
yield e.value
return
6 changes: 6 additions & 0 deletions azure/durable_functions/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Unit testing utilities for Azure Durable functions."""
from .OrchestratorGeneratorWrapper import orchestrator_generator_wrapper

__all__ = [
'orchestrator_generator_wrapper'
]
39 changes: 8 additions & 31 deletions samples-v2/blueprint/tests/test_my_orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,29 @@
from datetime import timedelta
import unittest
from unittest.mock import Mock, call, patch
from azure.durable_functions.testing import orchestrator_generator_wrapper

from durable_blueprints import my_orchestrator

# A way to wrap an orchestrator generator to simplify calling it and getting the results.
# Because orchestrators in Durable Functions always accept the result of the previous activity for the next send() call,
# we can unwrap the orchestrator generator using this method to simplify per-test code.
def orchestrator_generator_wrapper(generator):
previous = next(generator)
yield previous
while True:
try:
previous_result = None
try:
previous_result = previous.result
except Exception as e: # Simulated activity exceptions, timer interrupted exceptions, anytime a task would throw.
previous = generator.throw(e)
else:
previous = generator.send(previous_result)
yield previous
except StopIteration as e:
yield e.value
return


class MockTask():
def __init__(self, result=None):
self.result = result


def mock_activity(activity_name, input):
@patch('azure.durable_functions.models.TaskBase')
def mock_activity(activity_name, input, task):
if activity_name == "say_hello":
return MockTask(f"Hello {input}!")
task.result = f"Hello {input}!"
return task
raise Exception("Activity not found")


class TestFunction(unittest.TestCase):
@patch('azure.durable_functions.DurableOrchestrationContext')
def test_chaining_orchestrator(self, context):
def test_my_orchestrator(self, context):
# Get the original method definition as seen in the function_app.py file
func_call = my_orchestrator.build().get_user_function().orchestrator_function

context.call_activity = Mock(side_effect=mock_activity)
# Create a generator using the method and mocked context
user_orchestrator = func_call(context)

# Use a method defined above to get the values from the generator. Quick unwrap for easy access
# Use orchestrator_generator_wrapper to get the values from the generator.
# Processes the orchestrator in a way that is equivalent to the Durable replay logic
values = [val for val in orchestrator_generator_wrapper(user_orchestrator)]

expected_activity_calls = [call('say_hello', 'Tokyo'),
Expand Down
45 changes: 14 additions & 31 deletions samples-v2/fan_in_fan_out/tests/test_E2_BackupSiteContent.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,20 @@
import unittest
from unittest.mock import Mock, call, patch
from azure.durable_functions.testing import orchestrator_generator_wrapper

from function_app import E2_BackupSiteContent

# A way to wrap an orchestrator generator to simplify calling it and getting the results.
# Because orchestrators in Durable Functions always accept the result of the previous activity for the next send() call,
# we can unwrap the orchestrator generator using this method to simplify per-test code.
def orchestrator_generator_wrapper(generator):
previous = next(generator)
yield previous
while True:
try:
previous_result = None
try:
previous_result = previous.result
except Exception as e: # Simulated activity exceptions, timer interrupted exceptions, anytime a task would throw.
previous = generator.throw(e)
else:
previous = generator.send(previous_result)
yield previous
except StopIteration as e:
yield e.value
return


class MockTask():
def __init__(self, result=None):
self.result = result


def mock_activity(activity_name, input):

@patch('azure.durable_functions.models.TaskBase')
def create_mock_task(result, task):
task.result = result
return task


def mock_activity(activity_name, input, task):
if activity_name == "E2_GetFileList":
return MockTask(["C:/test/E2_Activity.py", "C:/test/E2_Orchestrator.py"])
return MockTask(input)
return create_mock_task(["C:/test/E2_Activity.py", "C:/test/E2_Orchestrator.py"])
raise Exception("Activity not found")


class TestFunction(unittest.TestCase):
Expand All @@ -43,12 +25,13 @@ def test_E2_BackupSiteContent(self, context):

context.get_input = Mock(return_value="C:/test")
context.call_activity = Mock(side_effect=mock_activity)
context.task_all = Mock(return_value=MockTask([100, 200, 300]))
context.task_all = Mock(return_value=create_mock_task([100, 200, 300]))

# Execute the function code
user_orchestrator = func_call(context)

# Use a method defined above to get the values from the generator. Quick unwrap for easy access
# Use orchestrator_generator_wrapper to get the values from the generator.
# Processes the orchestrator in a way that is equivalent to the Durable replay logic
values = [val for val in orchestrator_generator_wrapper(user_orchestrator)]

expected_activity_calls = [call('E2_GetFileList', 'C:/test'),
Expand Down
38 changes: 8 additions & 30 deletions samples-v2/function_chaining/tests/test_my_orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,15 @@
from datetime import timedelta
import unittest
from unittest.mock import Mock, call, patch
from azure.durable_functions.testing import orchestrator_generator_wrapper

from function_app import my_orchestrator

# A way to wrap an orchestrator generator to simplify calling it and getting the results.
# Because orchestrators in Durable Functions always accept the result of the previous activity for the next send() call,
# we can unwrap the orchestrator generator using this method to simplify per-test code.
def orchestrator_generator_wrapper(generator):
previous = next(generator)
yield previous
while True:
try:
previous_result = None
try:
previous_result = previous.result
except Exception as e: # Simulated activity exceptions, timer interrupted exceptions, anytime a task would throw.
previous = generator.throw(e)
else:
previous = generator.send(previous_result)
yield previous
except StopIteration as e:
yield e.value
return


class MockTask():
def __init__(self, result=None):
self.result = result


def mock_activity(activity_name, input):

@patch('azure.durable_functions.models.TaskBase')
def mock_activity(activity_name, input, task):
if activity_name == "say_hello":
return MockTask(f"Hello {input}!")
task.result = f"Hello {input}!"
return task
raise Exception("Activity not found")


Expand All @@ -47,7 +24,8 @@ def test_chaining_orchestrator(self, context):
# Create a generator using the method and mocked context
user_orchestrator = func_call(context)

# Use a method defined above to get the values from the generator. Quick unwrap for easy access
# Use orchestrator_generator_wrapper to get the values from the generator.
# Processes the orchestrator in a way that is equivalent to the Durable replay logic
values = [val for val in orchestrator_generator_wrapper(user_orchestrator)]

expected_activity_calls = [call('say_hello', 'Tokyo'),
Expand Down