Skip to content

Commit 31927c1

Browse files
authored
Restore IoTHub metadata in EventHub (#42)
* Add iothub metadata in single & multiple * Fix format * Add unit tests * Fix linting * Trim off unnecessary changes * Change cardinary to cardinality
1 parent 1f9f90c commit 31927c1

File tree

3 files changed

+272
-15
lines changed

3 files changed

+272
-15
lines changed

.flake8

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
ignore = W503,E402,E731
33
exclude =
44
.git, __pycache__, build, dist, .eggs, .github, .local,
5-
Samples, azure/functions/_thirdparty, docs/
5+
Samples, azure/functions/_thirdparty, docs/, .venv*/, .env*/, .vscode/

azure/functions/eventhub.py

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,10 @@ def decode(
105105
typing.List[_eventhub.EventHubEvent]]:
106106
data_type = data.type
107107

108-
if (data_type == 'string' or data_type == 'bytes'
109-
or data_type == 'json'):
108+
if cls._is_cardinality_one(trigger_metadata):
110109
return cls.decode_single_event(data, trigger_metadata)
111110

112-
elif (data_type == 'collection_bytes'
113-
or data_type == 'collection_string'):
111+
elif cls._is_cardinality_many(trigger_metadata):
114112
return cls.decode_multiple_events(data, trigger_metadata)
115113

116114
else:
@@ -129,13 +127,6 @@ def decode_single_event(cls, data,
129127
elif data.type == 'json':
130128
body = data.value.encode('utf-8')
131129

132-
iothub_metadata = {}
133-
for f in trigger_metadata:
134-
if f.startswith('iothub-'):
135-
v = cls._decode_trigger_metadata_field(
136-
trigger_metadata, f, python_type=str)
137-
iothub_metadata[f[len('iothub-'):]] = v
138-
139130
return _eventhub.EventHubEvent(
140131
body=body,
141132
enqueued_time=cls._parse_datetime_metadata(
@@ -146,7 +137,7 @@ def decode_single_event(cls, data,
146137
trigger_metadata, 'SequenceNumber', python_type=int),
147138
offset=cls._decode_trigger_metadata_field(
148139
trigger_metadata, 'Offset', python_type=str),
149-
iothub_metadata=iothub_metadata
140+
iothub_metadata=cls._decode_iothub_metadata(trigger_metadata)
150141
)
151142

152143
@classmethod
@@ -159,6 +150,10 @@ def decode_multiple_events(
159150
elif data.type == 'collection_string':
160151
parsed_data = data.value.string
161152

153+
# Input Trigger IotHub Event
154+
elif data.type == 'json':
155+
parsed_data = json.loads(data.value)
156+
162157
sys_props = trigger_metadata.get('SystemPropertiesArray')
163158

164159
parsed_sys_props = json.loads(sys_props.value)
@@ -180,17 +175,79 @@ def decode_multiple_events(
180175
expected_type=int)
181176

182177
event = _eventhub.EventHubEvent(
183-
body=parsed_data[i],
178+
body=cls._marshall_event_body(parsed_data[i], data.type),
184179
enqueued_time=cls._parse_datetime(enqueued_time),
185180
partition_key=cls._decode_typed_data(
186181
partition_key, python_type=str),
187182
sequence_number=cls._decode_typed_data(
188183
sequence_number, python_type=int),
189184
offset=cls._decode_typed_data(
190185
offset, python_type=int),
191-
iothub_metadata={}
186+
iothub_metadata=cls._extract_iothub_from_dict(
187+
parsed_sys_props[i])
192188
)
193189

194190
events.append(event)
195191

196192
return events
193+
194+
@classmethod
def _marshall_event_body(cls, parsed_data, data_type):
    """Return the body for one event taken from a multi-event batch.

    With IoTHub and cardinality = 'many', the events arrive wrapped
    inside a single JSON document (e.g. '[{ "device-id": "1" }]').

    Previously, because IoTHub events have a 'json' data type, they were
    handled as a single event by mistake and users parsed the data
    themselves.  To keep that behavior, each element of a 'json' batch is
    serialized back to UTF-8 encoded JSON bytes.

    :param parsed_data: One element of the already parsed batch.
    :param data_type: Type tag of the datum the batch was read from.
    :return: UTF-8 encoded JSON bytes when ``data_type`` is ``'json'``;
        otherwise ``parsed_data`` unchanged.
    """
    if data_type == 'json':
        return json.dumps(parsed_data).encode('utf-8')

    return parsed_data
206+
207+
@classmethod
def _decode_iothub_metadata(
        cls, trigger_metadata) -> typing.Dict[str, str]:
    """Collect the IoTHub metadata of a single event.

    Fields prefixed with 'iothub-' directly in ``trigger_metadata`` take
    precedence.  Only when none are found, and a truthy
    'SystemProperties' entry exists, is that entry's JSON ``value``
    searched instead.

    :param trigger_metadata: Mapping of metadata field names to datums.
    :return: IoTHub metadata keyed by field name without the prefix.
    """
    from_fields = cls._extract_iothub_from_trigger_metadata(
        trigger_metadata)
    if from_fields:
        return from_fields

    # Nothing to fall back on: hand back the (empty) first result.
    if not trigger_metadata.get('SystemProperties'):
        return from_fields

    return cls._extract_iothub_from_system_properties(
        trigger_metadata['SystemProperties'].value)
220+
221+
@classmethod
def _extract_iothub_from_trigger_metadata(
        cls, metadict: typing.Mapping[str, typing.Any]
) -> typing.Dict[str, str]:
    """Extract the 'iothub-' prefixed fields from trigger metadata.

    Each matching field is decoded as ``str`` through
    ``_decode_trigger_metadata_field`` and stored under its name with the
    'iothub-' prefix removed.

    :param metadict: Trigger metadata keyed by field name.  The values
        are whatever ``_decode_trigger_metadata_field`` accepts
        (presumably datum objects, not plain strings - confirm against
        the caller).
    :return: Decoded IoTHub metadata; empty when no field matches.
    """
    prefix = 'iothub-'
    return {
        field[len(prefix):]: cls._decode_trigger_metadata_field(
            metadict, field, python_type=str)
        for field in metadict
        if field.startswith(prefix)
    }
231+
232+
@classmethod
def _extract_iothub_from_system_properties(
        cls, system_properties_string: str) -> typing.Dict[str, str]:
    """Extract IoTHub metadata from a JSON encoded system properties string.

    :param system_properties_string: JSON document holding the system
        properties of one event.
    :return: The 'iothub-' prefixed entries, keyed without the prefix.
    """
    return cls._extract_iothub_from_dict(
        json.loads(system_properties_string))
237+
238+
@classmethod
def _extract_iothub_from_dict(
        cls, metadict: typing.Dict[str, str]) -> typing.Dict[str, str]:
    """Return the 'iothub-' prefixed entries of ``metadict``.

    :param metadict: Already parsed properties keyed by field name.
    :return: A new dict holding only the entries whose key starts with
        'iothub-', with that prefix removed from the key and the value
        copied as is.  Empty when nothing matches.
    """
    prefix = 'iothub-'
    return {
        field[len(prefix):]: value
        for field, value in metadict.items()
        if field.startswith(prefix)
    }
246+
247+
@classmethod
def _is_cardinality_many(cls, trigger_metadata) -> bool:
    """Tell whether the trigger metadata describes a batch of events.

    :param trigger_metadata: Mapping of metadata field names, or None.
    :return: True when a 'SystemPropertiesArray' entry is present;
        False otherwise, including when ``trigger_metadata`` is None.
    """
    # Guard against missing metadata so the caller can report an
    # unsupported payload instead of failing on the membership test.
    return (trigger_metadata is not None
            and 'SystemPropertiesArray' in trigger_metadata)
250+
251+
@classmethod
def _is_cardinality_one(cls, trigger_metadata) -> bool:
    """Tell whether the trigger metadata describes a single event.

    :param trigger_metadata: Mapping of metadata field names, or None.
    :return: True when a 'SystemProperties' entry is present; False
        otherwise, including when ``trigger_metadata`` is None.
    """
    # Guard against missing metadata so the caller can report an
    # unsupported payload instead of failing on the membership test.
    return (trigger_metadata is not None
            and 'SystemProperties' in trigger_metadata)

tests/test_eventhub.py

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,27 @@
11
from typing import List
22
import unittest
3+
import json
4+
from unittest.mock import patch
5+
from datetime import datetime
36

47
import azure.functions as func
58
import azure.functions.eventhub as azf_eh
9+
import azure.functions.meta as meta
10+
11+
12+
class CollectionBytes:
    """Test stand-in for a 'collection_bytes' value.

    The given list is exposed unchanged through the ``bytes`` attribute.
    """

    def __init__(self, data: List[bytes]) -> None:
        # Stored by reference; no copy is made.
        self.bytes = data
15+
16+
17+
class CollectionString:
    """Test stand-in for a 'collection_string' value.

    Note that every input string is UTF-8 encoded, so the ``string``
    attribute holds a list of ``bytes`` rather than ``str``.
    """

    def __init__(self, data: List[str]) -> None:
        self.string = [item.encode('utf-8') for item in data]
620

721

822
class TestEventHub(unittest.TestCase):
23+
MOCKED_ENQUEUE_TIME = datetime.utcnow()
24+
925
def test_eventhub_input_type(self):
1026
check_input_type = (
1127
azf_eh.EventHubConverter.check_input_type_annotation
@@ -26,3 +42,187 @@ def test_eventhub_output_type(self):
2642
self.assertFalse(check_output_type(func.EventHubEvent))
2743
self.assertFalse(check_output_type(List[bytes]))
2844
self.assertFalse(check_output_type(List[func.EventHubEvent]))
45+
46+
@patch('azure.functions.eventhub.EventHubTriggerConverter'
       '.decode_single_event')
@patch('azure.functions.eventhub.EventHubTriggerConverter'
       '.decode_multiple_events')
def test_eventhub_decode_single_event(self, dme_mock, dse_mock):
    # Metadata generated for a single event must route decode() to the
    # single-event decoder only.  The innermost patch supplies the first
    # mock argument, hence the (dme, dse) parameter order.
    datum = self._generate_single_iothub_datum()
    metadata = self._generate_single_trigger_metadatum()

    azf_eh.EventHubTriggerConverter.decode(
        data=datum, trigger_metadata=metadata)

    dse_mock.assert_called_once()
    dme_mock.assert_not_called()
57+
58+
@patch('azure.functions.eventhub.EventHubTriggerConverter'
       '.decode_single_event')
@patch('azure.functions.eventhub.EventHubTriggerConverter'
       '.decode_multiple_events')
def test_eventhub_decode_multiple_events(self, dme_mock, dse_mock):
    # Metadata generated for a batch must route decode() to the
    # multiple-events decoder only.  The innermost patch supplies the
    # first mock argument, hence the (dme, dse) parameter order.
    batch = self._generate_multiple_iothub_data()
    metadata = self._generate_multiple_trigger_metadata()

    azf_eh.EventHubTriggerConverter.decode(
        data=batch, trigger_metadata=metadata)

    dse_mock.assert_not_called()
    dme_mock.assert_called_once()
69+
70+
def test_eventhub_trigger_single_event_json(self):
    # Decode one 'json' datum and check its body and enqueued time.
    event = azf_eh.EventHubTriggerConverter.decode(
        data=self._generate_single_iothub_datum('json'),
        trigger_metadata=self._generate_single_trigger_metadatum(),
    )

    # The event body always has the datatype of bytes.
    body_text = event.get_body().decode('utf-8')
    self.assertEqual(body_text, '{"device-status": "good"}')
    self.assertEqual(event.enqueued_time, self.MOCKED_ENQUEUE_TIME)
80+
81+
def test_eventhub_trigger_single_event_bytes(self):
    # Decode one 'bytes' datum and check its body and enqueued time.
    event = azf_eh.EventHubTriggerConverter.decode(
        data=self._generate_single_iothub_datum('bytes'),
        trigger_metadata=self._generate_single_trigger_metadatum(),
    )

    body_text = event.get_body().decode('utf-8')
    self.assertEqual(body_text, '{"device-status": "good"}')
    self.assertEqual(event.enqueued_time, self.MOCKED_ENQUEUE_TIME)
90+
91+
def test_iothub_metadata_single_event(self):
    # The 'iothub-' prefixed system property of the single-event fixture
    # must surface in iothub_metadata with the prefix stripped.
    event = azf_eh.EventHubTriggerConverter.decode(
        data=self._generate_single_iothub_datum('json'),
        trigger_metadata=self._generate_single_trigger_metadatum(),
    )

    metadata = event.iothub_metadata
    self.assertIsNotNone(metadata)
    self.assertEqual(metadata['connection-device-id'], 'MyTestDevice')
100+
101+
def test_eventhub_trigger_single_event_string(self):
    # Decode one 'string' datum and check its body and enqueued time.
    event = azf_eh.EventHubTriggerConverter.decode(
        data=self._generate_single_iothub_datum('string'),
        trigger_metadata=self._generate_single_trigger_metadatum(),
    )

    body_text = event.get_body().decode('utf-8')
    self.assertEqual(body_text, '{"device-status": "good"}')
    self.assertEqual(event.enqueued_time, self.MOCKED_ENQUEUE_TIME)
110+
111+
def test_eventhub_trigger_multiple_events_json(self):
    # A 'json' batch must decode into one event per array element, each
    # exposing its body as JSON bytes.
    events = azf_eh.EventHubTriggerConverter.decode(
        data=self._generate_multiple_iothub_data('json'),
        trigger_metadata=self._generate_multiple_trigger_metadata(),
    )
    self.assertIsInstance(events, list)
    self.assertEqual(len(events), 2)

    expected_bodies = (
        '{"device-status": "good1"}',
        '{"device-status": "good2"}',
    )
    for event, expected_body in zip(events, expected_bodies):
        self.assertEqual(event.enqueued_time, self.MOCKED_ENQUEUE_TIME)
        self.assertEqual(event.get_body().decode('utf-8'), expected_body)
128+
129+
def test_eventhub_trigger_multiple_events_collection_string(self):
    # A 'collection_string' batch must decode into one event per item.
    events = azf_eh.EventHubTriggerConverter.decode(
        data=self._generate_multiple_iothub_data('collection_string'),
        trigger_metadata=self._generate_multiple_trigger_metadata(),
    )
    self.assertIsInstance(events, list)
    self.assertEqual(len(events), 2)

    expected_bodies = (
        '{"device-status": "good1"}',
        '{"device-status": "good2"}',
    )
    for event, expected_body in zip(events, expected_bodies):
        self.assertEqual(event.enqueued_time, self.MOCKED_ENQUEUE_TIME)
        self.assertEqual(event.get_body().decode('utf-8'), expected_body)
146+
147+
def test_eventhub_trigger_multiple_events_collection_bytes(self):
    # A 'collection_bytes' batch must decode into one event per item.
    events = azf_eh.EventHubTriggerConverter.decode(
        data=self._generate_multiple_iothub_data('collection_bytes'),
        trigger_metadata=self._generate_multiple_trigger_metadata(),
    )
    self.assertIsInstance(events, list)
    self.assertEqual(len(events), 2)

    expected_bodies = (
        '{"device-status": "good1"}',
        '{"device-status": "good2"}',
    )
    for event, expected_body in zip(events, expected_bodies):
        self.assertEqual(event.enqueued_time, self.MOCKED_ENQUEUE_TIME)
        self.assertEqual(event.get_body().decode('utf-8'), expected_body)
164+
165+
def test_iothub_metadata_events(self):
    # Every event of a batch must carry the IoTHub metadata taken from
    # its own entry of the system properties array.
    events = azf_eh.EventHubTriggerConverter.decode(
        data=self._generate_multiple_iothub_data('json'),
        trigger_metadata=self._generate_multiple_trigger_metadata(),
    )

    expected_device_ids = ('MyTestDevice1', 'MyTestDevice2')
    # Index explicitly so that a missing event still fails the test.
    for position, device_id in enumerate(expected_device_ids):
        metadata = events[position].iothub_metadata
        self.assertIsNotNone(metadata)
        self.assertEqual(metadata['connection-device-id'], device_id)
179+
180+
def _generate_single_iothub_datum(self, datum_type='json'):
    # Build the datum of one event.  The payload stays a str unless the
    # 'bytes' type is requested, in which case it is UTF-8 encoded.
    payload = '{"device-status": "good"}'
    value = payload.encode('utf-8') if datum_type == 'bytes' else payload
    return meta.Datum(value, datum_type)
186+
187+
def _generate_multiple_iothub_data(self, data_type='json'):
    # Build the datum of a two-event batch.  Collection types wrap the
    # per-event JSON texts; any other type carries the raw JSON array.
    raw_json = '[{"device-status": "good1"}, {"device-status": "good2"}]'

    if data_type == 'collection_bytes':
        encoded_items = [
            json.dumps(item).encode('utf-8')
            for item in json.loads(raw_json)
        ]
        return meta.Datum(CollectionBytes(encoded_items), data_type)

    if data_type == 'collection_string':
        text_items = [json.dumps(item) for item in json.loads(raw_json)]
        return meta.Datum(CollectionString(text_items), data_type)

    return meta.Datum(raw_json, data_type)
201+
202+
def _generate_single_trigger_metadatum(self):
    # Trigger metadata of one event: a JSON-quoted enqueued time plus
    # system properties holding one 'iothub-' prefixed field.
    enqueued_time = meta.Datum(
        f'"{self.MOCKED_ENQUEUE_TIME.isoformat()}"', 'json'
    )
    system_properties = meta.Datum(
        '{"iothub-connection-device-id": "MyTestDevice"}', 'json'
    )
    return {
        'EnqueuedTime': enqueued_time,
        'SystemProperties': system_properties,
    }
211+
212+
def _generate_multiple_trigger_metadata(self):
    # Trigger metadata of a two-event batch: one system properties entry
    # per event, serialized together as a JSON array.
    device_ids = ('MyTestDevice1', 'MyTestDevice2')
    system_props_array = [
        {
            'EnqueuedTimeUtc': self.MOCKED_ENQUEUE_TIME.isoformat(),
            'iothub-connection-device-id': device_id,
        }
        for device_id in device_ids
    ]

    serialized = json.dumps(system_props_array)
    return {'SystemPropertiesArray': meta.Datum(serialized, 'json')}

0 commit comments

Comments
 (0)