Skip to content

Add activity trigger encode/decode logic for durable function #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions azure/functions/durable_functions.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,91 @@
from typing import Any
import typing
import json

from azure.functions import _durable_functions

from . import meta


# Durable Function Orchestration Trigger
class OrchestrationTriggerConverter(meta.InConverter,
meta.OutConverter,
binding='orchestrationTrigger',
trigger=True):
@classmethod
def check_input_type_annotation(cls, pytype):
return issubclass(pytype, _durable_functions.OrchestrationContext)

@classmethod
def check_output_type_annotation(cls, pytype):
# Implicit output should accept any return type
return True

@classmethod
def decode(cls,
data: meta.Datum, *,
trigger_metadata) -> _durable_functions.OrchestrationContext:
return _durable_functions.OrchestrationContext(data.value)

@classmethod
def encode(cls, obj: typing.Any, *,
expected_type: typing.Optional[type]) -> meta.Datum:
# Durable function context should be a json
return meta.Datum(type='json', value=obj)

@classmethod
def has_implicit_output(cls) -> bool:
return True


# Durable Function Activity Trigger
class ActivityTriggerConverter(meta.InConverter,
meta.OutConverter,
binding='activityTrigger',
trigger=True):
@classmethod
def check_input_type_annotation(cls, pytype):
# Activity Trigger's arguments should accept any types
return True

@classmethod
def check_output_type_annotation(cls, pytype):
# The activity trigger should accept any JSON serializable types
return True

@classmethod
def decode(cls,
data: meta.Datum, *,
trigger_metadata) -> Any:
if getattr(data, 'value', None) is not None:
return data.value
trigger_metadata) -> typing.Any:
data_type = data.type

# Durable functions extension always returns a string of json
# See durable functions library's call_activity_task docs
if data_type == 'string' or data_type == 'json':
try:
result = json.loads(data.value)
except json.JSONDecodeError:
# String failover if the content is not json serializable
result = data.value
except Exception:
raise ValueError(
'activity trigger input must be a string or a '
f'valid json serializable ({data.value})')
else:
raise NotImplementedError(
f'unsupported activity trigger payload type: {data_type}')

return result

@classmethod
def encode(cls, obj: typing.Any, *,
expected_type: typing.Optional[type]) -> meta.Datum:
try:
result = json.dumps(obj)
except TypeError:
raise ValueError(
f'activity trigger output must be json serializable ({obj})')

return data
return meta.Datum(type='json', value=result)

@classmethod
def has_implicit_output(cls) -> bool:
Expand Down
126 changes: 113 additions & 13 deletions tests/test_durable_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,119 @@ def test_orchestration_trigger_has_implicit_return(self):
OrchestrationTriggerConverter.has_implicit_output()
)

def test_activity_trigger_accepts_any_types(self):
datum_set = {
Datum('string', str),
Datum(123, int),
Datum(1234.56, float),
Datum('string'.encode('utf-8'), bytes),
Datum(Datum('{ "json": true }', str), Datum)
}

for datum in datum_set:
out = ActivityTriggerConverter.decode(datum, trigger_metadata=None)
self.assertEqual(out, datum.value)
self.assertEqual(type(out), datum.type)
def test_activity_trigger_inputs(self):
# Activity Trigger only accept string type from durable extensions
# It will be JSON deserialized into expected data type
data = [
{
'input': Datum('sample', 'string'),
'expected_value': 'sample',
'expected_type': str
},
{
'input': Datum('123', 'string'),
'expected_value': 123,
'expected_type': int
},
{
'input': Datum('1234.56', 'string'),
'expected_value': 1234.56,
'expected_type': float
},
{
'input': Datum('[ "do", "re", "mi" ]', 'string'),
'expected_value': ["do", "re", "mi"],
'expected_type': list
},
{
'input': Datum('{ "number": "42" }', 'string'),
'expected_value': {"number": "42"},
'expected_type': dict
}
]

for datum in data:
decoded = ActivityTriggerConverter.decode(
data=datum['input'],
trigger_metadata=None)
self.assertEqual(decoded, datum['expected_value'])
self.assertEqual(type(decoded), datum['expected_type'])

def test_activity_trigger_encode(self):
# Activity Trigger allow any JSON serializable as outputs
# The return value will be carried back to the Orchestrator function
data = [
{
'output': str('sample'),
'expected_value': Datum('"sample"', 'json'),
},
{
'output': int(123),
'expected_value': Datum('123', 'json'),
},
{
'output': float(1234.56),
'expected_value': Datum('1234.56', 'json')
},
{
'output': list(["do", "re", "mi"]),
'expected_value': Datum('["do", "re", "mi"]', 'json')
},
{
'output': dict({"number": "42"}),
'expected_value': Datum('{"number": "42"}', 'json')
}
]

for datum in data:
encoded = ActivityTriggerConverter.encode(
obj=datum['output'],
expected_type=type(datum['output']))
self.assertEqual(encoded, datum['expected_value'])

def test_activity_trigger_decode(self):
# Activity Trigger allow inputs to be any JSON serializables
# The input values to the trigger should be passed into arguments
data = [
{
'input': Datum('sample_string', 'string'),
'expected_value': str('sample_string')
},
{
'input': Datum('"sample_json_string"', 'json'),
'expected_value': str('sample_json_string')
},
{
'input': Datum('{ "invalid": "json"', 'json'),
'expected_value': str('{ "invalid": "json"')
},
{
'input': Datum('true', 'json'),
'expected_value': bool(True),
},
{
'input': Datum('123', 'json'),
'expected_value': int(123),
},
{
'input': Datum('1234.56', 'json'),
'expected_value': float(1234.56)
},
{
'input': Datum('["do", "re", "mi"]', 'json'),
'expected_value': list(["do", "re", "mi"])
},
{
'input': Datum('{"number": "42"}', 'json'),
'expected_value': dict({"number": "42"})
}
]

for datum in data:
decoded = ActivityTriggerConverter.decode(
data=datum['input'],
trigger_metadata=None)
self.assertEqual(decoded, datum['expected_value'])

def test_activity_trigger_has_implicit_return(self):
self.assertTrue(
Expand Down