Skip to content

Commit 2ed735d

Browse files
authored
Merge pull request adafruit#38 from brentru/on-message-enhancements
Add Topic-specific message callbacks
2 parents 994938d + 270e8a4 commit 2ed735d

File tree

6 files changed

+278
-5
lines changed

6 files changed

+278
-5
lines changed

LICENSE

+7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ The MIT License (MIT)
22

33
Copyright (c) 2019 Brent Rubell for Adafruit Industries
44

5+
56
Permission is hereby granted, free of charge, to any person obtaining a copy
67
of this software and associated documentation files (the "Software"), to deal
78
in the Software without restriction, including without limitation the rights
@@ -19,3 +20,9 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
1920
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
2021
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2122
SOFTWARE.
23+
24+
matcher.py for matching MQTT topics from the Eclipse Paho MQTT Python Client
25+
https://github.com/eclipse/paho.mqtt.python
26+
27+
Eclipse Paho MQTT Python Client is dual licensed under the Eclipse Public License 1.0 and the
28+
Eclipse Distribution License 1.0 as described in the epl-v10 and edl-v10 files.

adafruit_minimqtt/__init__.py

Whitespace-only changes.

adafruit_minimqtt.py renamed to adafruit_minimqtt/adafruit_minimqtt.py

+51-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#
55
# Original Work Copyright (c) 2016 Paul Sokolovsky, uMQTT
66
# Modified Work Copyright (c) 2019 Bradley Beach, esp32spi_mqtt
7+
# Modified Work Copyright (c) 2012-2019 Roger Light and others, Paho MQTT Python
78
#
89
# Permission is hereby granted, free of charge, to any person obtaining a copy
910
# of this software and associated documentation files (the "Software"), to deal
@@ -44,6 +45,7 @@
4445
from random import randint
4546
from micropython import const
4647
import adafruit_logging as logging
48+
from .matcher import MQTTMatcher
4749

4850
__version__ = "0.0.0-auto.0"
4951
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT.git"
@@ -173,8 +175,9 @@ def __init__(
173175
self._lw_retain = False
174176
# List of subscribed topics, used for tracking
175177
self._subscribed_topics = []
178+
self._on_message_filtered = MQTTMatcher()
176179
# Server callbacks
177-
self.on_message = None
180+
self._on_message = None
178181
self.on_connect = None
179182
self.on_disconnect = None
180183
self.on_publish = None
@@ -218,6 +221,51 @@ def will_set(self, topic=None, payload=None, qos=0, retain=False):
218221
self._lw_msg = payload
219222
self._lw_retain = retain
220223

224+
def add_topic_callback(self, mqtt_topic, callback_method):
225+
"""Registers a callback_method for a specific MQTT topic.
226+
:param str mqtt_topic: MQTT topic.
227+
:param str callback_method: Name of callback method.
228+
229+
"""
230+
if mqtt_topic is None or callback_method is None:
231+
raise ValueError("MQTT topic and callback method must both be defined.")
232+
self._on_message_filtered[mqtt_topic] = callback_method
233+
234+
def remove_topic_callback(self, mqtt_topic):
235+
"""Removes a registered callback method.
236+
:param str mqtt_topic: MQTT topic.
237+
238+
"""
239+
if mqtt_topic is None:
240+
raise ValueError("MQTT Topic must be defined.")
241+
try:
242+
del self._on_message_filtered[mqtt_topic]
243+
except KeyError:
244+
raise KeyError("MQTT topic callback not added with add_topic_callback.")
245+
246+
@property
247+
def on_message(self):
248+
"""Called when a new message has been received on a subscribed topic.
249+
250+
Expected method signature is:
251+
on_message(client, topic, message)
252+
"""
253+
return self._on_message
254+
255+
@on_message.setter
256+
def on_message(self, method):
257+
self._on_message = method
258+
259+
def _handle_on_message(self, client, topic, message):
260+
matched = False
261+
if topic is not None:
262+
for callback in self._on_message_filtered.iter_match(topic):
263+
callback(client, topic, message) # on_msg with callback
264+
matched = True
265+
266+
if not matched and self.on_message: # regular on_message
267+
self.on_message(client, topic, message)
268+
221269
# pylint: disable=too-many-branches, too-many-statements, too-many-locals
222270
def connect(self, clean_session=True):
223271
"""Initiates connection with the MQTT Broker.
@@ -665,8 +713,7 @@ def _wait_for_msg(self, timeout=30):
665713
pid = pid[0] << 0x08 | pid[1]
666714
sz -= 0x02
667715
msg = self._sock.recv(sz)
668-
if self.on_message is not None:
669-
self.on_message(self, topic, str(msg, "utf-8"))
716+
self._handle_on_message(self, topic, str(msg, "utf-8"))
670717
if res[0] & 0x06 == 0x02:
671718
pkt = bytearray(b"\x40\x02\0\0")
672719
struct.pack_into("!H", pkt, 2, pid)
@@ -751,7 +798,7 @@ def mqtt_msg(self, msg_size):
751798
if msg_size < MQTT_MSG_MAX_SZ:
752799
self._msg_size_lim = msg_size
753800

754-
# Logging
801+
### Logging ###
755802
def attach_logger(self, logger_name="log"):
756803
"""Initializes and attaches a logger to the MQTTClient.
757804
:param str logger_name: Name of the logger instance

adafruit_minimqtt/matcher.py

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# Copyright (c) 2017 Yoch <https://github.com/yoch>
2+
#
3+
# This file is dual licensed under the Eclipse Public License 1.0 and the
4+
# Eclipse Distribution License 1.0 as described in the epl-v10 and edl-v10 files.
5+
#
6+
#
7+
"""
8+
`matcher`
9+
====================================================================================
10+
11+
MQTT topic filter matcher from the Eclipse Project's Paho.MQTT.Python
12+
https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/matcher.py
13+
* Author(s): Yoch (https://github.com/yoch)
14+
"""
15+
16+
17+
class MQTTMatcher:
18+
"""Intended to manage topic filters including wildcards.
19+
20+
Internally, MQTTMatcher use a prefix tree (trie) to store
21+
values associated with filters, and has an iter_match()
22+
method to iterate efficiently over all filters that match
23+
some topic name.
24+
"""
25+
26+
# pylint: disable=too-few-public-methods
27+
class Node:
28+
"""Individual node on the MQTT prefix tree.
29+
"""
30+
31+
__slots__ = "children", "content"
32+
33+
def __init__(self):
34+
self.children = {}
35+
self.content = None
36+
37+
def __init__(self):
38+
self._root = self.Node()
39+
40+
def __setitem__(self, key, value):
41+
"""Add a topic filter :key to the prefix tree
42+
and associate it to :value"""
43+
node = self._root
44+
for sym in key.split("/"):
45+
node = node.children.setdefault(sym, self.Node())
46+
node.content = value
47+
48+
def __getitem__(self, key):
49+
"""Retrieve the value associated with some topic filter :key"""
50+
try:
51+
node = self._root
52+
for sym in key.split("/"):
53+
node = node.children[sym]
54+
if node.content is None:
55+
raise KeyError(key)
56+
return node.content
57+
except KeyError:
58+
raise KeyError(key)
59+
60+
def __delitem__(self, key):
61+
"""Delete the value associated with some topic filter :key"""
62+
lst = []
63+
try:
64+
parent, node = None, self._root
65+
for k in key.split("/"):
66+
parent, node = node, node.children[k]
67+
lst.append((parent, k, node))
68+
node.content = None
69+
except KeyError:
70+
raise KeyError(key)
71+
else: # cleanup
72+
for parent, k, node in reversed(lst):
73+
if node.children or node.content is not None:
74+
break
75+
del parent.children[k]
76+
77+
def iter_match(self, topic):
78+
"""Return an iterator on all values associated with filters
79+
that match the :topic"""
80+
lst = topic.split("/")
81+
normal = not topic.startswith("$")
82+
83+
def rec(node, i=0):
84+
if i == len(lst):
85+
if node.content is not None:
86+
yield node.content
87+
else:
88+
part = lst[i]
89+
if part in node.children:
90+
for content in rec(node.children[part], i + 1):
91+
yield content
92+
if "+" in node.children and (normal or i > 0):
93+
for content in rec(node.children["+"], i + 1):
94+
yield content
95+
if "#" in node.children and (normal or i > 0):
96+
content = node.children["#"].content
97+
if content is not None:
98+
yield content
99+
100+
return rec(self._root)

docs/api.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44
.. If your library file(s) are nested in a directory (e.g. /adafruit_foo/foo.py)
55
.. use this format as the module name: "adafruit_foo.foo"
66
7-
.. automodule:: adafruit_minimqtt
7+
.. automodule:: adafruit_minimqtt.adafruit_minimqtt
88
:members:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import time
2+
import board
3+
import busio
4+
from digitalio import DigitalInOut
5+
import neopixel
6+
from adafruit_esp32spi import adafruit_esp32spi
7+
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
8+
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
9+
import adafruit_minimqtt.adafruit_minimqtt as MQTT
10+
11+
### WiFi ###
12+
13+
# Get wifi details and more from a secrets.py file
14+
try:
15+
from secrets import secrets
16+
except ImportError:
17+
print("WiFi secrets are kept in secrets.py, please add them there!")
18+
raise
19+
20+
# If you are using a board with pre-defined ESP32 Pins:
21+
esp32_cs = DigitalInOut(board.ESP_CS)
22+
esp32_ready = DigitalInOut(board.ESP_BUSY)
23+
esp32_reset = DigitalInOut(board.ESP_RESET)
24+
25+
# If you have an externally connected ESP32:
26+
# esp32_cs = DigitalInOut(board.D9)
27+
# esp32_ready = DigitalInOut(board.D10)
28+
# esp32_reset = DigitalInOut(board.D5)
29+
30+
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
31+
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
32+
"""Use below for Most Boards"""
33+
status_light = neopixel.NeoPixel(
34+
board.NEOPIXEL, 1, brightness=0.2
35+
) # Uncomment for Most Boards
36+
"""Uncomment below for ItsyBitsy M4"""
37+
# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
38+
# Uncomment below for an externally defined RGB LED
39+
# import adafruit_rgbled
40+
# from adafruit_esp32spi import PWMOut
41+
# RED_LED = PWMOut.PWMOut(esp, 26)
42+
# GREEN_LED = PWMOut.PWMOut(esp, 27)
43+
# BLUE_LED = PWMOut.PWMOut(esp, 25)
44+
# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
45+
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
46+
47+
### Code ###
48+
49+
# Define callback methods which are called when events occur
50+
# pylint: disable=unused-argument, redefined-outer-name
51+
def connected(client, userdata, flags, rc):
52+
# This function will be called when the client is connected
53+
# successfully to the broker.
54+
print("Connected to MQTT Broker!")
55+
56+
57+
def disconnected(client, userdata, rc):
58+
# This method is called when the client is disconnected
59+
print("Disconnected from MQTT Broker!")
60+
61+
62+
def subscribe(client, userdata, topic, granted_qos):
63+
# This method is called when the client subscribes to a new feed.
64+
print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
65+
66+
67+
def unsubscribe(client, userdata, topic, pid):
68+
# This method is called when the client unsubscribes from a feed.
69+
print("Unsubscribed from {0} with PID {1}".format(topic, pid))
70+
71+
72+
def on_battery_msg(client, topic, message):
73+
# Method called when device/batteryLife has a new value
74+
print("Battery level: {}v".format(message))
75+
76+
# client.remove_topic_callback("device/batteryLevel")
77+
78+
79+
def on_message(client, topic, message):
80+
# Method callled when a client's subscribed feed has a new value.
81+
print("New message on topic {0}: {1}".format(topic, message))
82+
83+
84+
# Connect to WiFi
85+
print("Connecting to WiFi...")
86+
wifi.connect()
87+
print("Connected!")
88+
89+
MQTT.set_socket(socket, esp)
90+
91+
# Set up a MiniMQTT Client
92+
client = MQTT.MQTT(broker=secrets["broker"], port=secrets["broker_port"])
93+
94+
# Setup the callback methods above
95+
client.on_connect = connected
96+
client.on_disconnect = disconnected
97+
client.on_subscribe = subscribe
98+
client.on_unsubscribe = unsubscribe
99+
client.on_message = on_message
100+
client.add_topic_callback("device/batteryLevel", on_battery_msg)
101+
102+
# Connect the client to the MQTT broker.
103+
print("Connecting to MQTT broker...")
104+
client.connect()
105+
106+
# Subscribe to all notifications on the device/ topic
107+
client.subscribe("device/#", 1)
108+
109+
# Start a blocking message loop...
110+
# NOTE: NO code below this loop will execute
111+
while True:
112+
try:
113+
client.loop()
114+
except (ValueError, RuntimeError) as e:
115+
print("Failed to get data, retrying\n", e)
116+
wifi.reset()
117+
client.reconnect()
118+
continue
119+
time.sleep(1)

0 commit comments

Comments
 (0)