Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions API_changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
PyModbus - API changes.
=======================

-------------
Version 3.2.0
-------------
- StartAsync<type>Server, removed defer_start argument, return is None.
instead of using defer_start instantiate the Modbus<type>Server directly.

-------------
Version 3.1.0
-------------
Expand Down
25 changes: 13 additions & 12 deletions pymodbus/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,11 @@ async def execute(self, request=None): # pylint: disable=invalid-overridden-met
if self.params.broadcast_enable and not request.unit_id:
resp = b"Broadcast write sent - no response expected"
else:
resp = await asyncio.wait_for(req, timeout=self.params.timeout)
try:
resp = await asyncio.wait_for(req, timeout=self.params.timeout)
except asyncio.exceptions.TimeoutError:
self.connection_lost("trying to send")
raise
return resp

def connection_made(self, transport):
Expand All @@ -309,30 +313,27 @@ def connection_made(self, transport):
self._connection_made()

if self.factory:
self.factory.protocol_made_connection( # pylint: disable=no-member,useless-suppression
self
)
self.factory.protocol_made_connection(self) # pylint: disable=no-member

async def close(self): # pylint: disable=invalid-overridden-method
"""Close connection."""
if self.transport:
self.transport.close()
while self.transport is not None:
await asyncio.sleep(0.1)
self._connected = False

def connection_lost(self, reason):
"""Call when the connection is lost or closed.

The argument is either an exception object or None
"""
self.transport = None
self._connection_lost(reason)

if self.transport:
self.transport.abort()
if hasattr(self.transport, "_sock"):
self.transport._sock.close() # pylint: disable=protected-access
self.transport = None
if self.factory:
self.factory.protocol_lost_connection( # pylint: disable=no-member,useless-suppression
self
)
self.factory.protocol_lost_connection(self) # pylint: disable=no-member
self._connection_lost(reason)

def data_received(self, data):
"""Call when some data is received.
Expand Down
50 changes: 38 additions & 12 deletions pymodbus/client/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(
self.params.handle_local_echo = handle_local_echo
self.loop = None
self._connected_event = asyncio.Event()
self._reconnect_task = None

async def close(self): # pylint: disable=invalid-overridden-method
"""Stop connection."""
Expand All @@ -81,8 +82,14 @@ async def close(self): # pylint: disable=invalid-overridden-method
self.protocol.transport.close()
if self.protocol:
await self.protocol.close()
self.protocol = None
await asyncio.sleep(0.1)

# if there is an unfinished delayed reconnection attempt pending, cancel it
if self._reconnect_task:
self._reconnect_task.cancel()
self._reconnect_task = None

def _create_protocol(self):
"""Create protocol."""
protocol = ModbusClientProtocol(
Expand Down Expand Up @@ -118,6 +125,8 @@ async def connect(self): # pylint: disable=invalid-overridden-method
Log.info("Connected to {}", self.params.port)
except Exception as exc: # pylint: disable=broad-except
Log.warning("Failed to connect: {}", exc)
if self.delay_ms > 0:
self._launch_reconnect()
return self.connected

def protocol_made_connection(self, protocol):
Expand All @@ -131,19 +140,36 @@ def protocol_made_connection(self, protocol):

def protocol_lost_connection(self, protocol):
"""Notify lost connection."""
if self.connected:
Log.info("Serial lost connection.")
if protocol is not self.protocol:
Log.error("Serial: protocol is not self.protocol.")

self._connected_event.clear()
if self.protocol is not None:
del self.protocol
self.protocol = None
# if self.host:
# asyncio.asynchronous(self._reconnect())
Log.info("Serial lost connection.")
if protocol is not self.protocol:
Log.error("Serial: protocol is not self.protocol.")

self._connected_event.clear()
if self.protocol is not None:
del self.protocol
self.protocol = None
if self.delay_ms:
self._launch_reconnect()

def _launch_reconnect(self):
"""Launch delayed reconnection coroutine"""
if self._reconnect_task:
Log.warning(
"Ignoring launch of delayed reconnection, another is in progress"
)
else:
Log.error("Serial, lost_connection but not connected.")
# store the future in a member variable so we know we have a pending reconnection attempt
# also prevents its garbage collection
self._reconnect_task = asyncio.create_task(self._reconnect())

async def _reconnect(self):
"""Reconnect."""
Log.debug("Waiting {} ms before next connection attempt.", self.delay_ms)
await asyncio.sleep(self.delay_ms / 1000)
self.delay_ms = min(2 * self.delay_ms, self.params.reconnect_delay_max)

self._reconnect_task = None
return await self.connect()


class ModbusSerialClient(ModbusBaseClient):
Expand Down
23 changes: 10 additions & 13 deletions pymodbus/client/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(
self.loop = None
self.connected = False
self.delay_ms = self.params.reconnect_delay
self._reconnect_future = None
self._reconnect_task = None

async def connect(self): # pylint: disable=invalid-overridden-method
"""Initiate connection to start client."""
Expand All @@ -70,21 +70,20 @@ async def connect(self): # pylint: disable=invalid-overridden-method

async def close(self): # pylint: disable=invalid-overridden-method
"""Stop client."""

# if there is an unfinished delayed reconnection attempt pending, cancel it
if self._reconnect_future:
self._reconnect_future.cancel()
self._reconnect_future = None

# prevent reconnect:
self.delay_ms = 0
if self.connected:
if self.protocol.transport:
self.protocol.transport.abort()
self.protocol.transport.close()
if self.protocol:
await self.protocol.close()
self.protocol = None
await asyncio.sleep(0.1)

if self._reconnect_task:
self._reconnect_task.cancel()
self._reconnect_task = None

def _create_protocol(self):
"""Create initialized protocol instance with factory function."""
protocol = ModbusClientProtocol(
Expand Down Expand Up @@ -156,22 +155,20 @@ def protocol_lost_connection(self, protocol):

def _launch_reconnect(self):
"""Launch delayed reconnection coroutine"""
if self._reconnect_future:
if self._reconnect_task:
Log.warning(
"Ignoring launch of delayed reconnection, another is in progress"
)
else:
# store the future in a member variable so we know we have a pending reconnection attempt
# also prevents its garbage collection
self._reconnect_future = asyncio.ensure_future(self._reconnect())
self._reconnect_task = asyncio.create_task(self._reconnect())

async def _reconnect(self):
"""Reconnect."""
Log.debug("Waiting {} ms before next connection attempt.", self.delay_ms)
await asyncio.sleep(self.delay_ms / 1000)
self.delay_ms = min(2 * self.delay_ms, self.params.reconnect_delay_max)

self._reconnect_future = None
self._reconnect_task = None
return await self._connect()


Expand Down
3 changes: 1 addition & 2 deletions pymodbus/client/tls.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Modbus client async TLS communication."""
import asyncio
import socket
import ssl

Expand Down Expand Up @@ -101,7 +100,7 @@ async def _connect(self):
except Exception as exc: # pylint: disable=broad-except
Log.warning("Failed to connect: {}", exc)
if self.delay_ms > 0:
asyncio.ensure_future(self._reconnect())
self._launch_reconnect()
return
Log.info("Connected to {}:{}.", self.params.host, self.params.port)
self.reset_delay()
Expand Down
50 changes: 27 additions & 23 deletions pymodbus/client/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,18 @@ def __init__(
self.params.host = host
self.params.port = port
self.params.source_address = source_address

self._reconnect_task = None
self.loop = asyncio.get_event_loop()
self.connected = False
self.delay_ms = self.params.reconnect_delay
self._reconnect_task = None
self.reset_delay()

async def connect(self): # pylint: disable=invalid-overridden-method
"""Start reconnecting asynchronous udp client.

:meta private:
"""
# force reconnect if required:
host = self.params.host
await self.close()
self.params.host = host

# get current loop, if there are no loop a RuntimeError will be raised
self.loop = asyncio.get_running_loop()
Log.debug("Connecting to {}:{}.", self.params.host, self.params.port)
Expand All @@ -83,15 +79,20 @@ async def close(self): # pylint: disable=invalid-overridden-method

:meta private:
"""
# prevent reconnect:
self.delay_ms = 0
if self.connected:
if self.protocol.transport:
self.protocol.transport.abort()
self.protocol.transport.close()
if self.protocol:
await self.protocol.close()
self.protocol = None
await asyncio.sleep(0.1)

if self._reconnect_task:
self._reconnect_task.cancel()
self._reconnect_task = None

def _create_protocol(self, host=None, port=0):
"""Create initialized protocol instance with factory function."""
protocol = ModbusClientProtocol(
Expand Down Expand Up @@ -127,7 +128,7 @@ async def _connect(self):
return endpoint
except Exception as exc: # pylint: disable=broad-except
Log.warning("Failed to connect: {}", exc)
asyncio.ensure_future(self._reconnect())
self._reconnect_task = asyncio.ensure_future(self._reconnect())

def protocol_made_connection(self, protocol):
"""Notify successful connection.
Expand All @@ -146,22 +147,25 @@ def protocol_lost_connection(self, protocol):

:meta private:
"""
if self.connected:
Log.info("Protocol lost connection.")
if protocol is not self.protocol:
Log.error(
"Factory protocol callback called "
"from unexpected protocol instance."
)

self.connected = False
if self.protocol is not None:
del self.protocol
self.protocol = None
if self.delay_ms > 0:
asyncio.create_task(self._reconnect())
Log.info("Protocol lost connection.")
if protocol is not self.protocol:
Log.error("Factory protocol cb from unexpected protocol instance.")

self.connected = False
if self.protocol is not None:
del self.protocol
self.protocol = None
if self.delay_ms > 0:
self._launch_reconnect()

def _launch_reconnect(self):
"""Launch delayed reconnection coroutine"""
if self._reconnect_task:
Log.warning(
"Ignoring launch of delayed reconnection, another is in progress"
)
else:
Log.error("Factory protocol connect callback called while connected.")
self._reconnect_task = asyncio.create_task(self._reconnect())

async def _reconnect(self):
"""Reconnect."""
Expand Down
Loading