Skip to content

Commit 370b9cb

Browse files
author
Evan Roman
committed
Try with EHE
1 parent 4e42516 commit 370b9cb

File tree

2 files changed

+50
-50
lines changed

2 files changed

+50
-50
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,52 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
33
import json
4+
import typing
45

56
import azure.functions as func
6-
import azurefunctions.extensions.bindings.eventhub as eh
7+
# import azurefunctions.extensions.bindings.eventhub as eh
78

89
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
910

1011

11-
# @app.function_name(name="put_bc_trigger")
12-
# @app.blob_output(arg_name="file",
13-
# path="python-worker-tests/test-blobclient-trigger.txt",
14-
# connection="AzureWebJobsStorage")
15-
# @app.route(route="put_bc_trigger")
16-
# def put_bc_trigger(req: func.HttpRequest, file: func.Out[str]) -> str:
17-
# file.set(req.get_body())
18-
# return 'OK'
19-
20-
# An HttpTrigger to generating EventHub event from EventHub Output Binding
21-
@app.function_name(name="eventhub_output")
12+
@app.function_name(name="put_eh_trigger")
2213
@app.event_hub_output(arg_name="event",
2314
event_hub_name="python-worker-ci-eventhub-one",
2415
connection="AzureWebJobsEventHubConnectionString")
25-
@app.route(route="eventhub_output")
26-
def eventhub_output(req: func.HttpRequest, event: func.Out[str]) -> str:
27-
event.set('debug')
16+
@app.route(route="put_eh_trigger")
17+
def put_eh_trigger(req: func.HttpRequest, event: func.Out[str]) -> str:
18+
event.set(req.get_body())
2819
return 'OK'
2920

30-
# This is an actual EventHub trigger which will convert the event data
31-
# into a storage blob.
32-
@app.function_name(name="eventhub_trigger")
33-
@app.event_hub_message_trigger(arg_name="event",
34-
event_hub_name="python-worker-ci-eventhub-one",
35-
connection="AzureWebJobsEventHubConnectionString")
21+
@app.function_name(name="eh_ed_trigger")
22+
@app.event_hub_message_trigger(
23+
arg_name="event",
24+
event_hub_name="python-worker-ci-eventhub-one-metadata",
25+
connection="AzureWebJobsEventHubConnectionString")
3626
@app.blob_output(arg_name="$return",
37-
path="python-worker-tests/test-eventhub-triggered.txt",
27+
path="python-worker-tests/test-metadata-triggered.txt",
3828
connection="AzureWebJobsStorage")
39-
def eventhub_trigger(event: eh.EventData) -> bytes:
40-
return bytes(event.body_as_str())
29+
async def eh_ed_trigger(event: func.EventHubEvent) -> bytes:
30+
event_dict: typing.Mapping[str, typing.Any] = {
31+
'body': event.get_body().decode('utf-8'),
32+
# Uncomment this when the EnqueuedTimeUtc is fixed in azure-functions
33+
# 'enqueued_time': event.enqueued_time.isoformat(),
34+
'partition_key': event.partition_key,
35+
'sequence_number': event.sequence_number,
36+
'offset': event.offset,
37+
'metadata': event.metadata
38+
}
39+
40+
return json.dumps(event_dict)
4141

4242
# Retrieve the event data from storage blob and return it as Http response
43-
@app.function_name(name="get_eventhub_triggered")
44-
@app.route(route="get_eventhub_triggered")
43+
@app.function_name(name="get_eh_ed_triggered")
44+
@app.route(route="get_eh_ed_triggered")
4545
@app.blob_input(arg_name="file",
46-
path="python-worker-tests/test-eventhub-triggered.txt",
46+
path="python-worker-tests/test-metadata-triggered.txt",
4747
connection="AzureWebJobsStorage")
48-
def get_eventhub_triggered(req: func.HttpRequest,
49-
file: func.InputStream) -> str:
50-
return file.read().decode('utf-8')
51-
52-
# @app.event_hub_message_trigger(
53-
# arg_name="event",
54-
# event_hub_name="python-worker-ci-eventhub-one",
55-
# connection="AzureWebJobsEventHubConnectionString"
56-
# )
57-
# def eventhub_trigger(event: eh.EventHubData) -> str:
58-
# return event.body_as_str()
48+
async def get_eh_ed_triggered(req: func.HttpRequest,
49+
file: func.InputStream) -> str:
50+
return func.HttpResponse(body=file.read().decode('utf-8'),
51+
status_code=200,
52+
mimetype='application/json')

tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_eventhub_functions.py

+18-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
3-
import os
3+
import json
44
import sys
55
import time
66
import unittest
@@ -23,23 +23,29 @@ def get_libraries_to_install(cls):
2323

2424
@testutils.retryable_test(3, 5)
2525
def test_ed_eventhub_trigger(self):
26-
# data = "DummyData"
27-
28-
r = self.webhost.request('POST', 'eventhub_output',
29-
data="test")
26+
# Generate a unique event body for EventHub eventme
27+
random_number = str(round(time.time()) % 1000)
28+
req_body = {
29+
'body': random_number
30+
}
31+
32+
r = self.webhost.request('POST', 'put_eh_trigger',
33+
data=json.dumps(req_body))
3034
self.assertEqual(r.status_code, 200)
3135
self.assertEqual(r.text, 'OK')
3236

33-
# # Once the event get generated, allow function host to poll from
34-
# # EventHub and wait for eventhub_trigger to execute,
35-
# # converting the event metadata into a blob.
37+
# Once the event get generated, allow function host to poll from
38+
# EventHub and wait for eventhub_trigger to execute,
39+
# converting the event metadata into a blob.
3640
time.sleep(5)
3741

38-
# # Call get_eventhub_triggered to retrieve event metadata from blob.
39-
r = self.webhost.request('GET', 'get_eventhub_triggered')
42+
# Call get_eh_ed_triggered to retrieve event metadata from blob.
43+
r = self.webhost.request('GET', 'get_eh_ed_triggered')
4044

41-
# # Waiting for the blob get updated with the latest data from the
45+
# Waiting for the blob to get updated with the latest data from the
4246
# eventhub output binding
4347
time.sleep(5)
4448
self.assertEqual(r.status_code, 200)
45-
# response = r.json()
49+
50+
event = r.json()
51+
self.assertEqual(event['body'], random_number)

0 commit comments

Comments
 (0)