Skip to content

Commit b5ba6fe

Browse files
authored
Merge pull request #310 from aws/integration_test
Added integration tests in GitHub CI
2 parents 99fe2e9 + b970c2d commit b5ba6fe

21 files changed

+2759
-0
lines changed

.github/workflows/ci.yml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,22 @@ jobs:
3131
pip install mock
3232
python3 -m pytest test
3333
34+
integration-tests:
35+
runs-on: ubuntu-latest
36+
strategy:
37+
fail-fast: false
38+
matrix:
39+
test-type: [ MutualAuth, MutualAuthT , Websocket, ALPN, ALPNT]
40+
python-version: [ '2.x', '3.x' ]
41+
#[MutualAuth, Websocket, ALPN]
42+
steps:
43+
- uses: actions/checkout@v2
44+
- uses: actions/setup-python@v2
45+
with:
46+
python-version: ${{ matrix.python-version }}
47+
- name: Integration tests
48+
run: |
49+
pip install pytest
50+
pip install mock
51+
pip install boto3
52+
./test-integration/run/run.sh ${{ matrix.test-type }} 1000 100 7
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
*
2+
*/
3+
!.gitignore
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# This integration test verifies the functionality of asynchronous API for plain MQTT operations, as well as general
2+
# notification callbacks. There are 2 phases for this test:
3+
# a) Testing async APIs + onMessage general notification callback
4+
# b) Testing onOnline, onOffline notification callbacks
5+
# To achieve test goal a) and b), the client will follow the routine described below:
6+
# 1. Client does async connect to AWS IoT and captures the CONNACK event and onOnline callback event in the record
7+
# 2. Client does async subscribe to a topic and captures the SUBACK event in the record
8+
# 3. Client does several async publish (QoS1) to the same topic and captures the PUBACK event in the record
9+
# 4. Since client subscribes and publishes to the same topic, onMessage callback should be triggered. We capture these
10+
# events as well in the record.
11+
# 5. Client does async disconnect. This would trigger the offline callback and disconnect event callback. We capture
12+
# them in the record.
13+
# We should be able to receive all ACKs for all operations and corresponding general notification callback triggering
14+
# events.
15+
16+
17+
import random
18+
import string
19+
import time
20+
import sys
21+
sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary")
22+
sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage")
23+
24+
from TestToolLibrary.checkInManager import checkInManager
25+
from TestToolLibrary.MQTTClientManager import MQTTClientManager
26+
from TestToolLibrary.skip import skip_when_match
27+
from TestToolLibrary.skip import ModeIsALPN
28+
from TestToolLibrary.skip import Python2VersionLowerThan
29+
from TestToolLibrary.skip import Python3VersionLowerThan
30+
31+
32+
TOPIC = "topic/test/async_cb/"
33+
MESSAGE_PREFIX = "MagicMessage-"
34+
NUMBER_OF_PUBLISHES = 3
35+
HOST = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com"
36+
ROOT_CA = "./test-integration/Credentials/rootCA.crt"
37+
CERT = "./test-integration/Credentials/certificate.pem.crt"
38+
KEY = "./test-integration/Credentials/privateKey.pem.key"
39+
CLIENT_ID = "PySdkIntegTest_AsyncAPI_Callbacks"
40+
41+
KEY_ON_ONLINE = "OnOnline"
42+
KEY_ON_OFFLINE = "OnOffline"
43+
KEY_ON_MESSAGE = "OnMessage"
44+
KEY_CONNACK = "Connack"
45+
KEY_DISCONNECT = "Disconnect"
46+
KEY_PUBACK = "Puback"
47+
KEY_SUBACK = "Suback"
48+
KEY_UNSUBACK = "Unsuback"
49+
50+
51+
class CallbackManager(object):
52+
53+
def __init__(self):
54+
self.callback_invocation_record = {
55+
KEY_ON_ONLINE : 0,
56+
KEY_ON_OFFLINE : 0,
57+
KEY_ON_MESSAGE : 0,
58+
KEY_CONNACK : 0,
59+
KEY_DISCONNECT : 0,
60+
KEY_PUBACK : 0,
61+
KEY_SUBACK : 0,
62+
KEY_UNSUBACK : 0
63+
}
64+
65+
def on_online(self):
66+
print("OMG, I am online!")
67+
self.callback_invocation_record[KEY_ON_ONLINE] += 1
68+
69+
def on_offline(self):
70+
print("OMG, I am offline!")
71+
self.callback_invocation_record[KEY_ON_OFFLINE] += 1
72+
73+
def on_message(self, message):
74+
print("OMG, I got a message!")
75+
self.callback_invocation_record[KEY_ON_MESSAGE] += 1
76+
77+
def connack(self, mid, data):
78+
print("OMG, I got a connack!")
79+
self.callback_invocation_record[KEY_CONNACK] += 1
80+
81+
def disconnect(self, mid, data):
82+
print("OMG, I got a disconnect!")
83+
self.callback_invocation_record[KEY_DISCONNECT] += 1
84+
85+
def puback(self, mid):
86+
print("OMG, I got a puback!")
87+
self.callback_invocation_record[KEY_PUBACK] += 1
88+
89+
def suback(self, mid, data):
90+
print("OMG, I got a suback!")
91+
self.callback_invocation_record[KEY_SUBACK] += 1
92+
93+
def unsuback(self, mid):
94+
print("OMG, I got an unsuback!")
95+
self.callback_invocation_record[KEY_UNSUBACK] += 1
96+
97+
98+
def get_random_string(length):
99+
return "".join(random.choice(string.ascii_lowercase) for i in range(length))
100+
101+
102+
############################################################################
103+
# Main #
104+
# Check inputs
105+
my_check_in_manager = checkInManager(1)
106+
my_check_in_manager.verify(sys.argv)
107+
mode = my_check_in_manager.mode
108+
109+
skip_when_match(ModeIsALPN(mode).And(
110+
Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0)))
111+
), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3]))
112+
113+
# Performing
114+
############
115+
print("Connecting...")
116+
callback_manager = CallbackManager()
117+
sdk_mqtt_client = MQTTClientManager()\
118+
.create_nonconnected_mqtt_client(mode, CLIENT_ID, HOST, (ROOT_CA, CERT, KEY), callback_manager)
119+
sdk_mqtt_client.connectAsync(keepAliveIntervalSecond=1, ackCallback=callback_manager.connack) # Add callback
120+
print("Wait some time to make sure we are connected...")
121+
time.sleep(10) # 10 sec
122+
123+
topic = TOPIC + get_random_string(4)
124+
print("Subscribing to topic: " + topic)
125+
sdk_mqtt_client.subscribeAsync(topic, 1, ackCallback=callback_manager.suback, messageCallback=None)
126+
print("Wait some time to make sure we are subscribed...")
127+
time.sleep(3) # 3 sec
128+
129+
print("Publishing...")
130+
for i in range(NUMBER_OF_PUBLISHES):
131+
sdk_mqtt_client.publishAsync(topic, MESSAGE_PREFIX + str(i), 1, ackCallback=callback_manager.puback)
132+
time.sleep(1)
133+
print("Wait sometime to make sure we finished with publishing...")
134+
time.sleep(2)
135+
136+
print("Unsubscribing...")
137+
sdk_mqtt_client.unsubscribeAsync(topic, ackCallback=callback_manager.unsuback)
138+
print("Wait sometime to make sure we finished with unsubscribing...")
139+
time.sleep(2)
140+
141+
print("Disconnecting...")
142+
sdk_mqtt_client.disconnectAsync(ackCallback=callback_manager.disconnect)
143+
144+
print("Wait sometime to let the test result sync...")
145+
time.sleep(3)
146+
147+
print("Verifying...")
148+
try:
149+
assert callback_manager.callback_invocation_record[KEY_ON_ONLINE] == 1
150+
assert callback_manager.callback_invocation_record[KEY_CONNACK] == 1
151+
assert callback_manager.callback_invocation_record[KEY_SUBACK] == 1
152+
assert callback_manager.callback_invocation_record[KEY_PUBACK] == NUMBER_OF_PUBLISHES
153+
assert callback_manager.callback_invocation_record[KEY_ON_MESSAGE] == NUMBER_OF_PUBLISHES
154+
assert callback_manager.callback_invocation_record[KEY_UNSUBACK] == 1
155+
assert callback_manager.callback_invocation_record[KEY_DISCONNECT] == 1
156+
assert callback_manager.callback_invocation_record[KEY_ON_OFFLINE] == 1
157+
except BaseException as e:
158+
print("Failed! %s" % e.message)
159+
print("Pass!")
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
# This integration test verifies the functionality in the Python core of Yun/Python SDK
2+
# for auto-reconnect and auto-resubscribe.
3+
# It starts two threads using two different connections to AWS IoT:
4+
# Thread A publishes 10 messages to topicB first, then quiet for a while, and finally
5+
# publishes another 10 messages to topicB.
6+
# Thread B subscribes to topicB and waits to receive messages. Once it receives the first
7+
# 10 messages. It simulates a network error, disconnecting from the broker. In a short time,
8+
# it should automatically reconnect and resubscribe to the previous topic and be able to
9+
# receive the next 10 messages from thread A.
10+
# Because of auto-reconnect/resubscribe, thread B should be able to receive all of the
11+
# messages from topicB published by thread A without calling subscribe again in user code
12+
# explicitly.
13+
14+
15+
import random
16+
import string
17+
import sys
18+
import time
19+
sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary")
20+
sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage")
21+
22+
import TestToolLibrary.checkInManager as checkInManager
23+
import TestToolLibrary.MQTTClientManager as MQTTClientManager
24+
from TestToolLibrary import simpleThreadManager
25+
from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import publishError
26+
from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError
27+
from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException
28+
from TestToolLibrary.skip import skip_when_match
29+
from TestToolLibrary.skip import ModeIsALPN
30+
from TestToolLibrary.skip import Python2VersionLowerThan
31+
from TestToolLibrary.skip import Python3VersionLowerThan
32+
33+
CLIENT_ID_PUB = "integrationTestMQTT_ClientPub" + "".join(random.choice(string.ascii_lowercase) for i in range(4))
34+
CLIENT_ID_SUB = "integrationTestMQTT_ClientSub" + "".join(random.choice(string.ascii_lowercase) for i in range(4))
35+
36+
# Callback unit
37+
class callbackUnit:
38+
def __init__(self):
39+
self._internalSet = set()
40+
41+
# Callback fro clientSub
42+
def messageCallback(self, client, userdata, message):
43+
print("Received a new message: " + str(message.payload))
44+
self._internalSet.add(message.payload.decode('utf-8'))
45+
46+
def getInternalSet(self):
47+
return self._internalSet
48+
49+
50+
# Simulate a network error
51+
def manualNetworkError(srcPyMQTTCore):
52+
# Ensure we close the socket
53+
if srcPyMQTTCore._internal_async_client._paho_client._sock:
54+
srcPyMQTTCore._internal_async_client._paho_client._sock.close()
55+
srcPyMQTTCore._internal_async_client._paho_client._sock = None
56+
if srcPyMQTTCore._internal_async_client._paho_client._ssl:
57+
srcPyMQTTCore._internal_async_client._paho_client._ssl.close()
58+
srcPyMQTTCore._internal_async_client._paho_client._ssl = None
59+
# Fake that we have detected the disconnection
60+
srcPyMQTTCore._internal_async_client._paho_client.on_disconnect(None, None, 0)
61+
62+
63+
# runFunctionUnit
64+
class runFunctionUnit():
65+
def __init__(self):
66+
self._messagesPublished = set()
67+
self._topicB = "topicB/" + "".join(random.choice(string.ascii_lowercase) for i in range(4))
68+
69+
# ThreadA runtime function:
70+
# 1. Publish 10 messages to topicB.
71+
# 2. Take a nap: 20 sec
72+
# 3. Publish another 10 messages to topicB.
73+
def threadARuntime(self, pyCoreClient):
74+
time.sleep(3) # Ensure a valid subscription
75+
messageCount = 0
76+
# First 10 messages
77+
while messageCount < 10:
78+
try:
79+
pyCoreClient.publish(self._topicB, str(messageCount), 1, False)
80+
self._messagesPublished.add(str(messageCount))
81+
except publishError:
82+
print("Publish error!")
83+
except Exception as e:
84+
print("Unknown exception!")
85+
print("Type: " + str(type(e)))
86+
print("Message: " + str(e.message))
87+
messageCount += 1
88+
time.sleep(0.5) # TPS = 2
89+
# Take a nap
90+
time.sleep(20)
91+
# Second 10 messages
92+
while messageCount < 20:
93+
try:
94+
pyCoreClient.publish(self._topicB, str(messageCount), 1, False)
95+
self._messagesPublished.add(str(messageCount))
96+
except publishError:
97+
print("Publish Error!")
98+
except Exception as e:
99+
print("Unknown exception!")
100+
print("Type: " + str(type(e)))
101+
print("Message: " + str(e.message))
102+
messageCount += 1
103+
time.sleep(0.5)
104+
print("Publish thread terminated.")
105+
106+
# ThreadB runtime function:
107+
# 1. Subscribe to topicB
108+
# 2. Wait for a while
109+
# 3. Create network blocking, triggering auto-reconnect and auto-resubscribe
110+
# 4. On connect, wait for another while
111+
def threadBRuntime(self, pyCoreClient, callback):
112+
try:
113+
# Subscribe to topicB
114+
pyCoreClient.subscribe(self._topicB, 1, callback)
115+
except subscribeTimeoutException:
116+
print("Subscribe timeout!")
117+
except subscribeError:
118+
print("Subscribe error!")
119+
except Exception as e:
120+
print("Unknown exception!")
121+
print("Type: " + str(type(e)))
122+
print("Message: " + str(e.message))
123+
# Wait to get the first 10 messages from thread A
124+
time.sleep(10)
125+
# Block the network for 3 sec
126+
print("Block the network for 3 sec...")
127+
blockingTimeTenMs = 300
128+
while blockingTimeTenMs != 0:
129+
manualNetworkError(pyCoreClient)
130+
blockingTimeTenMs -= 1
131+
time.sleep(0.01)
132+
print("Leave it to the main thread to keep waiting...")
133+
134+
135+
############################################################################
136+
# Main #
137+
# Check inputs
138+
myCheckInManager = checkInManager.checkInManager(1)
139+
myCheckInManager.verify(sys.argv)
140+
141+
host = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com"
142+
rootCA = "./test-integration/Credentials/rootCA.crt"
143+
certificate = "./test-integration/Credentials/certificate.pem.crt"
144+
privateKey = "./test-integration/Credentials/privateKey.pem.key"
145+
mode = myCheckInManager.mode
146+
147+
skip_when_match(ModeIsALPN(mode).And(
148+
Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0)))
149+
), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3]))
150+
151+
# Init Python core and connect
152+
myMQTTClientManager = MQTTClientManager.MQTTClientManager()
153+
clientPub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_PUB, host, rootCA,
154+
certificate, privateKey, mode=mode)
155+
clientSub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_SUB, host, rootCA,
156+
certificate, privateKey, mode=mode)
157+
158+
if clientPub is None or clientSub is None:
159+
print("Clients not init!")
160+
exit(4)
161+
162+
print("Two clients are connected!")
163+
164+
# Configurations
165+
################
166+
# Callback unit
167+
subCallbackUnit = callbackUnit()
168+
# Threads
169+
mySimpleThreadManager = simpleThreadManager.simpleThreadManager()
170+
myRunFunctionUnit = runFunctionUnit()
171+
publishThreadID = mySimpleThreadManager.createOneTimeThread(myRunFunctionUnit.threadARuntime, [clientPub])
172+
subscribeThreadID = mySimpleThreadManager.createOneTimeThread(myRunFunctionUnit.threadBRuntime,
173+
[clientSub, subCallbackUnit.messageCallback])
174+
175+
# Performing
176+
############
177+
mySimpleThreadManager.startThreadWithID(subscribeThreadID)
178+
mySimpleThreadManager.startThreadWithID(publishThreadID)
179+
mySimpleThreadManager.joinOneTimeThreadWithID(subscribeThreadID)
180+
mySimpleThreadManager.joinOneTimeThreadWithID(publishThreadID)
181+
time.sleep(3) # Just in case messages arrive slowly
182+
183+
# Verifying
184+
###########
185+
# Length
186+
print("Check if the length of the two sets are equal...")
187+
print("Received from subscription: " + str(len(subCallbackUnit.getInternalSet())))
188+
print("Sent through publishes: " + str(len(myRunFunctionUnit._messagesPublished)))
189+
if len(myRunFunctionUnit._messagesPublished) != len(subCallbackUnit.getInternalSet()):
190+
print("Number of messages not equal!")
191+
exit(4)
192+
# Content
193+
print("Check if the content if the two sets are equivalent...")
194+
if myRunFunctionUnit._messagesPublished != subCallbackUnit.getInternalSet():
195+
print("Sent through publishes:")
196+
print(myRunFunctionUnit._messagesPublished)
197+
print("Received from subscription:")
198+
print(subCallbackUnit.getInternalSet())
199+
print("Set content not equal!")
200+
exit(4)
201+
else:
202+
print("Yes!")

0 commit comments

Comments
 (0)