Skip to content

Commit 44cff9a

Browse files
committed
Added Servicebus batch e2e test
1 parent 0514dc1 commit 44cff9a

File tree

2 files changed

+47
-26
lines changed

2 files changed

+47
-26
lines changed

tests/endtoend/servicebus_functions/servicebus_functions_stein/function_app.py

+40-10
Original file line numberDiff line numberDiff line change
@@ -55,23 +55,53 @@ def servicebus_trigger(msg: func.ServiceBusMessage) -> str:
5555
return result
5656

5757

58+
@app.route(route="put_message_batch")
59+
@app.service_bus_queue_output(
60+
arg_name="msg",
61+
connection="AzureWebJobsServiceBusConnectionString",
62+
queue_name="testqueuebatch")
63+
def put_message_batch(req: func.HttpRequest, msg: func.Out[str]):
64+
msg.set(req.get_body().decode('utf-8'))
65+
return 'OK'
66+
67+
5868
@app.service_bus_queue_trigger(
5969
arg_name="msg",
6070
connection="AzureWebJobsServiceBusConnectionString",
61-
queue_name="testqueue",
62-
cardinality=func.Cardinality.MANY)
71+
queue_name="testqueuebatch", cardinality="many")
6372
@app.blob_output(arg_name="$return",
64-
path="python-worker-tests/servicebus-triggered_batch.txt",
73+
path="python-worker-tests/test-servicebus-batch.txt",
6574
connection="AzureWebJobsStorage")
66-
def servicebus_triggered(msg: func.ServiceBusMessage) -> str:
67-
return str(msg)
75+
def servicebus_trigger_batch(msg: func.ServiceBusMessage) -> str:
76+
msg = msg[0]
77+
print(f"Message ============> {msg}")
78+
result = json.dumps({
79+
'body': msg.get_body().decode('utf-8'),
80+
'content_type': msg.content_type,
81+
'delivery_count': msg.delivery_count,
82+
'expiration_time': (msg.expiration_time.isoformat() if
83+
msg.expiration_time else None),
84+
'label': msg.label,
85+
'partition_key': msg.partition_key,
86+
'reply_to': msg.reply_to,
87+
'reply_to_session_id': msg.reply_to_session_id,
88+
'scheduled_enqueue_time': (msg.scheduled_enqueue_time.isoformat() if
89+
msg.scheduled_enqueue_time else None),
90+
'session_id': msg.session_id,
91+
'time_to_live': msg.time_to_live,
92+
'to': msg.to,
93+
'user_properties': msg.user_properties,
94+
})
95+
96+
return result
6897

6998

70-
@app.route(route="servicebus_triggered")
99+
@app.route(route="get_servicebus_triggered_batch")
71100
@app.blob_input(arg_name="file",
72-
path="python-worker-tests/servicebus-triggered_batch.txt",
101+
path="python-worker-tests/test-servicebus-batch.txt",
73102
connection="AzureWebJobsStorage")
74-
def get_servicebus_triggered(req: func.HttpRequest,
75-
file: func.InputStream) -> str:
103+
def get_servicebus_triggered_batch(req: func.HttpRequest,
104+
file: func.InputStream) -> str:
76105
return func.HttpResponse(
77-
file.read().decode('utf-8'), mimetype='application/json')
106+
file.read().decode('utf-8'), mimetype='application/json')
107+

tests/endtoend/test_servicebus_functions.py

+7-16
Original file line numberDiff line numberDiff line change
@@ -47,27 +47,18 @@ def get_script_dir(cls):
4747

4848
@testutils.retryable_test(3, 5)
4949
def test_servicebus_batch(self):
50-
data = str({"value": "2024-01-19T12:50:41.250940Z"})
51-
r = self.webhost.request('POST', 'put_message',
50+
data = '{"value": "2024-01-19T12:50:41.250940Z"}'
51+
r = self.webhost.request('POST', 'put_message_batch',
5252
data=data)
5353
self.assertEqual(r.status_code, 200)
5454
self.assertEqual(r.text, 'OK')
5555

56-
max_retries = 10
57-
for try_no in range(max_retries):
58-
# wait for trigger to process the queue item
59-
time.sleep(1)
56+
time.sleep(2)
6057

61-
try:
62-
r = self.webhost.request('GET', 'servicebus_triggered')
63-
self.assertEqual(r.status_code, 200)
64-
msg = r.json()
65-
self.assertEqual(msg['body'], data)
66-
except (AssertionError, json.JSONDecodeError):
67-
if try_no == max_retries - 1:
68-
raise
69-
else:
70-
break
58+
r = self.webhost.request('GET', 'get_servicebus_triggered_batch')
59+
self.assertEqual(r.status_code, 200)
60+
msg = r.json()
61+
self.assertEqual(msg["body"], data)
7162

7263

7364
class TestServiceBusFunctionsSteinGeneric(TestServiceBusFunctions):

0 commit comments

Comments
 (0)