diff --git a/.coveragerc b/.coveragerc index 48da16e6..21030a82 100644 --- a/.coveragerc +++ b/.coveragerc @@ -38,4 +38,5 @@ omit = */venv/* */.venv/* */.env*/* - */.vscode/* \ No newline at end of file + */.vscode/* + azure/functions/_abc.py \ No newline at end of file diff --git a/azure/functions/_abc.py b/azure/functions/_abc.py index 56e70b2a..ebfc9f3e 100644 --- a/azure/functions/_abc.py +++ b/azure/functions/_abc.py @@ -439,3 +439,117 @@ class OrchestrationContext(abc.ABC): @abc.abstractmethod def body(self) -> str: pass + + +class ServiceBusMessage(abc.ABC): + + @abc.abstractmethod + def get_body(self) -> typing.Union[str, bytes]: + pass + + @property + @abc.abstractmethod + def content_type(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def correlation_id(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def dead_letter_source(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def delivery_count(self) -> typing.Optional[int]: + pass + + @property + @abc.abstractmethod + def enqueued_time_utc(self) -> typing.Optional[datetime.datetime]: + pass + + @property + @abc.abstractmethod + def expires_at_utc(self) -> typing.Optional[datetime.datetime]: + pass + + @property + @abc.abstractmethod + def expiration_time(self) -> typing.Optional[datetime.datetime]: + """(Deprecated, use expires_at_utc instead)""" + pass + + @property + @abc.abstractmethod + def label(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def lock_token(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def message_id(self) -> str: + pass + + @property + @abc.abstractmethod + def partition_key(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def reply_to(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def reply_to_session_id(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def scheduled_enqueue_time(self) -> typing.Optional[datetime.datetime]: + """(Deprecated, use scheduled_enqueue_time_utc instead)""" + pass + + @property + @abc.abstractmethod + def scheduled_enqueue_time_utc(self) -> typing.Optional[datetime.datetime]: + pass + + @property + @abc.abstractmethod + def sequence_number(self) -> typing.Optional[int]: + pass + + @property + @abc.abstractmethod + def session_id(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def time_to_live(self) -> typing.Optional[datetime.timedelta]: + pass + + @property + @abc.abstractmethod + def to(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def user_properties(self) -> typing.Dict[str, typing.Any]: + pass + + @property + @abc.abstractmethod + def metadata(self) -> typing.Optional[typing.Dict[str, typing.Any]]: + pass diff --git a/azure/functions/_servicebus.py b/azure/functions/_servicebus.py index 95449ce9..f6f38829 100644 --- a/azure/functions/_servicebus.py +++ b/azure/functions/_servicebus.py @@ -1,26 +1,39 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -import abc import datetime -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Union +from . import _abc -class ServiceBusMessage(abc.ABC): - @abc.abstractmethod - def get_body(self) -> bytes: - """Get the message body from ServiceBus +class ServiceBusMessage(_abc.ServiceBusMessage): - Returns: - -------- - bytes - The ServiceBus message body in bytes form - """ - pass + """A ServiceBuss message object. + + :param body: + A string or bytes instance specifying the message body. + + :param content_type: + An optional string specifying the content type. + + :param correlation_id: + An optional string specifying the correlation id. + + """ + + def __init__(self, *, + body: Optional[Union[str, bytes]] = None, + content_type: Optional[str] = None, + correlation_id: Optional[str] = None) -> None: + self.__body = b'' + self.__content_type = content_type + self.__correlation_id = correlation_id + + if body is not None: + self.__set_body(body) @property - @abc.abstractmethod def content_type(self) -> Optional[str]: """Optionally describes the payload of the message, with a descriptor following the format of RFC2045 @@ -31,10 +44,9 @@ def content_type(self) -> Optional[str]: If content type is set, returns a string. Otherwise, returns None. """ - pass + return self.__content_type @property - @abc.abstractmethod def correlation_id(self) -> Optional[str]: """Enables an application to specify a context for the message for the purposes of correlation @@ -45,10 +57,9 @@ def correlation_id(self) -> Optional[str]: If correlation id set, returns a string. Otherwise, returns None. """ - pass + return self.__correlation_id @property - @abc.abstractmethod def dead_letter_source(self) -> Optional[str]: """Only set in messages that have been dead-lettered and subsequently auto-forwarded from the dead-letter queue to another entity. @@ -61,10 +72,9 @@ def dead_letter_source(self) -> Optional[str]: If dead letter source is set, returns a string. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def delivery_count(self) -> Optional[int]: """Number of deliveries that have been attempted for this message. The count is incremented when a message lock expires, @@ -77,10 +87,9 @@ def delivery_count(self) -> Optional[int]: If delivery count is set, returns a string. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def enqueued_time_utc(self) -> Optional[datetime.datetime]: """The UTC instant at which the message has been accepted and stored in the entity. This value can be used as an authoritative and neutral @@ -93,10 +102,9 @@ def enqueued_time_utc(self) -> Optional[datetime.datetime]: If enqueued time utc is set, returns a datetime. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def expires_at_utc(self) -> Optional[datetime.datetime]: """The UTC instant at which the message is marked for removal and no longer available for retrieval from the entity due to its expiration. @@ -109,16 +117,14 @@ def expires_at_utc(self) -> Optional[datetime.datetime]: If expires at utc is set, returns a datetime. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def expiration_time(self) -> Optional[datetime.datetime]: """(Deprecated, use expires_at_utc instead)""" - pass + return None @property - @abc.abstractmethod def label(self) -> Optional[str]: """This property enables the application to indicate the purpose of the message to the receiver in a standardized fashion, similar to an @@ -130,10 +136,9 @@ def label(self) -> Optional[str]: If label is set, returns a string. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def lock_token(self) -> Optional[str]: """ The lock token is a reference to the lock that is being held by the broker in peek-lock receive mode. The token can be used to pin the @@ -147,10 +152,9 @@ def lock_token(self) -> Optional[str]: If local token is set, returns a string. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def message_id(self) -> str: """The message identifier is an application-defined value that uniquely identifies the message and its payload. @@ -164,10 +168,9 @@ def message_id(self) -> str: str The message identifier """ - pass + return "" @property - @abc.abstractmethod def partition_key(self) -> Optional[str]: """ For partitioned entities, setting this value enables assigning related messages to the same internal partition, so that submission @@ -181,10 +184,9 @@ def partition_key(self) -> Optional[str]: If partition key is set, returns a string. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def reply_to(self) -> Optional[str]: """This optional and application-defined value is a standard way to express a reply path to the receiver of the message. When a sender @@ -197,10 +199,9 @@ def reply_to(self) -> Optional[str]: If reply to is set, returns a string. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def reply_to_session_id(self) -> Optional[str]: """This value augments the ReplyTo information and specifies which SessionId should be set for the reply when sent to the reply entity. @@ -211,16 +212,14 @@ def reply_to_session_id(self) -> Optional[str]: If reply to session id is set, returns a string. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def scheduled_enqueue_time(self) -> Optional[datetime.datetime]: """(Deprecated, use scheduled_enqueue_time_utc instead)""" - pass + return None @property - @abc.abstractmethod def scheduled_enqueue_time_utc(self) -> Optional[datetime.datetime]: """For messages that are only made available for retrieval after a delay, this property defines the UTC instant at which the message @@ -233,10 +232,9 @@ def scheduled_enqueue_time_utc(self) -> Optional[datetime.datetime]: If scheduled enqueue time utc is set, returns a string. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def sequence_number(self) -> Optional[int]: """The sequence number is a unique 64-bit integer assigned to a message as it is accepted and stored by the broker and functions as its true @@ -251,10 +249,9 @@ def sequence_number(self) -> Optional[int]: If sequence number is set, returns an integer. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def session_id(self) -> Optional[str]: """For session-aware entities, this application-defined value specifies the session affiliation of the message. Messages with the @@ -268,10 +265,9 @@ def session_id(self) -> Optional[str]: If session id is set, returns a string. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def time_to_live(self) -> Optional[datetime.timedelta]: """ This value is the relative duration after which the message expires, starting from the instant the message has been accepted and @@ -287,10 +283,9 @@ def time_to_live(self) -> Optional[datetime.timedelta]: If time to live is set, returns a timedelta. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def to(self) -> Optional[str]: """ This property is reserved for future use in routing scenarios and currently ignored by the broker itself. Applications can use this @@ -303,10 +298,9 @@ def to(self) -> Optional[str]: If the recipient is set, returns a string. Otherwise, returns None. """ - pass + return None @property - @abc.abstractmethod def user_properties(self) -> Dict[str, Any]: """Contains user defined message properties. @@ -316,10 +310,9 @@ def user_properties(self) -> Dict[str, Any]: If user has set properties for the message, returns a dictionary. If nothing is set, returns an empty dictionary. """ - pass + return {} @property - @abc.abstractmethod def metadata(self) -> Optional[Dict[str, Any]]: """Getting read-only trigger metadata in a Python dictionary. @@ -334,4 +327,19 @@ def metadata(self) -> Optional[Dict[str, Any]]: Dict[str, object] Return the Python dictionary of trigger metadata """ - pass + return None + + def __set_body(self, body): + if isinstance(body, str): + body = body.encode('utf-8') + + if not isinstance(body, (bytes, bytearray)): + raise TypeError( + f'response is expected to be either of ' + f'str, bytes, or bytearray, got {type(body).__name__}') + + self.__body = bytes(body) + + def get_body(self) -> bytes: + """Return message content as bytes.""" + return self.__body diff --git a/azure/functions/servicebus.py b/azure/functions/servicebus.py index 8c395e1b..28ca36cd 100644 --- a/azure/functions/servicebus.py +++ b/azure/functions/servicebus.py @@ -34,8 +34,9 @@ def __init__( session_id: Optional[str] = None, time_to_live: Optional[datetime.timedelta] = None, to: Optional[str] = None, - user_properties: Dict[str, object]) -> None: - + user_properties: Dict[str, object]): + super().__init__(body=body, content_type=content_type, + correlation_id=correlation_id) self.__body = body self.__trigger_metadata = trigger_metadata self.__content_type = content_type diff --git a/tests/test_servicebus.py b/tests/test_servicebus.py index 258779cd..5784a42d 100644 --- a/tests/test_servicebus.py +++ b/tests/test_servicebus.py @@ -4,7 +4,7 @@ from typing import Dict, List import json import unittest -from datetime import datetime, timedelta +from datetime import datetime, timedelta, date import azure.functions as func import azure.functions.servicebus as azf_sb @@ -38,6 +38,133 @@ class TestServiceBus(unittest.TestCase): MOCKED_AZURE_PARTNER_ID = '6ceef68b-0794-45dd-bb2e-630748515552' + def test_servicebusmessage_initialize_without_args(self): + # given + expected_body = b"" + expexceted_content_type = None + expected_correlation_id = None + + # when + test_sb_message = func.ServiceBusMessage() + + # then + assert expected_body == test_sb_message.get_body() + assert expexceted_content_type == test_sb_message.content_type + assert expected_correlation_id == test_sb_message.correlation_id + + def test_servicebusmessage_initialize_all_arguments(self): + # given + expected_body: bytes = b"Body" + expected_content_type: str = "Content Type" + expected_correlation_id: str = "Correlation ID" + + # when + test_sb_message = func.ServiceBusMessage( + body=expected_body, + content_type=expected_content_type, + correlation_id=expected_correlation_id + ) + + # then + assert expected_body == test_sb_message.get_body() + assert expected_content_type == test_sb_message.content_type + assert expected_correlation_id == test_sb_message.correlation_id + self.assertIsNone(test_sb_message.dead_letter_source) + self.assertIsNone(test_sb_message.delivery_count) + self.assertIsNone(test_sb_message.enqueued_time_utc) + self.assertIsNone(test_sb_message.expires_at_utc) + self.assertIsNone(test_sb_message.expiration_time) + self.assertIsNone(test_sb_message.label) + self.assertIsNone(test_sb_message.lock_token) + assert "" == test_sb_message.message_id + self.assertIsNone(test_sb_message.partition_key) + self.assertIsNone(test_sb_message.reply_to) + self.assertIsNone(test_sb_message.reply_to_session_id) + self.assertIsNone(test_sb_message.scheduled_enqueue_time) + self.assertIsNone(test_sb_message.scheduled_enqueue_time_utc) + self.assertIsNone(test_sb_message.sequence_number) + self.assertIsNone(test_sb_message.session_id) + self.assertIsNone(test_sb_message.time_to_live) + self.assertIsNone(test_sb_message.to) + self.assertDictEqual(test_sb_message.user_properties, {}) + self.assertIsNone(test_sb_message.metadata) + + def test_servicebus_message_initialize_all_args(self): + # given + body = "body" + trigger_metadata = "trigger metadata" + content_type = "content type" + correlation_id = "correlation id" + dead_letter_source = "dead letter source" + delivery_count = 1 + enqueued_time_utc = date(2022, 5, 1) + expires_at_utc = date(2022, 5, 1) + label = "label" + lock_token = "lock token" + message_id = "message id" + partition_key = "partition key" + reply_to = "reply to" + reply_to_session_id = "reply to session id" + scheduled_enqueue_time_utc = date(2022, 5, 1) + sequence_number = 1 + session_id = "session id" + time_to_live = timedelta(hours=1) + to = "to" + user_properties = {"user": "properties"} + + # when + sb_message = azf_sb.ServiceBusMessage( + body=body, + trigger_metadata=trigger_metadata, + content_type=content_type, + correlation_id=correlation_id, + dead_letter_source=dead_letter_source, + delivery_count=delivery_count, + enqueued_time_utc=enqueued_time_utc, + expires_at_utc=expires_at_utc, + label=label, + lock_token=lock_token, + message_id=message_id, + partition_key=partition_key, + reply_to=reply_to, + reply_to_session_id=reply_to_session_id, + scheduled_enqueue_time_utc=scheduled_enqueue_time_utc, + sequence_number=sequence_number, + session_id=session_id, + time_to_live=time_to_live, + to=to, + user_properties=user_properties) + + # then + self.assertEqual(sb_message.get_body(), body) + self.assertEqual(sb_message.content_type, content_type) + self.assertEqual(sb_message.correlation_id, correlation_id) + self.assertEqual(sb_message.dead_letter_source, dead_letter_source) + self.assertEqual(sb_message.delivery_count, delivery_count) + self.assertEqual(sb_message.enqueued_time_utc, enqueued_time_utc) + self.assertEqual(sb_message.expires_at_utc, expires_at_utc) + self.assertEqual(sb_message.label, label) + self.assertEqual(sb_message.lock_token, lock_token) + self.assertEqual(sb_message.message_id, message_id) + self.assertEqual(sb_message.partition_key, partition_key) + self.assertEqual(sb_message.reply_to, reply_to) + self.assertEqual(sb_message.reply_to_session_id, reply_to_session_id) + self.assertEqual(sb_message.scheduled_enqueue_time_utc, + scheduled_enqueue_time_utc) + self.assertEqual(sb_message.sequence_number, sequence_number) + self.assertEqual(sb_message.session_id, session_id) + self.assertEqual(sb_message.time_to_live, time_to_live) + self.assertEqual(sb_message.to, to) + self.assertEqual(sb_message.user_properties, user_properties) + + def test_abstract_servicebus_message(self): + test_sb_message = func.ServiceBusMessage() + abstract_sb_message = func._abc.ServiceBusMessage + + self.assertIsInstance(test_sb_message, abstract_sb_message) + with self.assertRaises(TypeError): + func._abc.ServiceBusMessage() + def test_servicebus_input_type(self): check_input_type = ( azf_sb.ServiceBusMessageInConverter.check_input_type_annotation