Skip to content

Adjust shared subscription sample to better fit docs #442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions samples/mqtt5_shared_subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ MQTT5 introduces additional features and enhancements that improve the developme

Note: MQTT5 support is currently in **developer preview**. We encourage feedback at all times, but feedback during the preview window is especially valuable in shaping the final product. During the preview period we may make backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid.

[Shared Subscriptions](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250) allow IoT devices to connect to a group where messages sent to a topic are then relayed to the group in a round-robin-like fashion. This is useful for distributing message load across multiple subscribing MQTT5 clients automatically. This is helpful for load balancing when you have many messages that need to be processed.
[Shared Subscriptions](https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt5-shared-subscription) allow IoT devices to connect to a group where messages sent to a topic are then relayed to the group in a round-robin-like fashion. This is useful for distributing message load across multiple subscribing MQTT5 clients automatically. This is helpful for load balancing when you have many messages that need to be processed.

Shared Subscriptions rely on a group identifier, which tells the MQTT5 broker/server which IoT devices to treat as a group for message distribution. This is done when subscribing by formatting the subscription topic like the following: `$share/<group identifier>/<topic>`.
Shared Subscriptions rely on a group name/identifier, which tells the MQTT5 broker/server which IoT devices to treat as a group for message distribution. This is done when subscribing by formatting the subscription topic like the following: `$share/<ShareName>/<TopicFilter>`.
* `$share`: Tells the MQTT5 broker/server that the device is subscribing to a Shared Subscription.
* `<group identifier>`: Tells the MQTT5 broker/server which group to add this Shared Subscription to. Messages published to a matching topic will be distributed round-robin amongst the group.
* `<topic>`: The topic that the Shared Subscription is for. Messages published to this topic will be processed in a round-robin fashion. For example, `test/topic`.
* `<ShareName>`: Tells the MQTT5 broker/server which group to add this Shared Subscription to. Messages published to a matching topic will be distributed round-robin amongst the group.
* `<TopicFilter>`: The topic that the Shared Subscription is for. Messages published to this topic will be processed in a round-robin fashion. For example, `test/topic`.

Shared Subscriptions use a round-robbin like method of distributing messages. For example, say you have three MQTT5 clients all subscribed to the same Shared Subscription group and topic. If five messages are sent to the Shared Subscription topic, the messages will likely be delivered in the following order:
Shared Subscriptions use a round-robbin like method of distributing messages for the subscribed clients. For example, say you have three MQTT5 clients all subscribed to the same Shared Subscription group and topic. If five messages are sent to the Shared Subscription topic, the messages will likely be delivered in the following order:
* Message 1 -> Client one
* Message 2 -> Client two
* Message 3 -> Client three
Expand Down Expand Up @@ -71,14 +71,12 @@ Replace with the following with the data from your AWS account:
* `<region>`: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`.
* `<account>`: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website.

Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id <client ID here>` to send the client ID your policy supports.
Note that in a real application, you may want to avoid the use of wildcards in your ClientID and shared subscription group names/identifiers. Wildcards should only be used selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id <client ID here>` to send the client ID your policy supports.

</details>

## How to run

### Direct MQTT via mTLS

To Run this sample using a direct MQTT connection with a key and certificate, use the following command:

```sh
Expand Down
56 changes: 19 additions & 37 deletions samples/mqtt5_shared_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from awscrt import mqtt5
from awsiot import mqtt5_client_builder
from uuid import uuid4
import threading
from concurrent.futures import Future
import time
Expand All @@ -16,8 +15,6 @@ class sample_mqtt5_client:
client: mqtt5.Client
name: str
count: int
received_count: int
received_all_event = threading.Event()
future_stopped: Future
future_connection_success: Future

Expand All @@ -29,11 +26,8 @@ def __init__(
input_key,
input_ca,
input_client_id,
input_count,
input_client_name) -> None:
try:
self.count = input_count
self.received_count = 0
self.name = input_client_name
self.future_stopped = Future()
self.future_connection_success = Future()
Expand Down Expand Up @@ -68,10 +62,6 @@ def on_publish_received(self, publish_packet_data):
user_property = publish_packet.user_properties[i]
print(f"\t\twith UserProperty ({user_property.name}, {user_property.value})")

self.received_count += 1
if self.received_count == self.count:
self.received_all_event.set()

# Callback for the lifecycle event Stopped
def on_lifecycle_stopped(self, lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
print(f"[{self.name}]: Lifecycle Stopped")
Expand Down Expand Up @@ -108,22 +98,18 @@ def on_lifecycle_disconnection(self, disconnect_data: mqtt5.LifecycleDisconnectD
# Construct the shared topic
input_shared_topic = f"$share/{cmdData.input_group_identifier}/{cmdData.input_topic}"

# Make sure the message count is even
if (cmdData.input_count % 2 > 0):
exit(ValueError("Error: '--count' is an odd number. '--count' must be even or zero for this sample."))

if __name__ == '__main__':
try:
# Create the MQTT5 clients: one publisher and two subscribers
publisher = sample_mqtt5_client(
cmdData.input_endpoint, cmdData.input_cert, cmdData.input_key, cmdData.input_ca,
cmdData.input_clientId + "1", cmdData.input_count / 2, "Publisher")
cmdData.input_clientId + "1", "Publisher")
subscriber_one = sample_mqtt5_client(
cmdData.input_endpoint, cmdData.input_cert, cmdData.input_key, cmdData.input_ca,
cmdData.input_clientId + "2", cmdData.input_count / 2, "Subscriber One")
cmdData.input_clientId + "2", "Subscriber One")
subscriber_two = sample_mqtt5_client(
cmdData.input_endpoint, cmdData.input_cert, cmdData.input_key, cmdData.input_ca,
cmdData.input_clientId + "3", cmdData.input_count, "Subscriber Two")
cmdData.input_clientId + "3", "Subscriber Two")

# Connect all the clients
publisher.client.start()
Expand All @@ -136,25 +122,20 @@ def on_lifecycle_disconnection(self, disconnect_data: mqtt5.LifecycleDisconnectD
subscriber_two.future_connection_success.result(60)
print(f"[{subscriber_two.name}]: Connected")

# Subscribe to the shared topic on the two subscribers
# Subscribe to the shared topic on both subscribers
subscribe_packet = mqtt5.SubscribePacket(
subscriptions=[mqtt5.Subscription(
topic_filter=input_shared_topic,
qos=mqtt5.QoS.AT_LEAST_ONCE)]
)
try:
subscribe_one_future = subscriber_one.client.subscribe(subscribe_packet)
suback_one = subscribe_one_future.result(60)
print(f"[{subscriber_one.name}]: Subscribed with: {suback_one.reason_codes}")
subscribe_two_future = subscriber_two.client.subscribe(subscribe_packet)
suback_two = subscribe_two_future.result(60)
print(f"[{subscriber_two.name}]: Subscribed with: {suback_two.reason_codes}")
except Exception as ex:
# TMP: If this fails subscribing in CI, just exit the sample gracefully.
if (cmdData.input_is_ci is not None):
exit(0)
else:
raise ex
subscribe_one_future = subscriber_one.client.subscribe(subscribe_packet)
suback_one = subscribe_one_future.result(60)
print(f"[{subscriber_one.name}]: Subscribed to topic '{cmdData.input_topic}' in shared subscription group '{cmdData.input_group_identifier}'.")
print(f"[{subscriber_one.name}]: Full subscribed topic is: '{input_shared_topic}' with SubAck code: {suback_one.reason_codes}")
subscribe_two_future = subscriber_two.client.subscribe(subscribe_packet)
suback_two = subscribe_two_future.result(60)
print(f"[{subscriber_two.name}]: Subscribed to topic '{cmdData.input_topic}' in shared subscription group '{cmdData.input_group_identifier}'.")
print(f"[{subscriber_two.name}]: Full subscribed topic is: '{input_shared_topic}' with SubAck code: {suback_two.reason_codes}")

# Publish using the publisher client
if (cmdData.input_count > 0):
Expand All @@ -167,24 +148,25 @@ def on_lifecycle_disconnection(self, disconnect_data: mqtt5.LifecycleDisconnectD
qos=mqtt5.QoS.AT_LEAST_ONCE
))
publish_completion_data = publish_future.result(60)
print(f"[{publisher.name}]: Sent publish and got PubAck with {repr(publish_completion_data.puback.reason_code)}")
print(f"[{publisher.name}]: Sent publish and got PubAck code: {repr(publish_completion_data.puback.reason_code)}")
time.sleep(1)
publish_count += 1

# Make sure all the messages were gotten on the subscribers
subscriber_one.received_all_event.wait(60)
subscriber_two.received_all_event.wait(60)
# Wait 5 seconds to let the last publish go out before unsubscribing
time.sleep(5)
else:
print("Skipping publishing messages due to message count being zero...")

# Unsubscribe from the shared topic on the two subscribers
unsubscribe_packet = mqtt5.UnsubscribePacket(topic_filters=[input_shared_topic])
unsubscribe_one_future = subscriber_one.client.unsubscribe(unsubscribe_packet)
unsuback_one = unsubscribe_one_future.result(60)
print(f"[{subscriber_one.name}]: Unsubscribed with {unsuback_one.reason_codes}")
print(f"[{subscriber_one.name}]: Unsubscribed to topic '{cmdData.input_topic}' in shared subscription group '{cmdData.input_group_identifier}'.")
print(f"[{subscriber_one.name}]: Full unsubscribed topic is: '{input_shared_topic}' with UnsubAck code: {unsuback_one.reason_codes}")
unsubscribe_two_future = subscriber_two.client.unsubscribe(unsubscribe_packet)
unsuback_two = unsubscribe_two_future.result(60)
print(f"[{subscriber_two.name}]: Unsubscribed with {unsuback_two.reason_codes}")
print(f"[{subscriber_two.name}]: Unsubscribed to topic '{cmdData.input_topic}' in shared subscription group '{cmdData.input_group_identifier}'.")
print(f"[{subscriber_two.name}]: Full unsubscribed topic is: '{input_shared_topic}' with UnsubAck code {unsuback_two.reason_codes}")

# Disconnect all the clients
publisher.client.stop()
Expand Down