Skip to content

Add Topic-specific message callbacks #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ The MIT License (MIT)

Copyright (c) 2019 Brent Rubell for Adafruit Industries


Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
Expand All @@ -19,3 +20,9 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

matcher.py for matching MQTT topics from the Eclipse Paho MQTT Python Client
https://github.com/eclipse/paho.mqtt.python

Eclipse Paho MQTT Python Client is dual licensed under the Eclipse Public License 1.0 and the
Eclipse Distribution License 1.0 as described in the epl-v10 and edl-v10 files.
Empty file added adafruit_minimqtt/__init__.py
Empty file.
55 changes: 51 additions & 4 deletions adafruit_minimqtt.py → adafruit_minimqtt/adafruit_minimqtt.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#
# Original Work Copyright (c) 2016 Paul Sokolovsky, uMQTT
# Modified Work Copyright (c) 2019 Bradley Beach, esp32spi_mqtt
# Modified Work Copyright (c) 2012-2019 Roger Light and others, Paho MQTT Python
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -44,6 +45,7 @@
from random import randint
from micropython import const
import adafruit_logging as logging
from .matcher import MQTTMatcher

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT.git"
Expand Down Expand Up @@ -173,8 +175,9 @@ def __init__(
self._lw_retain = False
# List of subscribed topics, used for tracking
self._subscribed_topics = []
self._on_message_filtered = MQTTMatcher()
# Server callbacks
self.on_message = None
self._on_message = None
self.on_connect = None
self.on_disconnect = None
self.on_publish = None
Expand Down Expand Up @@ -218,6 +221,51 @@ def will_set(self, topic=None, payload=None, qos=0, retain=False):
self._lw_msg = payload
self._lw_retain = retain

def add_topic_callback(self, mqtt_topic, callback_method):
"""Registers a callback_method for a specific MQTT topic.
:param str mqtt_topic: MQTT topic.
:param str callback_method: Name of callback method.

"""
if mqtt_topic is None or callback_method is None:
raise ValueError("MQTT topic and callback method must both be defined.")
self._on_message_filtered[mqtt_topic] = callback_method

def remove_topic_callback(self, mqtt_topic):
"""Removes a registered callback method.
:param str mqtt_topic: MQTT topic.

"""
if mqtt_topic is None:
raise ValueError("MQTT Topic must be defined.")
try:
del self._on_message_filtered[mqtt_topic]
except KeyError:
raise KeyError("MQTT topic callback not added with add_topic_callback.")

@property
def on_message(self):
"""Called when a new message has been received on a subscribed topic.

Expected method signature is:
on_message(client, topic, message)
"""
return self._on_message

@on_message.setter
def on_message(self, method):
self._on_message = method

def _handle_on_message(self, client, topic, message):
matched = False
if topic is not None:
for callback in self._on_message_filtered.iter_match(topic):
callback(client, topic, message) # on_msg with callback
matched = True

if not matched and self.on_message: # regular on_message
self.on_message(client, topic, message)

# pylint: disable=too-many-branches, too-many-statements, too-many-locals
def connect(self, clean_session=True):
"""Initiates connection with the MQTT Broker.
Expand Down Expand Up @@ -665,8 +713,7 @@ def _wait_for_msg(self, timeout=30):
pid = pid[0] << 0x08 | pid[1]
sz -= 0x02
msg = self._sock.recv(sz)
if self.on_message is not None:
self.on_message(self, topic, str(msg, "utf-8"))
self._handle_on_message(self, topic, str(msg, "utf-8"))
if res[0] & 0x06 == 0x02:
pkt = bytearray(b"\x40\x02\0\0")
struct.pack_into("!H", pkt, 2, pid)
Expand Down Expand Up @@ -751,7 +798,7 @@ def mqtt_msg(self, msg_size):
if msg_size < MQTT_MSG_MAX_SZ:
self._msg_size_lim = msg_size

# Logging
### Logging ###
def attach_logger(self, logger_name="log"):
"""Initializes and attaches a logger to the MQTTClient.
:param str logger_name: Name of the logger instance
Expand Down
100 changes: 100 additions & 0 deletions adafruit_minimqtt/matcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright (c) 2017 Yoch <https://github.com/yoch>
#
# This file is dual licensed under the Eclipse Public License 1.0 and the
# Eclipse Distribution License 1.0 as described in the epl-v10 and edl-v10 files.
#
#
"""
`matcher`
====================================================================================

MQTT topic filter matcher from the Eclipse Project's Paho.MQTT.Python
https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/matcher.py
* Author(s): Yoch (https://github.com/yoch)
"""


class MQTTMatcher:
"""Intended to manage topic filters including wildcards.

Internally, MQTTMatcher use a prefix tree (trie) to store
values associated with filters, and has an iter_match()
method to iterate efficiently over all filters that match
some topic name.
"""

# pylint: disable=too-few-public-methods
class Node:
"""Individual node on the MQTT prefix tree.
"""

__slots__ = "children", "content"

def __init__(self):
self.children = {}
self.content = None

def __init__(self):
self._root = self.Node()

def __setitem__(self, key, value):
"""Add a topic filter :key to the prefix tree
and associate it to :value"""
node = self._root
for sym in key.split("/"):
node = node.children.setdefault(sym, self.Node())
node.content = value

def __getitem__(self, key):
"""Retrieve the value associated with some topic filter :key"""
try:
node = self._root
for sym in key.split("/"):
node = node.children[sym]
if node.content is None:
raise KeyError(key)
return node.content
except KeyError:
raise KeyError(key)

def __delitem__(self, key):
"""Delete the value associated with some topic filter :key"""
lst = []
try:
parent, node = None, self._root
for k in key.split("/"):
parent, node = node, node.children[k]
lst.append((parent, k, node))
node.content = None
except KeyError:
raise KeyError(key)
else: # cleanup
for parent, k, node in reversed(lst):
if node.children or node.content is not None:
break
del parent.children[k]

def iter_match(self, topic):
"""Return an iterator on all values associated with filters
that match the :topic"""
lst = topic.split("/")
normal = not topic.startswith("$")

def rec(node, i=0):
if i == len(lst):
if node.content is not None:
yield node.content
else:
part = lst[i]
if part in node.children:
for content in rec(node.children[part], i + 1):
yield content
if "+" in node.children and (normal or i > 0):
for content in rec(node.children["+"], i + 1):
yield content
if "#" in node.children and (normal or i > 0):
content = node.children["#"].content
if content is not None:
yield content

return rec(self._root)
2 changes: 1 addition & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
.. If your library file(s) are nested in a directory (e.g. /adafruit_foo/foo.py)
.. use this format as the module name: "adafruit_foo.foo"

.. automodule:: adafruit_minimqtt
.. automodule:: adafruit_minimqtt.adafruit_minimqtt
:members:
119 changes: 119 additions & 0 deletions examples/minimqtt_pub_sub_blocking_topic_callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import time
import board
import busio
from digitalio import DigitalInOut
import neopixel
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
import adafruit_minimqtt.adafruit_minimqtt as MQTT

### WiFi ###

# Get wifi details and more from a secrets.py file
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise

# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

# If you have an externally connected ESP32:
# esp32_cs = DigitalInOut(board.D9)
# esp32_ready = DigitalInOut(board.D10)
# esp32_reset = DigitalInOut(board.D5)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
"""Use below for Most Boards"""
status_light = neopixel.NeoPixel(
board.NEOPIXEL, 1, brightness=0.2
) # Uncomment for Most Boards
"""Uncomment below for ItsyBitsy M4"""
# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
# Uncomment below for an externally defined RGB LED
# import adafruit_rgbled
# from adafruit_esp32spi import PWMOut
# RED_LED = PWMOut.PWMOut(esp, 26)
# GREEN_LED = PWMOut.PWMOut(esp, 27)
# BLUE_LED = PWMOut.PWMOut(esp, 25)
# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)

### Code ###

# Define callback methods which are called when events occur
# pylint: disable=unused-argument, redefined-outer-name
def connected(client, userdata, flags, rc):
# This function will be called when the client is connected
# successfully to the broker.
print("Connected to MQTT Broker!")


def disconnected(client, userdata, rc):
# This method is called when the client is disconnected
print("Disconnected from MQTT Broker!")


def subscribe(client, userdata, topic, granted_qos):
# This method is called when the client subscribes to a new feed.
print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))


def unsubscribe(client, userdata, topic, pid):
# This method is called when the client unsubscribes from a feed.
print("Unsubscribed from {0} with PID {1}".format(topic, pid))


def on_battery_msg(client, topic, message):
# Method called when device/batteryLife has a new value
print("Battery level: {}v".format(message))

# client.remove_topic_callback("device/batteryLevel")


def on_message(client, topic, message):
# Method callled when a client's subscribed feed has a new value.
print("New message on topic {0}: {1}".format(topic, message))


# Connect to WiFi
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

MQTT.set_socket(socket, esp)

# Set up a MiniMQTT Client
client = MQTT.MQTT(broker=secrets["broker"], port=secrets["broker_port"])

# Setup the callback methods above
client.on_connect = connected
client.on_disconnect = disconnected
client.on_subscribe = subscribe
client.on_unsubscribe = unsubscribe
client.on_message = on_message
client.add_topic_callback("device/batteryLevel", on_battery_msg)

# Connect the client to the MQTT broker.
print("Connecting to MQTT broker...")
client.connect()

# Subscribe to all notifications on the device/ topic
client.subscribe("device/#", 1)

# Start a blocking message loop...
# NOTE: NO code below this loop will execute
while True:
try:
client.loop()
except (ValueError, RuntimeError) as e:
print("Failed to get data, retrying\n", e)
wifi.reset()
client.reconnect()
continue
time.sleep(1)