@@ -149,6 +149,7 @@ def __init__(
149149 self .logger .setLevel (logging .INFO )
150150 self ._sock = None
151151 self ._is_connected = False
152+ self ._pending_ping_response = False
152153 self ._msg_size_lim = MQTT_MSG_SZ_LIM
153154 self ._pid = 0
154155 self ._timestamp = 0
@@ -175,6 +176,35 @@ def __enter__(self):
175176 def __exit__ (self , exception_type , exception_value , traceback ):
176177 self .deinit ()
177178
179+ def _sock_exact_recv (self , bufsize ):
180+ """Reads _exact_ number of bytes from the connected socket. Will only return
181+ string with the exact number of bytes requested.
182+
183+ The semantics of native socket receive is that it returns no more than the
184+ specified number of bytes (i.e. max size). However, it makes no guarantees in
185+ terms of the minimum size of the buffer, which could be 1 byte. This is a
186+ wrapper for socket recv() to ensure that no less than the expected number of
187+ bytes is returned or trigger a timeout exception.
188+
189+ :param int bufsize: number of bytes to receive
190+ """
191+ stamp = time .monotonic ()
192+ rc = self ._sock .recv (bufsize )
193+ to_read = bufsize - len (rc )
194+ assert to_read >= 0
195+ read_timeout = min (self .keep_alive , self ._sock ._timeout )
196+ while to_read > 0 :
197+ recv = self ._sock .recv (to_read )
198+ to_read -= len (recv )
199+ rc += recv
200+ if time .monotonic () - stamp > read_timeout :
201+ raise MMQTTException (
202+ "Unable to receive {} bytes within {} seconds." .format (
203+ to_read , read_timeout
204+ )
205+ )
206+ return rc
207+
178208 def deinit (self ):
179209 """De-initializes the MQTT client and disconnects from the mqtt broker."""
180210 self .disconnect ()
@@ -351,7 +381,7 @@ def connect(self, clean_session=True):
351381 while True :
352382 op = self ._wait_for_msg ()
353383 if op == 32 :
354- rc = self ._sock . recv (3 )
384+ rc = self ._sock_exact_recv (3 )
355385 assert rc [0 ] == 0x02
356386 if rc [2 ] != 0x00 :
357387 raise MMQTTException (CONNACK_ERRORS [rc [2 ]])
@@ -366,12 +396,16 @@ def disconnect(self):
366396 self .is_connected ()
367397 if self .logger is not None :
368398 self .logger .debug ("Sending DISCONNECT packet to broker" )
369- self ._sock .send (MQTT_DISCONNECT )
399+ try :
400+ self ._sock .send (MQTT_DISCONNECT )
401+ except RuntimeError as e :
402+ if self .logger :
403+ self .logger .warning ("Unable to send DISCONNECT packet: {}" .format (e ))
370404 if self .logger is not None :
371405 self .logger .debug ("Closing socket" )
372406 self ._sock .close ()
373407 self ._is_connected = False
374- self ._subscribed_topics = None
408+ self ._subscribed_topics = []
375409 if self .on_disconnect is not None :
376410 self .on_disconnect (self , self .user_data , 0 )
377411
@@ -380,18 +414,15 @@ def ping(self):
380414 there is an active network connection.
381415 """
382416 self .is_connected ()
417+ if self ._pending_ping_response :
418+ self ._pending_ping_response = False
419+ raise MMQTTException ("Ping response was pending from previous MQTT_PINGREQ" )
383420 if self .logger is not None :
384421 self .logger .debug ("Sending PINGREQ" )
385422 self ._sock .send (MQTT_PINGREQ )
386- if self .logger is not None :
387- self .logger .debug ("Checking PINGRESP" )
388- while True :
389- op = self ._wait_for_msg (0.5 )
390- if op == 208 :
391- ping_resp = self ._sock .recv (2 )
392- if ping_resp [0 ] != 0x00 :
393- raise MMQTTException ("PINGRESP not returned from broker." )
394- return
423+ # Set pending ping response. It will be checked upon next ping and
424+ # assumed to be cleared via _wait_for_msg()
425+ self ._pending_ping_response = True
395426
396427 # pylint: disable=too-many-branches, too-many-statements
397428 def publish (self , topic , msg , retain = False , qos = 0 ):
@@ -486,9 +517,9 @@ def publish(self, topic, msg, retain=False, qos=0):
486517 while True :
487518 op = self ._wait_for_msg ()
488519 if op == 0x40 :
489- sz = self ._sock . recv (1 )
520+ sz = self ._sock_exact_recv (1 )
490521 assert sz == b"\x02 "
491- rcv_pid = self ._sock . recv (2 )
522+ rcv_pid = self ._sock_exact_recv (2 )
492523 rcv_pid = rcv_pid [0 ] << 0x08 | rcv_pid [1 ]
493524 if pid == rcv_pid :
494525 if self .on_publish is not None :
@@ -571,7 +602,7 @@ def subscribe(self, topic, qos=0):
571602 while True :
572603 op = self ._wait_for_msg ()
573604 if op == 0x90 :
574- rc = self ._sock . recv (4 )
605+ rc = self ._sock_exact_recv (4 )
575606 assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
576607 if rc [3 ] == 0x80 :
577608 raise MMQTTException ("SUBACK Failure!" )
@@ -634,7 +665,7 @@ def unsubscribe(self, topic):
634665 while True :
635666 op = self ._wait_for_msg ()
636667 if op == 176 :
637- return_code = self ._sock . recv (3 )
668+ return_code = self ._sock_exact_recv (3 )
638669 assert return_code [0 ] == 0x02
639670 # [MQTT-3.32]
640671 assert (
@@ -694,24 +725,32 @@ def _wait_for_msg(self, timeout=30):
694725 res = self ._sock .recv (1 )
695726 self ._sock .settimeout (timeout )
696727 if res in [None , b"" ]:
728+ # If we get here, it means that there is nothing to be received
697729 return None
698- if res == MQTT_PINGRESP :
699- sz = self ._sock .recv (1 )[0 ]
700- assert sz == 0
730+ if res [0 ] == MQTT_PINGRESP :
731+ if self .logger :
732+ self .logger .debug ("Checking PINGRESP" )
733+ sz = self ._sock_exact_recv (1 )[0 ]
734+ if sz != 0x00 :
735+ raise MMQTTException (
736+ "Unexpected PINGRESP returned from broker: {}." .format (sz )
737+ )
738+ # Ping response is no longer pending
739+ self ._pending_ping_response = False
701740 return None
702741 if res [0 ] & 0xF0 != 0x30 :
703742 return res [0 ]
704743 sz = self ._recv_len ()
705- topic_len = self ._sock . recv (2 )
744+ topic_len = self ._sock_exact_recv (2 )
706745 topic_len = (topic_len [0 ] << 8 ) | topic_len [1 ]
707- topic = self ._sock . recv (topic_len )
746+ topic = self ._sock_exact_recv (topic_len )
708747 topic = str (topic , "utf-8" )
709748 sz -= topic_len + 2
710749 if res [0 ] & 0x06 :
711- pid = self ._sock . recv (2 )
750+ pid = self ._sock_exact_recv (2 )
712751 pid = pid [0 ] << 0x08 | pid [1 ]
713752 sz -= 0x02
714- msg = self ._sock . recv (sz )
753+ msg = self ._sock_exact_recv (sz )
715754 self ._handle_on_message (self , topic , str (msg , "utf-8" ))
716755 if res [0 ] & 0x06 == 0x02 :
717756 pkt = bytearray (b"\x40 \x02 \0 \0 " )
@@ -725,7 +764,7 @@ def _recv_len(self):
725764 n = 0
726765 sh = 0
727766 while True :
728- b = self ._sock . recv (1 )[0 ]
767+ b = self ._sock_exact_recv (1 )[0 ]
729768 n |= (b & 0x7F ) << sh
730769 if not b & 0x80 :
731770 return n
0 commit comments