loop() doesn't fetch all messages for a topic #108

Closed
calcut opened this issue Apr 12, 2022 · 4 comments

@calcut
Contributor

calcut commented Apr 12, 2022

Still trying to suss out what is going on here.

At the simplest level, I'm seeing that mqtt_client.loop() usually fetches only a single message from the queue (for a particular topic), even if many messages are waiting. I assume this is incorrect behaviour.

Sometimes, though, it does a 'catch up' and fetches all of the waiting messages!
I'm not sure whether this is a quirk of Adafruit IO's MQTT broker or of this library.

For example, if I run loop() every 5 seconds while subscribed to AIO's time/seconds topic (which publishes once per second), I get:

MQTT time: 1649762379
sleep 5
MQTT time: 1649762380
sleep 5
MQTT time: 1649762381
sleep 5
MQTT time: 1649762382
sleep 5
MQTT time: 1649762383
sleep 5
MQTT time: 1649762384
sleep 5
MQTT time: 1649762385
sleep 5
MQTT time: 1649762386
sleep 5
MQTT time: 1649762387
sleep 5
MQTT time: 1649762388
sleep 5
MQTT time: 1649762389
sleep 5
MQTT time: 1649762390 <--------- Here it is very out of sync with the actual time, but then it suddenly does a catch up
sleep 5
MQTT time: 1649762391
MQTT time: 1649762392
MQTT time: 1649762393
MQTT time: 1649762394
....
MQTT time: 1649762436
MQTT time: 1649762437
MQTT time: 1649762438
MQTT time: 1649762439
sleep 5
MQTT time: 1649762440
sleep 5
MQTT time: 1649762441
sleep 5
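
To make the arithmetic behind this log concrete, here is a rough model of the backlog, assuming the broker publishes one message per second and each loop() call reads at most one message. The function name `simulate_backlog` and its parameters are made up for illustration, and the model ignores the time spent inside loop() and the HTTP request.

```python
def simulate_backlog(publish_interval_s=1, poll_interval_s=5, polls=12, per_poll=1):
    """Return the queue depth after each poll.

    Assumption: each poll reads at most `per_poll` messages,
    and messages are never dropped by the broker or the socket.
    """
    depth = 0
    history = []
    for _ in range(polls):
        # Messages published while the client was sleeping.
        depth += poll_interval_s // publish_interval_s
        # The client reads at most `per_poll` of them.
        depth -= min(depth, per_poll)
        history.append(depth)
    return history


history = simulate_backlog()
print(history[-1])  # → 48
```

Under these assumptions the backlog grows by 4 messages per poll, reaching 48 after the 12 single-message polls in the log. That is roughly the size of the catch-up burst that follows, which suggests the messages were being buffered rather than lost.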

It looks like the queue can cause other issues if it builds up too much.
For example, I'm using MQTT and HTTP requests in the same code, and they interact in a way that produces errors like:

Traceback (most recent call last):
  File "ssl_debug.py", line 44, in <module>
  File "adafruit_requests.py", line 752, in get
  File "adafruit_requests.py", line 693, in request
  File "adafruit_requests.py", line 546, in _get_socket
RuntimeError: Sending request failed

These seem to be associated with the MQTT queue not being cleared.

@calcut
Contributor Author

calcut commented Apr 12, 2022

Should have said, this is:
Adafruit CircuitPython 7.2.5 on 2022-04-06; Adafruit Feather ESP32S2 with ESP32S2

and my code to reproduce is:

import time
from secrets import secrets
import wifi
import ssl
import socketpool
import adafruit_requests
import adafruit_minimqtt.adafruit_minimqtt as MQTT

def message_callback(client, feed_id, payload):
    print(f"MQTT time: {payload}")


wifi.radio.connect(secrets["ssid"], secrets["password"])
print('Wifi Connected')

pool = socketpool.SocketPool(wifi.radio)
ssl_context = ssl.create_default_context()

# Initialize a new MQTT Client object
mqtt_client = MQTT.MQTT(
    broker="io.adafruit.com",
    username=secrets["aio_username"],
    password=secrets["aio_key"],
    socket_pool=pool,
    ssl_context=ssl_context,
)

mqtt_client.connect()
mqtt_client.on_message = message_callback
mqtt_client.subscribe("time/seconds")

requests = adafruit_requests.Session(pool, ssl_context)
url = "http://worldtimeapi.org/api/timezone/Etc/UTC"

while True:
    mqtt_client.loop()
    time_dict = requests.get(url).json()
    print(f"HTTP time: {time_dict['unixtime']}")
    print('sleep 5')
    time.sleep(5)

@calcut
Contributor Author

calcut commented Apr 12, 2022

It might just be "user error", i.e. it is supposed to work like this and I just need to call loop() more often! But calling it too often can lead to #101.
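
If calling loop() more often is the intended usage, a user-side workaround could look like the sketch below. It assumes loop() returns a falsy value (None or an empty list) when nothing was waiting, which may differ between library versions. `drain`, `max_calls` and `FakeClient` are hypothetical names for illustration; `FakeClient` only stands in for the real client so the snippet runs without a broker.

```python
def drain(client, max_calls=20, timeout=1):
    """Call client.loop() until it reports nothing, or max_calls is reached.

    Returns the number of loop() calls made.
    """
    calls = 0
    for _ in range(max_calls):
        calls += 1
        if not client.loop(timeout=timeout):
            break  # nothing was waiting, so the queue is assumed empty
    return calls


class FakeClient:
    """Stand-in for the MQTT client: delivers one queued message per loop() call."""

    def __init__(self, queued):
        self.queued = list(queued)
        self.received = []

    def loop(self, timeout=1):
        if not self.queued:
            return None
        self.received.append(self.queued.pop(0))
        return [0x30]  # placeholder response code, not taken from the library


fake = FakeClient(range(5))
print(drain(fake), fake.received)
```

The `max_calls` cap is there so that a steadily publishing topic cannot keep the main loop stuck inside `drain`. With the real client, the final call that finds nothing would presumably still wait for the socket timeout.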

@calcut
Contributor Author

calcut commented Sep 2, 2022

My suggestion to "fix" this is to have loop() call _wait_for_msg() in a while loop until no response is received.
Response codes could still be returned in a list, as shown.

e.g.

    def loop(self, timeout=1):
        """Non-blocking message loop. Use this method to
        check incoming subscription messages.
        Returns response codes of any messages received.

        :param int timeout: Socket timeout, in seconds.

        """
        if self._timestamp == 0:
            self._timestamp = time.monotonic()
        current_time = time.monotonic()
        if current_time - self._timestamp >= self.keep_alive:
            self._timestamp = 0
            # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
            if self.logger is not None:
                self.logger.debug(
                    "KeepAlive period elapsed - requesting a PINGRESP from the server..."
                )
            rcs = self.ping()
            return rcs
        self._sock.settimeout(timeout)

        # Keep reading until _wait_for_msg() reports that nothing is waiting
        responses = []
        while True:
            rc = self._wait_for_msg()
            if rc is None:
                break
            responses.append(rc)
        return responses
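
One caveat with an unbounded while loop is that it never returns if messages arrive faster than they are processed. A possible variant, sketched below as a standalone function, also stops after a time budget. This is not the change merged in PR #122; `drain_until`, `budget_s` and `clock` are hypothetical names, and `wait_for_msg` is any callable that returns a response code or None, mirroring how _wait_for_msg() is used above.

```python
import time


def drain_until(wait_for_msg, budget_s=1.0, clock=time.monotonic):
    """Collect response codes until wait_for_msg() returns None
    or the time budget runs out, whichever comes first."""
    deadline = clock() + budget_s
    responses = []
    while clock() < deadline:
        rc = wait_for_msg()
        if rc is None:
            break  # nothing waiting
        responses.append(rc)
    return responses


# Usage with a canned sequence standing in for the socket reads.
canned = iter([0x30, 0x30, None])
print(drain_until(lambda: next(canned), budget_s=10))
```

The budget is checked between reads, so a single slow read can still overrun it. Making the limit strict would need the socket timeout to be shortened as the deadline approaches.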

@vladak
Contributor

vladak commented Dec 26, 2022

This issue should be closed given the associated PR #122 was merged in and the base changes addressing this issue went into cf39fbc.

@brentru brentru closed this as completed Apr 10, 2023