@@ -1047,7 +1047,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
10471047 rcs = []
10481048
10491049 while True :
1050- rc = self ._wait_for_msg ()
1050+ rc = self ._wait_for_msg (timeout = timeout )
10511051 if rc is not None :
10521052 rcs .append (rc )
10531053 if self .get_monotonic_time () - stamp > timeout :
@@ -1056,11 +1056,13 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
10561056
10571057 return rcs if rcs else None
10581058
1059- def _wait_for_msg (self ) -> Optional [int ]:
1059+ def _wait_for_msg (self , timeout : Optional [ float ] = None ) -> Optional [int ]:
10601060 # pylint: disable = too-many-return-statements
10611061
10621062 """Reads and processes network events.
10631063 Return the packet type or None if there is nothing to be received.
1064+
1065+ :param float timeout: return after this timeout, in seconds.
10641066 """
10651067 # CPython socket module contains a timeout attribute
10661068 if hasattr (self ._socket_pool , "timeout" ):
@@ -1070,7 +1072,7 @@ def _wait_for_msg(self) -> Optional[int]:
10701072 return None
10711073 else : # socketpool, esp32spi
10721074 try :
1073- res = self ._sock_exact_recv (1 )
1075+ res = self ._sock_exact_recv (1 , timeout = timeout )
10741076 except OSError as error :
10751077 if error .errno in (errno .ETIMEDOUT , errno .EAGAIN ):
10761078 # raised by a socket timeout if 0 bytes were present
@@ -1139,7 +1141,9 @@ def _decode_remaining_length(self) -> int:
11391141 return n
11401142 sh += 7
11411143
1142- def _sock_exact_recv (self , bufsize : int ) -> bytearray :
1144+ def _sock_exact_recv (
1145+ self , bufsize : int , timeout : Optional [float ] = None
1146+ ) -> bytearray :
11431147 """Reads _exact_ number of bytes from the connected socket. Will only return
11441148 bytearray with the exact number of bytes requested.
11451149
@@ -1150,6 +1154,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
11501154 bytes is returned or trigger a timeout exception.
11511155
11521156 :param int bufsize: number of bytes to receive
1157+ :param float timeout: timeout, in seconds. Defaults to keep_alive
11531158 :return: byte array
11541159 """
11551160 stamp = self .get_monotonic_time ()
@@ -1161,7 +1166,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
11611166 to_read = bufsize - recv_len
11621167 if to_read < 0 :
11631168 raise MMQTTException (f"negative number of bytes to read: { to_read } " )
1164- read_timeout = self .keep_alive
1169+ read_timeout = timeout if timeout is not None else self .keep_alive
11651170 mv = mv [recv_len :]
11661171 while to_read > 0 :
11671172 recv_len = self ._sock .recv_into (mv , to_read )
0 commit comments