@@ -834,14 +834,16 @@ def reconnect(self, resub_topics=True):
834
834
feed = subscribed_topics .pop ()
835
835
self .subscribe (feed )
836
836
837
- def loop (self , timeout = 1 ):
837
+ def loop (self , timeout = 0 ):
838
+ # pylint: disable = too-many-return-statements
838
839
"""Non-blocking message loop. Use this method to
839
840
check incoming subscription messages.
840
841
Returns response codes of any messages received.
841
842
842
843
:param int timeout: Socket timeout, in seconds.
843
844
844
845
"""
846
+
845
847
if self ._timestamp == 0 :
846
848
self ._timestamp = time .monotonic ()
847
849
current_time = time .monotonic ()
@@ -854,11 +856,28 @@ def loop(self, timeout=1):
854
856
)
855
857
rcs = self .ping ()
856
858
return rcs
859
+
860
+ stamp = time .monotonic ()
857
861
self ._sock .settimeout (timeout )
858
- rc = self ._wait_for_msg ()
859
- return [rc ] if rc else None
862
+ rcs = []
863
+
864
+ while True :
865
+ rc = self ._wait_for_msg (timeout )
866
+ if rc is None :
867
+ break
868
+ if time .monotonic () - stamp > self ._recv_timeout :
869
+ if self .logger is not None :
870
+ self .logger .debug (
871
+ f"Loop timed out, message queue not empty after { self ._recv_timeout } s"
872
+ )
873
+ break
874
+ rcs .append (rc )
875
+
876
+ return rcs if rcs else None
860
877
861
878
def _wait_for_msg (self , timeout = 0.1 ):
879
+ # pylint: disable = too-many-return-statements
880
+
862
881
"""Reads and processes network events."""
863
882
# CPython socket module contains a timeout attribute
864
883
if hasattr (self ._socket_pool , "timeout" ):
0 commit comments