From bccfc56f6bc820d8545a152389ddb27d0bead705 Mon Sep 17 00:00:00 2001 From: hallvictoria Date: Thu, 4 Jan 2024 15:26:31 -0600 Subject: [PATCH 1/3] added additional properties to sb class --- azure/functions/_abc.py | 40 +++++++++++ azure/functions/_servicebus.py | 118 +++++++++++++++++++++++++++++++ azure/functions/servicebus.py | 81 +++++++++++++++++++++ tests/test_servicebus.py | 125 ++++++++++++++++++++++++++++++++- 4 files changed, 363 insertions(+), 1 deletion(-) diff --git a/azure/functions/_abc.py b/azure/functions/_abc.py index ebfc9f3e..17b4822c 100644 --- a/azure/functions/_abc.py +++ b/azure/functions/_abc.py @@ -447,6 +447,11 @@ class ServiceBusMessage(abc.ABC): def get_body(self) -> typing.Union[str, bytes]: pass + @property + @abc.abstractmethod + def application_properties(self) -> typing.Dict[str, typing.Any]: + pass + @property @abc.abstractmethod def content_type(self) -> typing.Optional[str]: @@ -457,6 +462,16 @@ def content_type(self) -> typing.Optional[str]: def correlation_id(self) -> typing.Optional[str]: pass + @property + @abc.abstractmethod + def dead_letter_error_description(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def dead_letter_reason(self) -> typing.Optional[str]: + pass + @property @abc.abstractmethod def dead_letter_source(self) -> typing.Optional[str]: @@ -467,6 +482,11 @@ def dead_letter_source(self) -> typing.Optional[str]: def delivery_count(self) -> typing.Optional[int]: pass + @property + @abc.abstractmethod + def enqueued_sequence_number(self) -> typing.Optional[int]: + pass + @property @abc.abstractmethod def enqueued_time_utc(self) -> typing.Optional[datetime.datetime]: @@ -488,6 +508,11 @@ def expiration_time(self) -> typing.Optional[datetime.datetime]: def label(self) -> typing.Optional[str]: pass + @property + @abc.abstractmethod + def locked_until(self) -> typing.Optional[datetime.datetime]: + pass + @property @abc.abstractmethod def lock_token(self) -> typing.Optional[str]: @@ -534,6 +559,16 @@ def sequence_number(self) -> typing.Optional[int]: def session_id(self) -> typing.Optional[str]: pass + @property + @abc.abstractmethod + def state(self) -> typing.Optional[int]: + pass + + @property + @abc.abstractmethod + def subject(self) -> typing.Optional[str]: + pass + @property @abc.abstractmethod def time_to_live(self) -> typing.Optional[datetime.timedelta]: @@ -544,6 +579,11 @@ def time_to_live(self) -> typing.Optional[datetime.timedelta]: def to(self) -> typing.Optional[str]: pass + @property + @abc.abstractmethod + def transaction_partition_key(self) -> typing.Optional[str]: + pass + @property @abc.abstractmethod def user_properties(self) -> typing.Dict[str, typing.Any]: diff --git a/azure/functions/_servicebus.py b/azure/functions/_servicebus.py index f6f38829..cd3efab7 100644 --- a/azure/functions/_servicebus.py +++ b/azure/functions/_servicebus.py @@ -33,6 +33,20 @@ def __init__(self, *, if body is not None: self.__set_body(body) + @property + def application_properties(self) -> Dict[str, Any]: + """Gets the application properties bag, which can be used for + custom message metadata. + + Returns: + -------- + Dict[str, Any]: + If user has set application properties for the message, + returns a dictionary. + If nothing is set, returns an empty dictionary. + """ + return {} + @property def content_type(self) -> Optional[str]: """Optionally describes the payload of the message, @@ -59,6 +73,30 @@ def correlation_id(self) -> Optional[str]: """ return self.__correlation_id + @property + def dead_letter_error_description(self) -> Optional[str]: + """Optionally describes the dead letter error description for the message. + + Returns: + -------- + Optional[str] + If dead letter error description is set, returns a string. + Otherwise, returns None. + """ + return None + + @property + def dead_letter_reason(self) -> Optional[str]: + """Optionally describes the dead letter reason description for the message. + + Returns: + -------- + Optional[str] + If dead letter reason description is set, returns a string. + Otherwise, returns None. + """ + return None + @property def dead_letter_source(self) -> Optional[str]: """Only set in messages that have been dead-lettered and subsequently @@ -89,6 +127,21 @@ def delivery_count(self) -> Optional[int]: """ return None + @property + def enqueued_sequence_number(self) -> Optional[int]: + """For messages that have been auto-forwarded, this property reflects + the sequence number that had first been assigned to the message at its + original point of submission. This property is read-only. Optionally + describes the enqueued sequence number of the message. + + Returns: + -------- + Optional[int] + If enqueued sequence number is set, returns an integer. + Otherwise, returns None. + """ + return None + @property def enqueued_time_utc(self) -> Optional[datetime.datetime]: """The UTC instant at which the message has been accepted and stored @@ -138,6 +191,24 @@ def label(self) -> Optional[str]: """ return None + @property + def locked_until(self) -> Optional[datetime.datetime]: + """For messages retrieved under a lock (peek-lock receive mode, not + pre-settled) this property reflects the UTC instant until which the + message is held locked in the queue/subscription. When the lock + expires, the DeliveryCount is incremented and the message is again + available for retrieval. This property is read-only.Optionally + describes the date and time in UTC until which the message will be + locked in the queue/subscription. + + Returns: + -------- + Optional[datetime.datetime] + If locked until is set, returns a datetime. + Otherwise, returns None. + """ + return None + @property def lock_token(self) -> Optional[str]: """ The lock token is a reference to the lock that is being held by @@ -267,6 +338,37 @@ def session_id(self) -> Optional[str]: """ return None + @property + def state(self) -> Optional[int]: + """The state of the message can be Active, Deferred, or Scheduled. + Deferred messages have Deferred state, scheduled messages have + Scheduled state, all other messages have Active state. States are + represented by corresponding integer values. Active = 0, + Deferred = 1, Scheduled = 2. + + Returns: + -------- + Optional[int] + If state is set, returns an integer. + Otherwise, returns None. + """ + return None + + @property + def subject(self) -> Optional[str]: + """This property enables the application to indicate the purpose of the + message to the receiver in a standardized fashion, similar to an email + subject line. The mapped AMQP property is "subject". Optionally + describes the application specific label. + + Returns: + -------- + Optional[str] + If subject is set, returns a string. + Otherwise, returns None. + """ + return None + @property def time_to_live(self) -> Optional[datetime.timedelta]: """ This value is the relative duration after which the message @@ -300,6 +402,22 @@ def to(self) -> Optional[str]: """ return None + @property + def transaction_partition_key(self) -> Optional[str]: + """If a message is sent via a transfer queue in the scope of a transaction, + this value selects the transfer queue partition: This is functionally + equivalent to PartitionKey and ensures that messages are kept together + and in order as they are transferred. Optionally describes the + partition key. Maximum length is 128 characters. + + Returns: + -------- + Optional[str] + If transaction partition key is set, returns a string. + Otherwise, returns None. + """ + return None + @property def user_properties(self) -> Dict[str, Any]: """Contains user defined message properties. diff --git a/azure/functions/servicebus.py b/azure/functions/servicebus.py index 28ca36cd..611bdc34 100644 --- a/azure/functions/servicebus.py +++ b/azure/functions/servicebus.py @@ -17,13 +17,18 @@ def __init__( self, *, body: bytes, trigger_metadata: Optional[Mapping[str, Any]] = None, + application_properties: Dict[str, Any], content_type: Optional[str] = None, correlation_id: Optional[str] = None, + dead_letter_error_description: Optional[str] = None, + dead_letter_reason: Optional[str] = None, dead_letter_source: Optional[str] = None, delivery_count: Optional[int] = None, + enqueued_sequence_number: Optional[int] = None, enqueued_time_utc: Optional[datetime.datetime] = None, expires_at_utc: Optional[datetime.datetime] = None, label: Optional[str] = None, + locked_until: Optional[datetime.datetime] = None, lock_token: Optional[str] = None, message_id: str, partition_key: Optional[str] = None, @@ -32,20 +37,28 @@ def __init__( scheduled_enqueue_time_utc: Optional[datetime.datetime] = None, sequence_number: Optional[int] = None, session_id: Optional[str] = None, + state: Optional[int] = None, + subject: Optional[str] = None, time_to_live: Optional[datetime.timedelta] = None, to: Optional[str] = None, + transaction_partition_key: Optional[str] = None, user_properties: Dict[str, object]): super().__init__(body=body, content_type=content_type, correlation_id=correlation_id) self.__body = body self.__trigger_metadata = trigger_metadata + self.__application_properties = application_properties self.__content_type = content_type self.__correlation_id = correlation_id + self.__dead_letter_error_description = dead_letter_error_description + self.__dead_letter_reason = dead_letter_reason self.__dead_letter_source = dead_letter_source self.__delivery_count = delivery_count + self.__enqueued_sequence_number = enqueued_sequence_number self.__enqueued_time_utc = enqueued_time_utc self.__expires_at_utc = expires_at_utc self.__label = label + self.__locked_until = locked_until self.__lock_token = lock_token self.__message_id = message_id self.__partition_key = partition_key @@ -54,8 +67,11 @@ def __init__( self.__scheduled_enqueue_time_utc = scheduled_enqueue_time_utc self.__sequence_number = sequence_number self.__session_id = session_id + self.__state = state + self.__subject = subject self.__time_to_live = time_to_live self.__to = to + self.__transaction_partition_key = transaction_partition_key self.__user_properties = user_properties # Cache for trigger metadata after Python object conversion @@ -64,6 +80,10 @@ def __init__( def get_body(self) -> bytes: return self.__body + @property + def application_properties(self) -> Dict[str, Any]: + return self.__application_properties + @property def content_type(self) -> Optional[str]: return self.__content_type @@ -72,14 +92,26 @@ def content_type(self) -> Optional[str]: def correlation_id(self) -> Optional[str]: return self.__correlation_id + @property + def dead_letter_error_description(self) -> Optional[str]: + return self.__dead_letter_error_description + @property def dead_letter_source(self) -> Optional[str]: return self.__dead_letter_source + @property + def dead_letter_reason(self) -> Optional[str]: + return self.__dead_letter_reason + @property def delivery_count(self) -> Optional[int]: return self.__delivery_count + @property + def enqueued_sequence_number(self) -> Optional[int]: + return self.__enqueued_sequence_number + @property def enqueued_time_utc(self) -> Optional[datetime.datetime]: return self.__enqueued_time_utc @@ -103,6 +135,10 @@ def force_persistence(self) -> Optional[bool]: def label(self) -> Optional[str]: return self.__label + @property + def locked_until(self) -> Optional[datetime.datetime]: + return self.__locked_until + @property def lock_token(self) -> Optional[str]: return self.__lock_token @@ -140,6 +176,14 @@ def sequence_number(self) -> Optional[int]: def session_id(self) -> Optional[str]: return self.__session_id + @property + def state(self) -> Optional[int]: + return self.__state + + @property + def subject(self) -> Optional[str]: + return self.__subject + @property def time_to_live(self) -> Optional[datetime.timedelta]: return self.__time_to_live @@ -148,6 +192,10 @@ def time_to_live(self) -> Optional[datetime.timedelta]: def to(self) -> Optional[str]: return self.__to + @property + def transaction_partition_key(self) -> Optional[str]: + return self.__transaction_partition_key + @property def user_properties(self) -> Dict[str, object]: return self.__user_properties @@ -244,20 +292,31 @@ def decode_single_message( return ServiceBusMessage( body=body, trigger_metadata=trigger_metadata, + application_properties=cls._decode_trigger_metadata_field( + trigger_metadata, 'ApplicationProperties', python_type=dict), content_type=cls._decode_trigger_metadata_field( trigger_metadata, 'ContentType', python_type=str), correlation_id=cls._decode_trigger_metadata_field( trigger_metadata, 'CorrelationId', python_type=str), + dead_letter_error_description=cls._decode_trigger_metadata_field( + trigger_metadata, 'DeadLetterErrorDescription', + python_type=str), + dead_letter_reason=cls._decode_trigger_metadata_field( + trigger_metadata, 'DeadLetterReason', python_type=str), dead_letter_source=cls._decode_trigger_metadata_field( trigger_metadata, 'DeadLetterSource', python_type=str), delivery_count=cls._decode_trigger_metadata_field( trigger_metadata, 'DeliveryCount', python_type=int), + enqueued_sequence_number=cls._decode_trigger_metadata_field( + trigger_metadata, 'EnqueuedSequenceNumber', python_type=int), enqueued_time_utc=cls._parse_datetime_metadata( trigger_metadata, 'EnqueuedTimeUtc'), expires_at_utc=cls._parse_datetime_metadata( trigger_metadata, 'ExpiresAtUtc'), label=cls._decode_trigger_metadata_field( trigger_metadata, 'Label', python_type=str), + locked_until=cls._parse_datetime_metadata( + trigger_metadata, 'LockedUntil'), lock_token=cls._decode_trigger_metadata_field( trigger_metadata, 'LockToken', python_type=str), message_id=cls._decode_trigger_metadata_field( @@ -274,10 +333,16 @@ def decode_single_message( trigger_metadata, 'SequenceNumber', python_type=int), session_id=cls._decode_trigger_metadata_field( trigger_metadata, 'SessionId', python_type=str), + state=cls._decode_trigger_metadata_field( + trigger_metadata, 'State', python_type=int), + subject=cls._decode_trigger_metadata_field( + trigger_metadata, 'Subject', python_type=str), time_to_live=cls._parse_timedelta_metadata( trigger_metadata, 'TimeToLive'), to=cls._decode_trigger_metadata_field( trigger_metadata, 'To', python_type=str), + transaction_partition_key=cls._decode_trigger_metadata_field( + trigger_metadata, 'TransactionPartitionKey', python_type=str), user_properties=cls._decode_trigger_metadata_field( trigger_metadata, 'UserProperties', python_type=dict), ) @@ -381,14 +446,22 @@ def _extract_messages( messages.append(ServiceBusMessage( body=message_bodies[i], trigger_metadata=trigger_metadata, + application_properties=cls._get_from_metadata_array( + trigger_metadata, 'ApplicationProperties', i), content_type=cls._get_from_metadata_array( trigger_metadata, 'ContentTypeArray', i), correlation_id=cls._get_from_metadata_array( trigger_metadata, 'CorrelationIdArray', i), + dead_letter_error_description=cls._get_from_metadata_array( + trigger_metadata, 'DeadLetterErrorDescriptionArray', i), + dead_letter_reason=cls._get_from_metadata_array( + trigger_metadata, 'DeadLetterReasonArray', i), dead_letter_source=cls._get_from_metadata_array( trigger_metadata, 'DeadLetterSourceArray', i), delivery_count=cls._get_from_metadata_array( trigger_metadata, 'DeliveryCountArray', i), + enqueued_sequence_number=cls._get_from_metadata_array( + trigger_metadata, 'EnqueuedSequenceNumberArray', i), enqueued_time_utc=cls._parse_datetime( cls._get_from_metadata_array( trigger_metadata, 'EnqueuedTimeUtcArray', i)), @@ -397,6 +470,8 @@ def _extract_messages( trigger_metadata, 'ExpiresAtUtcArray', i)), label=cls._get_from_metadata_array( trigger_metadata, 'LabelArray', i), + locked_until=cls._parse_datetime(cls._get_from_metadata_array( + trigger_metadata, 'LockedUntilArray', i)), lock_token=cls._get_from_metadata_array( trigger_metadata, 'LockTokenArray', i), message_id=cls._get_from_metadata_array( @@ -412,11 +487,17 @@ def _extract_messages( trigger_metadata, 'SequenceNumberArray', i), session_id=cls._get_from_metadata_array( trigger_metadata, 'SessionIdArray', i), + state=cls._get_from_metadata_array( + trigger_metadata, 'StateArray', i), + subject=cls._get_from_metadata_array( + trigger_metadata, 'SubjectArray', i), time_to_live=cls._parse_timedelta( cls._get_from_metadata_array( trigger_metadata, 'TimeToLiveArray', i)), to=cls._get_from_metadata_array( trigger_metadata, 'ToArray', i), + transaction_partition_key=cls._get_from_metadata_array( + trigger_metadata, 'TransactionPartitionKeyArray', i), reply_to=cls._get_from_metadata_array( trigger_metadata, 'ReplyToArray', i), user_properties=cls._get_from_metadata_array( diff --git a/tests/test_servicebus.py b/tests/test_servicebus.py index 5784a42d..34465ae5 100644 --- a/tests/test_servicebus.py +++ b/tests/test_servicebus.py @@ -16,11 +16,16 @@ class TestServiceBus(unittest.TestCase): MOCKED_CONTENT_TYPE = 'application/json' MOCKED_CORROLATION_ID = '87c66eaf88e84119b66a26278a7b4149' + MOCKED_DEADLETTER_ERROR_DESCRIPTION = \ + 'mocked_dead_letter_error_description' + MOCKED_DEADLETTER_REASON = 'mocked_dead_letter_reason' MOCKED_DEADLETTER_SOURCE = 'mocked_dead_letter_source' MOCKED_DELIVERY_COUNT = 571 + MOCKED_ENQUEUED_SEQUENCE_NUMBER = 15 MOCKED_ENQUEUE_TIME_UTC = datetime.utcnow() MOCKED_EXPIRY_AT_UTC = datetime.utcnow() MOCKED_LABEL = 'mocked_label' + MOCKED_LOCKED_UNTIL = datetime.utcnow() MOCKED_LOCK_TOKEN = '87931fd2-39f4-415a-9fdc-adfdcbed3148' MOCKED_MESSAGE_ID = 'abcee18397398d93891830a0aac89eed' MOCKED_MESSAGE_ID_A = 'aaaaa18397398d93891830a0aac89eed' @@ -32,9 +37,12 @@ class TestServiceBus(unittest.TestCase): MOCKED_SCHEDULED_ENQUEUE_TIME_UTC = datetime.utcnow() MOCKED_SEQUENCE_NUMBER = 38291 MOCKED_SESSION_ID = 'mocked_session_id' + MOCKED_STATE = 1 + MOCKED_SUBJECT = 'mocked_subject' MOCKED_TIME_TO_LIVE = '11:22:33' MOCKED_TIME_TO_LIVE_TIMEDELTA = timedelta(hours=11, minutes=22, seconds=33) MOCKED_TO = 'mocked_to' + MOCKED_TRANSITION_PARTITION_KEY = 'mocked_transition_partition_key' MOCKED_AZURE_PARTNER_ID = '6ceef68b-0794-45dd-bb2e-630748515552' @@ -69,12 +77,17 @@ def test_servicebusmessage_initialize_all_arguments(self): assert expected_body == test_sb_message.get_body() assert expected_content_type == test_sb_message.content_type assert expected_correlation_id == test_sb_message.correlation_id + self.assertDictEqual(test_sb_message.application_properties, {}) + self.assertIsNone(test_sb_message.dead_letter_error_description) + self.assertIsNone(test_sb_message.dead_letter_reason) self.assertIsNone(test_sb_message.dead_letter_source) self.assertIsNone(test_sb_message.delivery_count) + self.assertIsNone(test_sb_message.enqueued_sequence_number) self.assertIsNone(test_sb_message.enqueued_time_utc) self.assertIsNone(test_sb_message.expires_at_utc) self.assertIsNone(test_sb_message.expiration_time) self.assertIsNone(test_sb_message.label) + self.assertIsNone(test_sb_message.locked_until) self.assertIsNone(test_sb_message.lock_token) assert "" == test_sb_message.message_id self.assertIsNone(test_sb_message.partition_key) @@ -84,8 +97,11 @@ def test_servicebusmessage_initialize_all_arguments(self): self.assertIsNone(test_sb_message.scheduled_enqueue_time_utc) self.assertIsNone(test_sb_message.sequence_number) self.assertIsNone(test_sb_message.session_id) + self.assertIsNone(test_sb_message.state) + self.assertIsNone(test_sb_message.subject) self.assertIsNone(test_sb_message.time_to_live) self.assertIsNone(test_sb_message.to) + self.assertIsNone(test_sb_message.transaction_partition_key) self.assertDictEqual(test_sb_message.user_properties, {}) self.assertIsNone(test_sb_message.metadata) @@ -93,13 +109,18 @@ def test_servicebus_message_initialize_all_args(self): # given body = "body" trigger_metadata = "trigger metadata" + application_properties = {"application": "properties"} content_type = "content type" correlation_id = "correlation id" + dead_letter_error_description = "dead letter error description" + dead_letter_reason = "dead letter reason" dead_letter_source = "dead letter source" delivery_count = 1 + enqueued_sequence_number = 1 enqueued_time_utc = date(2022, 5, 1) expires_at_utc = date(2022, 5, 1) label = "label" + locked_until = date(2022, 5, 1) lock_token = "lock token" message_id = "message id" partition_key = "partition key" @@ -108,21 +129,29 @@ def test_servicebus_message_initialize_all_args(self): scheduled_enqueue_time_utc = date(2022, 5, 1) sequence_number = 1 session_id = "session id" + state = 1 + subject = "subject" time_to_live = timedelta(hours=1) to = "to" + transaction_partition_key = "transaction partition key" user_properties = {"user": "properties"} # when sb_message = azf_sb.ServiceBusMessage( body=body, trigger_metadata=trigger_metadata, + application_properties=application_properties, content_type=content_type, correlation_id=correlation_id, + dead_letter_error_description=dead_letter_error_description, + dead_letter_reason=dead_letter_reason, dead_letter_source=dead_letter_source, delivery_count=delivery_count, + enqueued_sequence_number=enqueued_sequence_number, enqueued_time_utc=enqueued_time_utc, expires_at_utc=expires_at_utc, label=label, + locked_until=locked_until, lock_token=lock_token, message_id=message_id, partition_key=partition_key, @@ -131,19 +160,30 @@ def test_servicebus_message_initialize_all_args(self): scheduled_enqueue_time_utc=scheduled_enqueue_time_utc, sequence_number=sequence_number, session_id=session_id, + state=state, + subject=subject, time_to_live=time_to_live, to=to, + transaction_partition_key=transaction_partition_key, user_properties=user_properties) # then self.assertEqual(sb_message.get_body(), body) + self.assertEqual(sb_message.application_properties, + application_properties) self.assertEqual(sb_message.content_type, content_type) self.assertEqual(sb_message.correlation_id, correlation_id) + self.assertEqual(sb_message.dead_letter_error_description, + dead_letter_error_description) + self.assertEqual(sb_message.dead_letter_reason, dead_letter_reason) self.assertEqual(sb_message.dead_letter_source, dead_letter_source) self.assertEqual(sb_message.delivery_count, delivery_count) + self.assertEqual(sb_message.enqueued_sequence_number, + enqueued_sequence_number) self.assertEqual(sb_message.enqueued_time_utc, enqueued_time_utc) self.assertEqual(sb_message.expires_at_utc, expires_at_utc) self.assertEqual(sb_message.label, label) + self.assertEqual(sb_message.locked_until, locked_until) self.assertEqual(sb_message.lock_token, lock_token) self.assertEqual(sb_message.message_id, message_id) self.assertEqual(sb_message.partition_key, partition_key) @@ -153,8 +193,12 @@ def test_servicebus_message_initialize_all_args(self): scheduled_enqueue_time_utc) self.assertEqual(sb_message.sequence_number, sequence_number) self.assertEqual(sb_message.session_id, session_id) + self.assertEqual(sb_message.state, state) + self.assertEqual(sb_message.subject, subject) self.assertEqual(sb_message.time_to_live, time_to_live) self.assertEqual(sb_message.to, to) + self.assertEqual(sb_message.transaction_partition_key, + transaction_partition_key) self.assertEqual(sb_message.user_properties, user_properties) def test_abstract_servicebus_message(self): @@ -237,12 +281,18 @@ def test_servicebus_properties(self): self.assertEqual(msg.get_body(), b'body_bytes') # Test individual ServiceBus properties respectively + self.assertEqual(msg.application_properties, {}) self.assertEqual(msg.content_type, self.MOCKED_CONTENT_TYPE) self.assertEqual(msg.correlation_id, self.MOCKED_CORROLATION_ID) + self.assertEqual(msg.dead_letter_error_description, + self.MOCKED_DEADLETTER_ERROR_DESCRIPTION) + self.assertEqual(msg.dead_letter_reason, self.MOCKED_DEADLETTER_REASON) self.assertEqual(msg.dead_letter_source, self.MOCKED_DEADLETTER_SOURCE) + self.assertEqual(msg.enqueued_sequence_number, + self.MOCKED_ENQUEUED_SEQUENCE_NUMBER) self.assertEqual(msg.enqueued_time_utc, self.MOCKED_ENQUEUE_TIME_UTC) self.assertEqual(msg.expires_at_utc, @@ -251,6 +301,10 @@ def test_servicebus_properties(self): self.MOCKED_EXPIRY_AT_UTC) self.assertEqual(msg.label, self.MOCKED_LABEL) + self.assertEqual(msg.locked_until, + self.MOCKED_LOCKED_UNTIL) + self.assertEqual(msg.lock_token, + self.MOCKED_LOCK_TOKEN) self.assertEqual(msg.message_id, self.MOCKED_MESSAGE_ID) self.assertEqual(msg.partition_key, @@ -265,10 +319,16 @@ def test_servicebus_properties(self): self.MOCKED_SCHEDULED_ENQUEUE_TIME_UTC) self.assertEqual(msg.session_id, self.MOCKED_SESSION_ID) + self.assertEqual(msg.state, + self.MOCKED_STATE) + self.assertEqual(msg.subject, + self.MOCKED_SUBJECT) self.assertEqual(msg.time_to_live, self.MOCKED_TIME_TO_LIVE_TIMEDELTA) self.assertEqual(msg.to, self.MOCKED_TO) + self.assertEqual(msg.transaction_partition_key, + self.MOCKED_TRANSITION_PARTITION_KEY) self.assertDictEqual(msg.user_properties, { '$AzureWebJobsParentId': self.MOCKED_AZURE_PARTNER_ID, 'x-opt-enqueue-sequence-number': 0 @@ -385,13 +445,19 @@ def test_multiple_servicebus_trigger_properties(self): msg = servicebus_msgs[i] body_data = msg.get_body().decode('utf-8') self.assertEqual(body_data, expceted_bodies[i]) - + self.assertEqual(msg.application_properties, {}) self.assertEqual(msg.content_type, self.MOCKED_CONTENT_TYPE) self.assertEqual(msg.correlation_id, self.MOCKED_CORROLATION_ID) + self.assertEqual(msg.dead_letter_error_description, + self.MOCKED_DEADLETTER_ERROR_DESCRIPTION) + self.assertEqual(msg.dead_letter_reason, + self.MOCKED_DEADLETTER_REASON) self.assertEqual(msg.dead_letter_source, self.MOCKED_DEADLETTER_SOURCE) + self.assertEqual(msg.enqueued_sequence_number, + self.MOCKED_ENQUEUED_SEQUENCE_NUMBER) self.assertEqual(msg.enqueued_time_utc, self.MOCKED_ENQUEUE_TIME_UTC) self.assertEqual(msg.expires_at_utc, @@ -400,6 +466,10 @@ def test_multiple_servicebus_trigger_properties(self): self.MOCKED_EXPIRY_AT_UTC) self.assertEqual(msg.label, self.MOCKED_LABEL) + self.assertEqual(msg.locked_until, + self.MOCKED_LOCKED_UNTIL) + self.assertEqual(msg.lock_token, + self.MOCKED_LOCK_TOKEN) self.assertEqual(msg.message_id, expected_message_ids[i]) self.assertEqual(msg.partition_key, @@ -414,10 +484,16 @@ def test_multiple_servicebus_trigger_properties(self): self.MOCKED_SCHEDULED_ENQUEUE_TIME_UTC) self.assertEqual(msg.session_id, self.MOCKED_SESSION_ID) + self.assertEqual(msg.state, + self.MOCKED_STATE) + self.assertEqual(msg.subject, + self.MOCKED_SUBJECT) self.assertEqual(msg.time_to_live, self.MOCKED_TIME_TO_LIVE_TIMEDELTA) self.assertEqual(msg.to, self.MOCKED_TO) + self.assertEqual(msg.transaction_partition_key, + self.MOCKED_TRANSITION_PARTITION_KEY) self.assertDictEqual(msg.user_properties, { '$AzureWebJobsParentId': self.MOCKED_AZURE_PARTNER_ID, 'x-opt-enqueue-sequence-number': 0 @@ -468,12 +544,21 @@ def _generate_single_trigger_metadata(self) -> Dict[str, meta.Datum]: 'CorrelationId': meta.Datum( self.MOCKED_CORROLATION_ID, 'string' ), + 'DeadLetterErrorDescription': meta.Datum( + self.MOCKED_DEADLETTER_ERROR_DESCRIPTION, 'string' + ), + 'DeadLetterReason': meta.Datum( + self.MOCKED_DEADLETTER_REASON, 'string' + ), 'DeadLetterSource': meta.Datum( self.MOCKED_DEADLETTER_SOURCE, 'string' ), 'DeliveryCount': meta.Datum( self.MOCKED_DELIVERY_COUNT, 'int' ), + 'EnqueuedSequenceNumber': meta.Datum( + self.MOCKED_ENQUEUED_SEQUENCE_NUMBER, 'int' + ), 'EnqueuedTimeUtc': meta.Datum( self.MOCKED_ENQUEUE_TIME_UTC.isoformat(), 'string' ), @@ -484,6 +569,9 @@ def _generate_single_trigger_metadata(self) -> Dict[str, meta.Datum]: 'Label': meta.Datum( self.MOCKED_LABEL, 'string' ), + 'LockedUntil': meta.Datum( + self.MOCKED_LOCKED_UNTIL.isoformat(), 'string' + ), 'LockToken': meta.Datum( self.MOCKED_LOCK_TOKEN, 'string' ), @@ -508,11 +596,20 @@ def _generate_single_trigger_metadata(self) -> Dict[str, meta.Datum]: 'SessionId': meta.Datum( self.MOCKED_SESSION_ID, 'string' ), + 'State': meta.Datum( + self.MOCKED_STATE, 'int' + ), + 'Subject': meta.Datum( + self.MOCKED_SUBJECT, 'string' + ), 'TimeToLive': meta.Datum( self.MOCKED_TIME_TO_LIVE, 'string' ), 'To': meta.Datum( self.MOCKED_TO, 'string' + ), + 'TransitionPartitionKey': meta.Datum( + self.MOCKED_TRANSITION_PARTITION_KEY, 'string' ) } mocked_metadata['MessageReceiver'] = meta.Datum(type='json', value=''' @@ -548,6 +645,8 @@ def _generate_single_trigger_metadata(self) -> Dict[str, meta.Datum]: "ServerBusyExceptionMessage": null } }''') + mocked_metadata['ApplicationProperties'] = meta.Datum(type='json', value=''' + {}''') mocked_metadata['UserProperties'] = meta.Datum(type='json', value=''' { "$AzureWebJobsParentId": "6ceef68b-0794-45dd-bb2e-630748515552", @@ -577,15 +676,27 @@ def _generate_multiple_trigger_metadata(self) -> Dict[str, meta.Datum]: combine_from = lambda key, et: self._zip(key, et, sb_a, sb_b, sb_c) mocked_metadata = { + 'ApplicationPropertiesArray': combine_from( + 'ApplicationProperties', 'json' + ), 'ContentTypeArray': combine_from( 'ContentType', 'collection_string' ), 'CorrelationIdArray': combine_from( 'CorrelationId', 'collection_string' ), + 'DeadLetterErrorDescriptionArray': combine_from( + 'DeadLetterErrorDescription', 'collection_string' + ), + 'DeadLetterReasonArray': combine_from( + 'DeadLetterReason', 'collection_string' + ), 'DeadLetterSourceArray': combine_from( 'DeadLetterSource', 'collection_string' ), + 'EnqueuedSequenceNumberArray': combine_from( + 'EnqueuedSequenceNumber', 'collection_sint64' + ), 'EnqueuedTimeUtcArray': combine_from( 'EnqueuedTimeUtc', 'json' ), @@ -595,6 +706,9 @@ def _generate_multiple_trigger_metadata(self) -> Dict[str, meta.Datum]: 'LabelArray': combine_from( 'Label', 'collection_string' ), + 'LockedUntilArray': combine_from( + 'LockedUntilArray', 'json' + ), 'LockTokenArray': combine_from( 'LockToken', 'collection_string' ), @@ -619,12 +733,21 @@ def _generate_multiple_trigger_metadata(self) -> Dict[str, meta.Datum]: 'SequenceNumberArray': combine_from( 'SequenceNumber', 'collection_sint64' ), + 'StateArray': combine_from( + 'State', 'collection_sint64' + ), + 'SubjectArray': combine_from( + 'Subject', 'collection_string' + ), 'TimeToLiveArray': combine_from( 'TimeToLive', 'collection_string' ), 'ToArray': combine_from( 'To', 'collection_string' ), + 'TransactionPartitionKeyArray': combine_from( + 'TransactionPartitionKey', 'collection_string' + ), 'UserPropertiesArray': combine_from( 'UserProperties', 'json' ) From a115579dc0a21418e1d1533efe2659bfeeef188d Mon Sep 17 00:00:00 2001 From: hallvictoria Date: Thu, 4 Jan 2024 15:34:59 -0600 Subject: [PATCH 2/3] testing fixes --- tests/test_servicebus.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_servicebus.py b/tests/test_servicebus.py index 34465ae5..f07e2219 100644 --- a/tests/test_servicebus.py +++ b/tests/test_servicebus.py @@ -42,7 +42,7 @@ class TestServiceBus(unittest.TestCase): MOCKED_TIME_TO_LIVE = '11:22:33' MOCKED_TIME_TO_LIVE_TIMEDELTA = timedelta(hours=11, minutes=22, seconds=33) MOCKED_TO = 'mocked_to' - MOCKED_TRANSITION_PARTITION_KEY = 'mocked_transition_partition_key' + MOCKED_TRANSACTION_PARTITION_KEY = 'mocked_transaction_partition_key' MOCKED_AZURE_PARTNER_ID = '6ceef68b-0794-45dd-bb2e-630748515552' @@ -328,7 +328,7 @@ def test_servicebus_properties(self): self.assertEqual(msg.to, self.MOCKED_TO) self.assertEqual(msg.transaction_partition_key, - self.MOCKED_TRANSITION_PARTITION_KEY) + self.MOCKED_TRANSACTION_PARTITION_KEY) self.assertDictEqual(msg.user_properties, { '$AzureWebJobsParentId': self.MOCKED_AZURE_PARTNER_ID, 'x-opt-enqueue-sequence-number': 0 @@ -493,7 +493,7 @@ def test_multiple_servicebus_trigger_properties(self): self.assertEqual(msg.to, self.MOCKED_TO) self.assertEqual(msg.transaction_partition_key, - self.MOCKED_TRANSITION_PARTITION_KEY) + self.MOCKED_TRANSACTION_PARTITION_KEY) self.assertDictEqual(msg.user_properties, { '$AzureWebJobsParentId': self.MOCKED_AZURE_PARTNER_ID, 'x-opt-enqueue-sequence-number': 0 @@ -608,8 +608,8 @@ def _generate_single_trigger_metadata(self) -> Dict[str, meta.Datum]: 'To': meta.Datum( self.MOCKED_TO, 'string' ), - 'TransitionPartitionKey': meta.Datum( - self.MOCKED_TRANSITION_PARTITION_KEY, 'string' + 'TransactionPartitionKey': meta.Datum( + self.MOCKED_TRANSACTION_PARTITION_KEY, 'string' ) } mocked_metadata['MessageReceiver'] = meta.Datum(type='json', value=''' @@ -707,7 +707,7 @@ def _generate_multiple_trigger_metadata(self) -> Dict[str, meta.Datum]: 'Label', 'collection_string' ), 'LockedUntilArray': combine_from( - 'LockedUntilArray', 'json' + 'LockedUntil', 'json' ), 'LockTokenArray': combine_from( 'LockToken', 'collection_string' From fa4da24a3b05e810c702c0d128a8e3068150ae3b Mon Sep 17 00:00:00 2001 From: hallvictoria Date: Thu, 4 Jan 2024 15:52:59 -0600 Subject: [PATCH 3/3] correct check --- azure/functions/servicebus.py | 2 +- tests/test_servicebus.py | 15 ++++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/azure/functions/servicebus.py b/azure/functions/servicebus.py index 611bdc34..72a9d254 100644 --- a/azure/functions/servicebus.py +++ b/azure/functions/servicebus.py @@ -447,7 +447,7 @@ def _extract_messages( body=message_bodies[i], trigger_metadata=trigger_metadata, application_properties=cls._get_from_metadata_array( - trigger_metadata, 'ApplicationProperties', i), + trigger_metadata, 'ApplicationPropertiesArray', i), content_type=cls._get_from_metadata_array( trigger_metadata, 'ContentTypeArray', i), correlation_id=cls._get_from_metadata_array( diff --git a/tests/test_servicebus.py b/tests/test_servicebus.py index f07e2219..33357878 100644 --- a/tests/test_servicebus.py +++ b/tests/test_servicebus.py @@ -281,7 +281,8 @@ def test_servicebus_properties(self): self.assertEqual(msg.get_body(), b'body_bytes') # Test individual ServiceBus properties respectively - self.assertEqual(msg.application_properties, {}) + self.assertEqual(msg.application_properties, + {'application': 'value'}) self.assertEqual(msg.content_type, self.MOCKED_CONTENT_TYPE) self.assertEqual(msg.correlation_id, @@ -429,7 +430,7 @@ def test_multiple_servicebus_trigger_properties(self): trigger_metadata=self._generate_multiple_trigger_metadata() ) - expceted_bodies: List[str] = [ + expected_bodies: List[str] = [ json.dumps({"lucky_number": 23}), json.dumps({"lucky_number": 34}), json.dumps({"lucky_number": 45}), @@ -444,8 +445,9 @@ def test_multiple_servicebus_trigger_properties(self): for i in range(len(servicebus_msgs)): msg = servicebus_msgs[i] body_data = msg.get_body().decode('utf-8') - self.assertEqual(body_data, expceted_bodies[i]) - self.assertEqual(msg.application_properties, {}) + self.assertEqual(body_data, expected_bodies[i]) + self.assertDictEqual(msg.application_properties, + {"application": "value"}) self.assertEqual(msg.content_type, self.MOCKED_CONTENT_TYPE) self.assertEqual(msg.correlation_id, @@ -646,7 +648,10 @@ def _generate_single_trigger_metadata(self) -> Dict[str, meta.Datum]: } }''') mocked_metadata['ApplicationProperties'] = meta.Datum(type='json', value=''' - {}''') + { + "application": "value" + } + ''') mocked_metadata['UserProperties'] = meta.Datum(type='json', value=''' { "$AzureWebJobsParentId": "6ceef68b-0794-45dd-bb2e-630748515552",