Skip to content

Commit 3d19c2a

Browse files
authored
Handle eventhub cardinality (#24)
1 parent a9bfbc6 commit 3d19c2a

File tree

3 files changed

+129
-23
lines changed

3 files changed

+129
-23
lines changed

azure/functions/eventhub.py

Lines changed: 114 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,34 +22,70 @@ def check_output_type_annotation(cls, pytype) -> bool:
2222
)
2323

2424
@classmethod
25-
def decode(cls, data: meta.Datum, *,
26-
trigger_metadata) -> _eventhub.EventHubEvent:
25+
def decode(
26+
cls, data: meta.Datum, *, trigger_metadata
27+
) -> typing.Union[_eventhub.EventHubEvent,
28+
typing.List[_eventhub.EventHubEvent]]:
2729
data_type = data.type
2830

29-
if data_type == 'string':
30-
body = data.value.encode('utf-8')
31-
32-
elif data_type == 'bytes':
33-
body = data.value
31+
if (data_type == 'string' or data_type == 'bytes'
32+
or data_type == 'json'):
33+
return cls.decode_single_event(data, trigger_metadata)
3434

35-
elif data_type == 'json':
36-
body = data.value.encode('utf-8')
35+
elif (data_type == 'collection_bytes'
36+
or data_type == 'collection_string'):
37+
return cls.decode_multiple_events(data, trigger_metadata)
3738

3839
else:
3940
raise NotImplementedError(
4041
f'unsupported event data payload type: {data_type}')
4142

43+
@classmethod
44+
def decode_single_event(cls, data,
45+
trigger_metadata) -> _eventhub.EventHubEvent:
46+
if data.type == 'string':
47+
body = data.value.encode('utf-8')
48+
49+
elif data.type == 'bytes':
50+
body = data.value
51+
52+
elif data.type == 'json':
53+
body = data.value.encode('utf-8')
54+
4255
return _eventhub.EventHubEvent(body=body)
4356

57+
@classmethod
58+
def decode_multiple_events(
59+
cls, data, trigger_metadata
60+
) -> typing.List[_eventhub.EventHubEvent]:
61+
if data.type == 'collection_bytes':
62+
parsed_data = data.value.bytes
63+
64+
elif data.type == 'collection_string':
65+
parsed_data = data.value.string
66+
67+
events = []
68+
for i in range(len(parsed_data)):
69+
event = _eventhub.EventHubEvent(body=parsed_data[i])
70+
events.append(event)
71+
72+
return events
73+
4474
@classmethod
4575
def encode(cls, obj: typing.Any, *,
46-
expected_type: typing.Optional[type]) -> meta.Datum:
76+
expected_type: typing.Optional[type]
77+
) -> meta.Datum:
78+
data = meta.Datum(type=None, value=None)
79+
4780
if isinstance(obj, str):
4881
data = meta.Datum(type='string', value=obj)
4982

5083
elif isinstance(obj, bytes):
5184
data = meta.Datum(type='bytes', value=obj)
5285

86+
elif isinstance(obj, int):
87+
data = meta.Datum(type='int', value=obj)
88+
5389
elif isinstance(obj, list):
5490
data = meta.Datum(type='json', value=json.dumps(obj))
5591

@@ -58,25 +94,37 @@ def encode(cls, obj: typing.Any, *,
5894

5995
class EventHubTriggerConverter(EventHubConverter,
6096
binding='eventHubTrigger', trigger=True):
61-
6297
@classmethod
63-
def decode(cls, data: meta.Datum, *,
64-
trigger_metadata) -> _eventhub.EventHubEvent:
98+
def decode(
99+
cls, data: meta.Datum, *, trigger_metadata
100+
) -> typing.Union[_eventhub.EventHubEvent,
101+
typing.List[_eventhub.EventHubEvent]]:
65102
data_type = data.type
66103

67-
if data_type == 'string':
68-
body = data.value.encode('utf-8')
69-
70-
elif data_type == 'bytes':
71-
body = data.value
104+
if (data_type == 'string' or data_type == 'bytes'
105+
or data_type == 'json'):
106+
return cls.decode_single_event(data, trigger_metadata)
72107

73-
elif data_type == 'json':
74-
body = data.value.encode('utf-8')
108+
elif (data_type == 'collection_bytes'
109+
or data_type == 'collection_string'):
110+
return cls.decode_multiple_events(data, trigger_metadata)
75111

76112
else:
77113
raise NotImplementedError(
78114
f'unsupported event data payload type: {data_type}')
79115

116+
@classmethod
117+
def decode_single_event(cls, data,
118+
trigger_metadata) -> _eventhub.EventHubEvent:
119+
if data.type == 'string':
120+
body = data.value.encode('utf-8')
121+
122+
elif data.type == 'bytes':
123+
body = data.value
124+
125+
elif data.type == 'json':
126+
body = data.value.encode('utf-8')
127+
80128
iothub_metadata = {}
81129
for f in trigger_metadata:
82130
if f.startswith('iothub-'):
@@ -96,3 +144,49 @@ def decode(cls, data: meta.Datum, *,
96144
trigger_metadata, 'Offset', python_type=str),
97145
iothub_metadata=iothub_metadata
98146
)
147+
148+
@classmethod
149+
def decode_multiple_events(
150+
cls, data, trigger_metadata
151+
) -> typing.List[_eventhub.EventHubEvent]:
152+
if data.type == 'collection_bytes':
153+
parsed_data = data.value.bytes
154+
155+
elif data.type == 'collection_string':
156+
parsed_data = data.value.string
157+
158+
sys_props = trigger_metadata.get('SystemPropertiesArray')
159+
160+
parsed_sys_props = json.loads(sys_props.value)
161+
162+
if len(parsed_data) != len(parsed_sys_props):
163+
raise AssertionError('Number of bodies and metadata mismatched')
164+
165+
events = []
166+
for i in range(len(parsed_data)):
167+
enqueued_time = parsed_sys_props[i].get('EnqueuedTimeUtc')
168+
partition_key = cls.encode(
169+
parsed_sys_props[i].get('PartitionKey'),
170+
expected_type=str)
171+
sequence_number = cls.encode(
172+
parsed_sys_props[i].get('SequenceNumber'),
173+
expected_type=int)
174+
offset = cls.encode(
175+
parsed_sys_props[i].get('Offset'),
176+
expected_type=int)
177+
178+
event = _eventhub.EventHubEvent(
179+
body=parsed_data[i],
180+
enqueued_time=cls._parse_datetime(enqueued_time),
181+
partition_key=cls._decode_typed_data(
182+
partition_key, python_type=str),
183+
sequence_number=cls._decode_typed_data(
184+
sequence_number, python_type=int),
185+
offset=cls._decode_typed_data(
186+
offset, python_type=int),
187+
iothub_metadata={}
188+
)
189+
190+
events.append(event)
191+
192+
return events

azure/functions/meta.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,18 @@ def _decode_typed_data(
105105
elif data_type == 'double':
106106
result = data.value
107107

108+
elif data_type == 'collection_bytes':
109+
result = data.value
110+
111+
elif data_type == 'collection_string':
112+
result = data.value
113+
114+
elif data_type == 'collection_sint64':
115+
result = data.value
116+
117+
elif data_type is None:
118+
return None
119+
108120
else:
109121
raise ValueError(
110122
f'unsupported type of {context}: {data_type}')
@@ -223,7 +235,7 @@ def check_output_type_annotation(cls, pytype: type) -> bool:
223235

224236
@abc.abstractclassmethod
225237
def encode(cls, obj: typing.Any, *,
226-
expeced_type: typing.Optional[type]) -> Datum:
238+
expected_type: typing.Optional[type]) -> typing.Optional[Datum]:
227239
raise NotImplementedError
228240

229241

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
setup(
55
name='azure-functions',
6-
version='1.0.0b5',
6+
version='1.0.3',
77
description='Azure Functions for Python',
88
author='Microsoft Corporation',
99
author_email='[email protected]',
@@ -15,7 +15,7 @@
1515
'Operating System :: POSIX',
1616
'Operating System :: MacOS :: MacOS X',
1717
'Environment :: Web Environment',
18-
'Development Status :: 3 - Alpha',
18+
'Development Status :: 5 - Production/Stable',
1919
],
2020
license='MIT',
2121
packages=['azure.functions'],

0 commit comments

Comments
 (0)