Skip to content

Commit c75f97c

Browse files
SOCKS5: support looking up names remotely (#2666)
1 parent 4197302 commit c75f97c

File tree

2 files changed

+38
-5
lines changed

2 files changed

+38
-5
lines changed

kafka/conn.py

100644100755
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,9 @@ def _dns_lookup(self):
326326
return True
327327

328328
def _next_afi_sockaddr(self):
329+
if self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]):
330+
return (socket.AF_UNSPEC, (self.host, self.port))
331+
329332
if not self._gai:
330333
if not self._dns_lookup():
331334
return
@@ -379,6 +382,7 @@ def connect(self):
379382
self._sock_afi, self._sock_addr = next_lookup
380383
try:
381384
if self.config["socks5_proxy"] is not None:
385+
log.debug('%s: initializing Socks5 proxy at %s', self, self.config["socks5_proxy"])
382386
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)
383387
self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM)
384388
else:
@@ -864,6 +868,8 @@ def connection_delay(self):
864868
if self.disconnected() or self.connecting():
865869
if len(self._gai) > 0:
866870
return 0
871+
elif self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]):
872+
return 0
867873
else:
868874
time_waited = time.time() - self.last_attempt
869875
return max(self._reconnect_backoff - time_waited, 0) * 1000
@@ -964,6 +970,7 @@ def close(self, error=None):
964970
# the socket fd from selectors cleanly.
965971
sock = self._sock
966972
self._sock = None
973+
self._socks5_proxy = None
967974

968975
# drop lock before state change callback and processing futures
969976
self.config['state_change_callback'](self.node_id, sock, self)

kafka/socks5_wrapper.py

100644100755
Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@ def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC):
6464
log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex)
6565
return []
6666

67+
@classmethod
68+
def use_remote_lookup(cls, proxy_url):
69+
if proxy_url is None:
70+
return False
71+
return urlparse(proxy_url).scheme == 'socks5h'
72+
73+
def _use_remote_lookup(self):
74+
return self._proxy_url.scheme == 'socks5h'
75+
6776
def socket(self, family, sock_type):
6877
"""Open and record a socket.
6978
@@ -187,7 +196,10 @@ def connect_ex(self, addr):
187196
return errno.ECONNREFUSED
188197

189198
if self._state == ProxyConnectionStates.REQUEST_SUBMIT:
190-
if self._target_afi == socket.AF_INET:
199+
if self._use_remote_lookup():
200+
addr_type = 3
201+
addr_len = len(addr[0])
202+
elif self._target_afi == socket.AF_INET:
191203
addr_type = 1
192204
addr_len = 4
193205
elif self._target_afi == socket.AF_INET6:
@@ -200,14 +212,28 @@ def connect_ex(self, addr):
200212
return errno.ECONNREFUSED
201213

202214
self._buffer_out = struct.pack(
203-
"!bbbb{}sh".format(addr_len),
215+
"!bbbb",
204216
5, # version
205217
1, # command: connect
206218
0, # reserved
207-
addr_type, # 1 for ipv4, 4 for ipv6 address
208-
socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address
209-
addr[1], # port
219+
addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name
210220
)
221+
# Addr format depends on type
222+
if addr_type == 3:
223+
# len + domain name (no null terminator)
224+
self._buffer_out += struct.pack(
225+
"!b{}s".format(addr_len),
226+
addr_len,
227+
addr[0].encode('ascii'),
228+
)
229+
else:
230+
# either 4 (type 1) or 16 (type 4) bytes of actual address
231+
self._buffer_out += struct.pack(
232+
"!{}s".format(addr_len),
233+
socket.inet_pton(self._target_afi, addr[0]),
234+
)
235+
self._buffer_out += struct.pack("!H", addr[1]) # port
236+
211237
self._state = ProxyConnectionStates.REQUESTING
212238

213239
if self._state == ProxyConnectionStates.REQUESTING:

0 commit comments

Comments
 (0)