Skip to content

Commit 0d21644

Browse files
authored
Fix SASL authentication bugs (dpkp#1257)
* Use _send_bytes_blocking in BrokerConnection * _try_authenticate should call recv() so that futures are resolved * _sasl_auth_future can be reset if recv() causes disconnect * validate sasl_mechanism against SaslHandShakeResponse enabled_mechanisms
1 parent fbbd6ca commit 0d21644

File tree

1 file changed

+34
-23
lines changed

1 file changed

+34
-23
lines changed

kafka/conn.py

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -443,8 +443,11 @@ def _try_authenticate(self):
443443
sasl_response.add_callback(self._handle_sasl_handshake_response, future)
444444
sasl_response.add_errback(lambda f, e: f.failure(e), future)
445445
self._sasl_auth_future = future
446-
self._recv()
447-
if self._sasl_auth_future.failed():
446+
self.recv()
447+
# A connection error could trigger close() which will reset the future
448+
if self._sasl_auth_future is None:
449+
return False
450+
elif self._sasl_auth_future.failed():
448451
ex = self._sasl_auth_future.exception
449452
if not isinstance(ex, Errors.ConnectionError):
450453
raise ex # pylint: disable-msg=raising-bad-type
@@ -457,7 +460,12 @@ def _handle_sasl_handshake_response(self, future, response):
457460
self.close(error=error)
458461
return future.failure(error_type(self))
459462

460-
if self.config['sasl_mechanism'] == 'PLAIN':
463+
if self.config['sasl_mechanism'] not in response.enabled_mechanisms:
464+
return future.failure(
465+
Errors.UnsupportedSaslMechanismError(
466+
'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s'
467+
% (self.config['sasl_mechanism'], response.enabled_mechanisms)))
468+
elif self.config['sasl_mechanism'] == 'PLAIN':
461469
return self._try_authenticate_plain(future)
462470
elif self.config['sasl_mechanism'] == 'GSSAPI':
463471
return self._try_authenticate_gssapi(future)
@@ -467,6 +475,19 @@ def _handle_sasl_handshake_response(self, future, response):
467475
'kafka-python does not support SASL mechanism %s' %
468476
self.config['sasl_mechanism']))
469477

478+
def _send_bytes_blocking(self, data):
479+
self._sock.setblocking(True)
480+
total_sent = 0
481+
try:
482+
while total_sent < len(data):
483+
sent_bytes = self._sock.send(data[total_sent:])
484+
total_sent += sent_bytes
485+
if total_sent != len(data):
486+
raise ConnectionError('Buffer overrun during socket send')
487+
return total_sent
488+
finally:
489+
self._sock.setblocking(False)
490+
470491
def _recv_bytes_blocking(self, n):
471492
self._sock.setblocking(True)
472493
try:
@@ -485,15 +506,13 @@ def _try_authenticate_plain(self, future):
485506
log.warning('%s: Sending username and password in the clear', self)
486507

487508
data = b''
509+
# Send PLAIN credentials per RFC-4616
510+
msg = bytes('\0'.join([self.config['sasl_plain_username'],
511+
self.config['sasl_plain_username'],
512+
self.config['sasl_plain_password']]).encode('utf-8'))
513+
size = Int32.encode(len(msg))
488514
try:
489-
self._sock.setblocking(True)
490-
# Send PLAIN credentials per RFC-4616
491-
msg = bytes('\0'.join([self.config['sasl_plain_username'],
492-
self.config['sasl_plain_username'],
493-
self.config['sasl_plain_password']]).encode('utf-8'))
494-
size = Int32.encode(len(msg))
495-
self._sock.sendall(size + msg)
496-
self._sock.setblocking(False)
515+
self._send_bytes_blocking(size + msg)
497516

498517
# The server will send a zero sized message (that is Int32(0)) on success.
499518
# The connection is closed on failure
@@ -530,11 +549,9 @@ def _try_authenticate_gssapi(self, future):
530549

531550
# pass output token to kafka
532551
try:
533-
self._sock.setblocking(True)
534552
msg = output_token
535553
size = Int32.encode(len(msg))
536-
self._sock.sendall(size + msg)
537-
self._sock.setblocking(False)
554+
self._send_bytes_blocking(size + msg)
538555

539556
# The server will send a token back. Processing of this token either
540557
# establishes a security context, or it needs further token exchange.
@@ -662,16 +679,10 @@ def _send(self, request):
662679
# In the future we might manage an internal write buffer
663680
# and send bytes asynchronously. For now, just block
664681
# sending each request payload
665-
self._sock.setblocking(True)
666-
total_sent = 0
667-
while total_sent < len(data):
668-
sent_bytes = self._sock.send(data[total_sent:])
669-
total_sent += sent_bytes
670-
assert total_sent == len(data)
682+
total_bytes = self._send_bytes_blocking(data)
671683
if self._sensors:
672-
self._sensors.bytes_sent.record(total_sent)
673-
self._sock.setblocking(False)
674-
except (AssertionError, ConnectionError) as e:
684+
self._sensors.bytes_sent.record(total_bytes)
685+
except ConnectionError as e:
675686
log.exception("Error sending %s to %s", request, self)
676687
error = Errors.ConnectionError("%s: %s" % (self, e))
677688
self.close(error=error)

0 commit comments

Comments
 (0)