47
47
# MQTT Commands
48
48
MQTT_PINGREQ = b"\xc0 \0 "
49
49
MQTT_PINGRESP = const (0xD0 )
50
+ MQTT_PUBLISH = const (0x30 )
50
51
MQTT_SUB = b"\x82 "
51
52
MQTT_UNSUB = b"\xA2 "
52
53
MQTT_DISCONNECT = b"\xe0 \0 "
53
54
55
+ MQTT_PKT_TYPE_MASK = const (0xF0 )
56
+
54
57
# Variable CONNECT header [MQTT 3.1.2]
55
58
MQTT_HDR_CONNECT = bytearray (b"\x04 MQTT\x04 \x02 \0 \0 " )
56
59
@@ -210,7 +213,6 @@ def __init__(
210
213
# LWT
211
214
self ._lw_topic = None
212
215
self ._lw_qos = 0
213
- self ._lw_topic = None
214
216
self ._lw_msg = None
215
217
self ._lw_retain = False
216
218
@@ -628,7 +630,7 @@ def publish(self, topic, msg, retain=False, qos=0):
628
630
), "Quality of Service Level 2 is unsupported by this library."
629
631
630
632
# fixed header. [3.3.1.2], [3.3.1.3]
631
- pub_hdr_fixed = bytearray ([0x30 | retain | qos << 1 ])
633
+ pub_hdr_fixed = bytearray ([MQTT_PUBLISH | retain | qos << 1 ])
632
634
633
635
# variable header = 2-byte Topic length (big endian)
634
636
pub_hdr_var = bytearray (struct .pack (">H" , len (topic .encode ("utf-8" ))))
@@ -877,7 +879,9 @@ def loop(self, timeout=0):
877
879
def _wait_for_msg (self , timeout = 0.1 ):
878
880
# pylint: disable = too-many-return-statements
879
881
880
- """Reads and processes network events."""
882
+ """Reads and processes network events.
883
+ Return the packet type or None if there is nothing to be received.
884
+ """
881
885
# CPython socket module contains a timeout attribute
882
886
if hasattr (self ._socket_pool , "timeout" ):
883
887
try :
@@ -898,7 +902,7 @@ def _wait_for_msg(self, timeout=0.1):
898
902
if res in [None , b"" , b"\x00 " ]:
899
903
# If we get here, it means that there is nothing to be received
900
904
return None
901
- if res [0 ] == MQTT_PINGRESP :
905
+ if res [0 ] & MQTT_PKT_TYPE_MASK == MQTT_PINGRESP :
902
906
if self .logger is not None :
903
907
self .logger .debug ("Got PINGRESP" )
904
908
sz = self ._sock_exact_recv (1 )[0 ]
@@ -907,12 +911,21 @@ def _wait_for_msg(self, timeout=0.1):
907
911
"Unexpected PINGRESP returned from broker: {}." .format (sz )
908
912
)
909
913
return MQTT_PINGRESP
910
- if res [0 ] & 0xF0 != 0x30 :
914
+
915
+ if res [0 ] & MQTT_PKT_TYPE_MASK != MQTT_PUBLISH :
911
916
return res [0 ]
917
+
918
+ # Handle only the PUBLISH packet type from now on.
912
919
sz = self ._recv_len ()
913
920
# topic length MSB & LSB
914
921
topic_len = self ._sock_exact_recv (2 )
915
922
topic_len = (topic_len [0 ] << 8 ) | topic_len [1 ]
923
+
924
+ if topic_len > sz - 2 :
925
+ raise MMQTTException (
926
+ f"Topic length { topic_len } in PUBLISH packet exceeds remaining length { sz } - 2"
927
+ )
928
+
916
929
topic = self ._sock_exact_recv (topic_len )
917
930
topic = str (topic , "utf-8" )
918
931
sz -= topic_len + 2
@@ -921,12 +934,13 @@ def _wait_for_msg(self, timeout=0.1):
921
934
pid = self ._sock_exact_recv (2 )
922
935
pid = pid [0 ] << 0x08 | pid [1 ]
923
936
sz -= 0x02
937
+
924
938
# read message contents
925
939
raw_msg = self ._sock_exact_recv (sz )
926
940
msg = raw_msg if self ._use_binary_mode else str (raw_msg , "utf-8" )
927
941
if self .logger is not None :
928
942
self .logger .debug (
929
- "Receiving SUBSCRIBE \n Topic: %s\n Msg: %s\n " , topic , raw_msg
943
+ "Receiving PUBLISH \n Topic: %s\n Msg: %s\n " , topic , raw_msg
930
944
)
931
945
self ._handle_on_message (self , topic , msg )
932
946
if res [0 ] & 0x06 == 0x02 :
@@ -935,6 +949,7 @@ def _wait_for_msg(self, timeout=0.1):
935
949
self ._sock .send (pkt )
936
950
elif res [0 ] & 6 == 4 :
937
951
assert 0
952
+
938
953
return res [0 ]
939
954
940
955
def _recv_len (self ):
0 commit comments