Skip to content

Commit 58e2ab4

Browse files
committed
Merge pull request #473 from ecanzonieri/use_unblocking_io_for_aware_requests
Use unblocking io for broker aware requests
2 parents cdcaea6 + c2adeea commit 58e2ab4

File tree

3 files changed

+52
-18
lines changed

3 files changed

+52
-18
lines changed

kafka/client.py

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import copy
33
import functools
44
import logging
5+
import select
56
import time
67

78
import kafka.common
@@ -177,6 +178,10 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
177178
# For each broker, send the list of request payloads
178179
# and collect the responses and errors
179180
broker_failures = []
181+
182+
# For each KafkaConnection keep the real socket so that we can use
183+
# a select to perform unblocking I/O
184+
connections_by_socket = {}
180185
for broker, payloads in payloads_by_broker.items():
181186
requestId = self._next_id()
182187
log.debug('Request %s to %s: %s', requestId, broker, payloads)
@@ -210,27 +215,34 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
210215
topic_partition = (payload.topic, payload.partition)
211216
responses[topic_partition] = None
212217
continue
218+
else:
219+
connections_by_socket[conn.get_connected_socket()] = (conn, broker)
213220

214-
try:
215-
response = conn.recv(requestId)
216-
except ConnectionError as e:
217-
broker_failures.append(broker)
218-
log.warning('ConnectionError attempting to receive a '
219-
'response to request %s from server %s: %s',
220-
requestId, broker, e)
221+
conn = None
222+
while connections_by_socket:
223+
sockets = connections_by_socket.keys()
224+
rlist, _, _ = select.select(sockets, [], [], None)
225+
conn, broker = connections_by_socket.pop(rlist[0])
226+
try:
227+
response = conn.recv(requestId)
228+
except ConnectionError as e:
229+
broker_failures.append(broker)
230+
log.warning('ConnectionError attempting to receive a '
231+
'response to request %s from server %s: %s',
232+
requestId, broker, e)
221233

222-
for payload in payloads:
223-
topic_partition = (payload.topic, payload.partition)
224-
responses[topic_partition] = FailedPayloadsError(payload)
234+
for payload in payloads_by_broker[broker]:
235+
topic_partition = (payload.topic, payload.partition)
236+
responses[topic_partition] = FailedPayloadsError(payload)
225237

226-
else:
227-
_resps = []
228-
for payload_response in decoder_fn(response):
229-
topic_partition = (payload_response.topic,
230-
payload_response.partition)
231-
responses[topic_partition] = payload_response
232-
_resps.append(payload_response)
233-
log.debug('Response %s: %s', requestId, _resps)
238+
else:
239+
_resps = []
240+
for payload_response in decoder_fn(response):
241+
topic_partition = (payload_response.topic,
242+
payload_response.partition)
243+
responses[topic_partition] = payload_response
244+
_resps.append(payload_response)
245+
log.debug('Response %s: %s', requestId, _resps)
234246

235247
# Connection errors generally mean stale metadata
236248
# although sometimes it means incorrect api request

kafka/conn.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ def _read_bytes(self, num_bytes):
118118

119119
# TODO multiplex socket communication to allow for multi-threaded clients
120120

121+
def get_connected_socket(self):
122+
if not self._sock:
123+
self.reinit()
124+
return self._sock
125+
121126
def send(self, request_id, payload):
122127
"""
123128
Send a request to Kafka

test/test_conn.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,23 @@ def test_recv__doesnt_consume_extra_data_in_stream(self):
165165
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
166166
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
167167

168+
def test_get_connected_socket(self):
169+
s = self.conn.get_connected_socket()
170+
171+
self.assertEqual(s, self.MockCreateConn())
172+
173+
def test_get_connected_socket_on_dirty_conn(self):
174+
# Dirty the connection
175+
try:
176+
self.conn._raise_connection_error()
177+
except ConnectionError:
178+
pass
179+
180+
# Test that get_connected_socket tries to connect
181+
self.assertEqual(self.MockCreateConn.call_count, 0)
182+
self.conn.get_connected_socket()
183+
self.assertEqual(self.MockCreateConn.call_count, 1)
184+
168185
def test_close__object_is_reusable(self):
169186

170187
# test that sending to a closed connection

0 commit comments

Comments
 (0)