@@ -466,6 +466,7 @@ def __init__(
466466 read_from_replicas : bool = False ,
467467 dynamic_startup_nodes : bool = True ,
468468 url : Optional [str ] = None ,
469+ host_port_remap : Optional [Callable [[str , int ], Tuple [str , int ]]] = None ,
469470 ** kwargs ,
470471 ):
471472 """
@@ -594,6 +595,7 @@ def __init__(
594595 from_url = from_url ,
595596 require_full_coverage = require_full_coverage ,
596597 dynamic_startup_nodes = dynamic_startup_nodes ,
598+ host_port_remap = host_port_remap ,
597599 ** kwargs ,
598600 )
599601
@@ -1269,6 +1271,7 @@ def __init__(
12691271 lock = None ,
12701272 dynamic_startup_nodes = True ,
12711273 connection_pool_class = ConnectionPool ,
1274+ host_port_remap : Optional [Callable [[str , int ], Tuple [str , int ]]] = None ,
12721275 ** kwargs ,
12731276 ):
12741277 self .nodes_cache = {}
@@ -1280,6 +1283,7 @@ def __init__(
12801283 self ._require_full_coverage = require_full_coverage
12811284 self ._dynamic_startup_nodes = dynamic_startup_nodes
12821285 self .connection_pool_class = connection_pool_class
1286+ self .host_port_remap = host_port_remap
12831287 self ._moved_exception = None
12841288 self .connection_kwargs = kwargs
12851289 self .read_load_balancer = LoadBalancer ()
@@ -1502,6 +1506,7 @@ def initialize(self):
15021506 if host == "" :
15031507 host = startup_node .host
15041508 port = int (primary_node [1 ])
1509+ host , port = self .remap_host_port (host , port )
15051510
15061511 target_node = self ._get_or_create_cluster_node (
15071512 host , port , PRIMARY , tmp_nodes_cache
@@ -1518,6 +1523,7 @@ def initialize(self):
15181523 for replica_node in replica_nodes :
15191524 host = str_if_bytes (replica_node [0 ])
15201525 port = replica_node [1 ]
1526+ host , port = self .remap_host_port (host , port )
15211527
15221528 target_replica_node = self ._get_or_create_cluster_node (
15231529 host , port , REPLICA , tmp_nodes_cache
@@ -1591,6 +1597,16 @@ def reset(self):
15911597 # The read_load_balancer is None, do nothing
15921598 pass
15931599
1600+ def remap_host_port (self , host : str , port : int ) -> Tuple [str , int ]:
1601+ """
1602+ Remap the host and port returned from the cluster to a different
1603+ internal value. Useful if the client is not connecting directly
1604+ to the cluster.
1605+ """
1606+ if self .host_port_remap :
1607+ return self .host_port_remap (host , port )
1608+ return host , port
1609+
15941610
15951611class ClusterPubSub (PubSub ):
15961612 """
0 commit comments