Skip to content

Commit 3827b04

Browse files
Mqtt5 shared subscription sample (#428)
* Adds a MQTT5 Shared Subscription sample. Also adds it to CI, where it will just skip if the client becomes disconnected due to a lack of shared subscription support.
1 parent c83d3be commit 3827b04

File tree

5 files changed

+371
-0
lines changed

5 files changed

+371
-0
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,9 @@ jobs:
252252
- name: run MQTT5 CustomAuthorizerConnect sample (websockets)
253253
run: |
254254
python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_mqtt5_custom_authorizer_websockets_cfg.json
255+
- name: run MQTT5 Shared Subscription sample
256+
run: |
257+
python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_mqtt5_shared_subscription_cfg.json
255258
- name: configure AWS credentials (Custom Authorizer)
256259
uses: aws-actions/configure-aws-credentials@v1
257260
with:
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"language": "Python",
3+
"sample_file": "./aws-iot-device-sdk-python-v2/samples/mqtt5_shared_subscription.py",
4+
"sample_region": "us-east-1",
5+
"sample_main_class": "",
6+
"arguments": [
7+
{
8+
"name": "--endpoint",
9+
"secret": "ci/endpoint"
10+
},
11+
{
12+
"name": "--cert",
13+
"secret": "ci/mqtt5/us/mqtt5_thing/cert",
14+
"filename": "tmp_certificate.pem"
15+
},
16+
{
17+
"name": "--key",
18+
"secret": "ci/mqtt5/us/mqtt5_thing/key",
19+
"filename": "tmp_key.pem"
20+
}
21+
]
22+
}

samples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
* [MQTT5 PubSub](./mqtt5_pubsub.md)
44
* [PubSub](./pubsub.md)
5+
* [MQTT5 Shared Subscription](./mqtt5_shared_subscription.md)
56
* [Basic Connect](./basic_connect.md)
67
* [Websocket Connect](./websocket_connect.md)
78
* [MQTT5 PKCS#11 Connect](./mqtt5_pkcs11_connect.md)

samples/mqtt5_shared_subscription.md

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# MQTT5 Shared Subscription
2+
3+
[**Return to main sample list**](./README.md)
4+
5+
This sample uses the
6+
[Message Broker](https://docs.aws.amazon.com/iot/latest/developerguide/iot-message-broker.html)
7+
for AWS IoT to send and receive messages through an MQTT connection using MQTT5 using a Shared Subscription.
8+
9+
MQTT5 introduces additional features and enhancements that improve the development experience with MQTT. You can read more about MQTT5 in the Python V2 SDK by checking out the [MQTT5 user guide](../documents/MQTT5_Userguide.md).
10+
11+
Note: MQTT5 support is currently in **developer preview**. We encourage feedback at all times, but feedback during the preview window is especially valuable in shaping the final product. During the preview period we may make backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid.
12+
13+
Shared Subscriptions allow IoT devices to connect to a group where messages sent to a topic are then relayed to the group in a round-robin-like fashion. This is useful for distributing message load across multiple subscribing MQTT5 clients automatically. This is helpful for load balancing when you have many messages that need to processed.
14+
15+
Shared Subscriptions rely on what is called a group identifier, which tells the MQTT5 broker/server which IoT devices are in what group. This is done when subscribing by formatting the subscription topic like the following: `$share/<group identifier>/<topic>`.
16+
* `$share`: Tells the MQTT5 broker/server that the device is subscribing to a Shared Subscription.
17+
* `<group identifier>`: Tells the MQTT5 broker/server which group to add this Shared Subscription to. THis is the group of MQTT5 clients that will be worked through as part of the round-robin when a message comes in. For example: `my-iot-group`.
18+
* `<topic>`: The topic that the Shared Subscription is for. Messages published to this topic will be processed in a round-robin fashion. For example, `test/topic`.
19+
20+
As mentioned, Shared Subscriptions use a round-robbin like method of distributing messages. For example, say you have three MQTT5 clients all subscribed to the same Shared Subscription group and topic. If five messages are sent to the Shared Subscription topic, the messages will likely be delivered in the following order:
21+
* Message 1 -> Client one
22+
* Message 2 -> Client two
23+
* Message 3 -> Client three
24+
* Message 4 -> Client one
25+
* Message 5 -> Client two
26+
* etc...
27+
28+
Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended.
29+
30+
<details>
31+
<summary>(see sample policy)</summary>
32+
<pre>
33+
{
34+
"Version": "2012-10-17",
35+
"Statement": [
36+
{
37+
"Effect": "Allow",
38+
"Action": [
39+
"iot:Publish",
40+
"iot:Receive"
41+
],
42+
"Resource": [
43+
"arn:aws:iot:<b>region</b>:<b>account</b>:topic/test/topic",
44+
"arn:aws:iot:<b>region</b>:<b>account</b>:topic/$share/*/test/topic"
45+
]
46+
},
47+
{
48+
"Effect": "Allow",
49+
"Action": [
50+
"iot:Subscribe"
51+
],
52+
"Resource": [
53+
"arn:aws:iot:<b>region</b>:<b>account</b>:topicfilter/test/topic",
54+
"arn:aws:iot:<b>region</b>:<b>account</b>:topicfilter/$share/*/test/topic"
55+
]
56+
},
57+
{
58+
"Effect": "Allow",
59+
"Action": [
60+
"iot:Connect"
61+
],
62+
"Resource": [
63+
"arn:aws:iot:<b>region</b>:<b>account</b>:client/test-*"
64+
]
65+
}
66+
]
67+
}
68+
</pre>
69+
70+
Replace with the following with the data from your AWS account:
71+
* `<region>`: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`.
72+
* `<account>`: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website.
73+
74+
Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id <client ID here>` to send the client ID your policy supports.
75+
76+
</details>
77+
78+
## How to run
79+
80+
### Direct MQTT via mTLS
81+
82+
To Run this sample using a direct MQTT connection with a key and certificate, use the following command:
83+
84+
```sh
85+
# For Windows: replace 'python3' with 'python'
86+
python3 mqtt5_shared_subscription.py --endpoint <endpoint> --cert <file> --key <file>
87+
```
88+
89+
You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it:
90+
91+
```sh
92+
# For Windows: replace 'python3' with 'python'
93+
python3 mqtt5_shared_subscription.py --endpoint <endpoint> --cert <file> --key <file> --ca_file <file>
94+
```
95+
96+
Finally, you can also set the Shared Subscription group identifier and topic with `--group_identifier` and `--topic` respectively:
97+
98+
```sh
99+
# For Windows: replace 'python3' with 'python'
100+
python3 mqtt5_shared_subscription.py --endpoint <endpoint> --cert <file> --key <file> --group_identifier <group identifier> --topic <topic>
101+
```

samples/mqtt5_shared_subscription.py

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0.
3+
4+
from awscrt import mqtt5
5+
from awsiot import mqtt5_client_builder
6+
from uuid import uuid4
7+
import threading
8+
from concurrent.futures import Future
9+
import time
10+
11+
# MQTT5 support is currently in <b>developer preview</b>. We encourage feedback at all times, but feedback during the
12+
# preview window is especially valuable in shaping the final product. During the preview period we may make
13+
# backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid.
14+
15+
# For the purposes of this sample, we need to associate certain variables with a particular MQTT5 client
16+
# and to do so we use this class to hold all the data for a particular client used in the sample.
17+
class sample_mqtt5_client:
18+
client : mqtt5.Client
19+
name : str
20+
count : int
21+
received_count : int
22+
received_all_event = threading.Event()
23+
future_stopped : Future
24+
future_connection_success : Future
25+
26+
# Creates a MQTT5 client using direct MQTT5 via mTLS with the passed input data.
27+
def __init__(self, input_endpoint, input_cert, input_key, input_ca, input_client_id, input_count, input_client_name) -> None:
28+
try:
29+
self.count = input_count
30+
self.received_count = 0
31+
self.name = input_client_name
32+
self.future_stopped = Future()
33+
self.future_connection_success = Future()
34+
self.client = mqtt5_client_builder.mtls_from_path(
35+
endpoint=input_endpoint,
36+
cert_filepath=input_cert,
37+
pri_key_filepath=input_key,
38+
client_id=input_client_id,
39+
ca_filepath=input_ca,
40+
on_publish_received=self.on_publish_received,
41+
on_lifecycle_stopped=self.on_lifecycle_stopped,
42+
on_lifecycle_connection_success=self.on_lifecycle_connection_success,
43+
on_lifecycle_connection_failure=self.on_lifecycle_connection_failure,
44+
on_lifecycle_disconnection=self.on_lifecycle_disconnection,
45+
)
46+
except Exception as ex:
47+
print (f"Client creation failed with exception: {ex}")
48+
raise ex
49+
50+
# Callback when any publish is received
51+
def on_publish_received(self, publish_packet_data):
52+
print(f"[{self.name}] Received a publish")
53+
54+
publish_packet = publish_packet_data.publish_packet
55+
assert isinstance(publish_packet, mqtt5.PublishPacket)
56+
print(f"\tPublish received message on topic: {publish_packet.topic}")
57+
print(f"\tMessage: {publish_packet.payload}")
58+
59+
if (publish_packet.user_properties != None):
60+
if (publish_packet.user_properties.count > 0):
61+
for i in range(0, publish_packet.user_properties.count):
62+
user_property = publish_packet.user_properties[i]
63+
print(f"\t\twith UserProperty ({user_property.name}, {user_property.value})")
64+
65+
self.received_count += 1
66+
if self.received_count == self.count:
67+
self.received_all_event.set()
68+
69+
# Callback for the lifecycle event Stopped
70+
def on_lifecycle_stopped(self, lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
71+
print(f"[{self.name}]: Lifecycle Stopped")
72+
self.future_stopped.set_result(lifecycle_stopped_data)
73+
74+
# Callback for the lifecycle event Connection Success
75+
def on_lifecycle_connection_success(self, lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
76+
print(f"{self.name}]: Lifecycle Connection Success")
77+
self.future_connection_success.set_result(lifecycle_connect_success_data)
78+
79+
# Callback for the lifecycle event Connection Failure
80+
def on_lifecycle_connection_failure(self, lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
81+
print(f"{self.name}]: Lifecycle Connection Failure")
82+
print(f"{self.name}]: Connection failed with exception:{lifecycle_connection_failure.exception}")
83+
84+
# Callback for the lifecycle event Disconnection
85+
def on_lifecycle_disconnection(self, disconnect_data: mqtt5.LifecycleDisconnectData):
86+
print(f"{self.name}]: Lifecycle Disconnected")
87+
88+
if (disconnect_data.disconnect_packet != None):
89+
print(f"\tDisconnection packet code: {disconnect_data.disconnect_packet.reason_code}")
90+
print(f"\tDisconnection packet reason: {disconnect_data.disconnect_packet.reason_string}")
91+
if (disconnect_data.disconnect_packet.reason_code == mqtt5.DisconnectReasonCode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED):
92+
# Stop the client, which will interrupt the subscription and stop the sample
93+
self.client.stop()
94+
95+
# Register arguments that can be parsed from the command line
96+
import utils.command_line_utils as command_line_utils
97+
cmdUtils = command_line_utils.CommandLineUtils("SharedSubscription - Send and receive messages through a MQTT5 shared subscription")
98+
cmdUtils.add_common_mqtt5_commands()
99+
cmdUtils.add_common_topic_message_commands()
100+
cmdUtils.add_common_proxy_commands()
101+
cmdUtils.add_common_logging_commands()
102+
cmdUtils.register_command("key", "<path>", "Path to your key in PEM format.", True, str)
103+
cmdUtils.register_command("cert", "<path>", "Path to your client certificate in PEM format.", True, str)
104+
cmdUtils.register_command(
105+
"port",
106+
"<int>",
107+
"Connection port. AWS IoT supports 433 and 8883 (optional, default=auto).",
108+
type=int)
109+
cmdUtils.register_command(
110+
"client_id",
111+
"<str>",
112+
"Client ID to use for MQTT5 connection (optional, default=None)."
113+
"Note that '1', '2', and '3' will be added for to the given clientIDs since this sample uses 3 clients.",
114+
default="test-" + str(uuid4()))
115+
cmdUtils.register_command(
116+
"count",
117+
"<int>",
118+
"The number of messages to send (optional, default='10').",
119+
default=10,
120+
type=int)
121+
cmdUtils.register_command(
122+
"group_identifier",
123+
"<str>",
124+
"The group identifier to use in the shared subscription (optional, default='python-sample')",
125+
default="python-sample",
126+
type=str)
127+
cmdUtils.register_command("is_ci", "<str>", "If present the sample will run in CI mode (optional, default='None')")
128+
# Needs to be called so the command utils parse the commands
129+
cmdUtils.get_args()
130+
131+
# Pull all the data from the command line
132+
input_endpoint = cmdUtils.get_command_required("endpoint")
133+
input_cert = cmdUtils.get_command_required("cert")
134+
input_key = cmdUtils.get_command_required("key")
135+
input_ca = cmdUtils.get_command("ca_file")
136+
input_client_id = cmdUtils.get_command("client_id", "test-" + str(uuid4()))
137+
input_count = cmdUtils.get_command("count", 10)
138+
input_topic = cmdUtils.get_command("topic", "test/topic")
139+
input_message = cmdUtils.get_command("message", "Hello World!")
140+
input_group_identifier = cmdUtils.get_command("group_identifier", "python-sample")
141+
input_is_ci = cmdUtils.get_command("is_ci", None)
142+
input_is_ci_boolean = (input_is_ci != None and input_is_ci != "None")
143+
144+
# If this is CI, append a UUID to the topic
145+
if (input_is_ci_boolean):
146+
input_topic += "/" + str(uuid4())
147+
148+
# Construct the shared topic
149+
input_shared_topic = f"$share/{input_group_identifier}/{input_topic}"
150+
151+
# Make sure the message count is even
152+
if (input_count % 2 > 0):
153+
exit(ValueError("Error: '--count' is an odd number. '--count' must be even or zero for this sample."))
154+
155+
if __name__ == '__main__':
156+
try:
157+
# Create the MQTT5 clients: one publisher and two subscribers
158+
publisher = sample_mqtt5_client(
159+
input_endpoint, input_cert, input_key, input_ca,
160+
input_client_id + "1", input_count/2, "Publisher")
161+
subscriber_one = sample_mqtt5_client(
162+
input_endpoint, input_cert, input_key, input_ca,
163+
input_client_id + "2", input_count/2, "Subscriber One")
164+
subscriber_two = sample_mqtt5_client(
165+
input_endpoint, input_cert, input_key, input_ca,
166+
input_client_id + "3", input_count, "Subscriber Two")
167+
168+
# Connect all the clients
169+
publisher.client.start()
170+
publisher.future_connection_success.result(60)
171+
print (f"[{publisher.name}]: Connected")
172+
subscriber_one.client.start()
173+
subscriber_one.future_connection_success.result(60)
174+
print (f"[{subscriber_one.name}]: Connected")
175+
subscriber_two.client.start()
176+
subscriber_two.future_connection_success.result(60)
177+
print (f"[{subscriber_two.name}]: Connected")
178+
179+
# Subscribe to the shared topic on the two subscribers
180+
subscribe_packet = mqtt5.SubscribePacket(
181+
subscriptions=[mqtt5.Subscription(
182+
topic_filter=input_shared_topic,
183+
qos=mqtt5.QoS.AT_LEAST_ONCE)]
184+
)
185+
try:
186+
subscribe_one_future = subscriber_one.client.subscribe(subscribe_packet)
187+
suback_one = subscribe_one_future.result(60)
188+
print(f"[{subscriber_one.name}]: Subscribed with: {suback_one.reason_codes}")
189+
subscribe_two_future = subscriber_two.client.subscribe(subscribe_packet)
190+
suback_two = subscribe_two_future.result(60)
191+
print(f"[{subscriber_two.name}]: Subscribed with: {suback_two.reason_codes}")
192+
except Exception as ex:
193+
# TMP: If this fails subscribing in CI, just exit the sample gracefully.
194+
if (input_is_ci != None and input_is_ci != "None"):
195+
exit(0)
196+
else:
197+
raise ex
198+
199+
# Publish using the publisher client
200+
if (input_count > 0):
201+
publish_count = 1
202+
while (publish_count <= input_count):
203+
publish_message = f"{input_message} [{publish_count}]"
204+
publish_future = publisher.client.publish(mqtt5.PublishPacket(
205+
topic=input_topic,
206+
payload=publish_message,
207+
qos=mqtt5.QoS.AT_LEAST_ONCE
208+
))
209+
publish_completion_data = publish_future.result(60)
210+
print(f"[{publisher.name}]: Sent publish and got PubAck with {repr(publish_completion_data.puback.reason_code)}")
211+
time.sleep(1)
212+
publish_count += 1
213+
214+
# Make sure all the messages were gotten on the subscribers
215+
subscriber_one.received_all_event.wait(60)
216+
subscriber_two.received_all_event.wait(60)
217+
else:
218+
print("Skipping publishing messages due to message count being zero...")
219+
220+
# Unsubscribe from the shared topic on the two subscribers
221+
unsubscribe_packet = mqtt5.UnsubscribePacket(topic_filters=[input_shared_topic])
222+
unsubscribe_one_future = subscriber_one.client.unsubscribe(unsubscribe_packet)
223+
unsuback_one = unsubscribe_one_future.result(60)
224+
print(f"[{subscriber_one.name}]: Unsubscribed with {unsuback_one.reason_codes}")
225+
unsubscribe_two_future = subscriber_two.client.unsubscribe(unsubscribe_packet)
226+
unsuback_two = unsubscribe_two_future.result(60)
227+
print(f"[{subscriber_two.name}]: Unsubscribed with {unsuback_two.reason_codes}")
228+
229+
# Disconnect all the clients
230+
publisher.client.stop()
231+
publisher.future_stopped.result(60)
232+
print(f"[{publisher.name}]: Fully stopped")
233+
subscriber_one.client.stop()
234+
subscriber_one.future_stopped.result(60)
235+
print(f"[{subscriber_one.name}]: Fully stopped")
236+
subscriber_two.client.stop()
237+
subscriber_two.future_stopped.result(60)
238+
print(f"[{subscriber_two.name}]: Fully stopped")
239+
240+
except Exception as ex:
241+
print (f"An exception ocurred while running sample! Exception: {ex}")
242+
exit(ex)
243+
244+
print ("Complete!")

0 commit comments

Comments
 (0)