Skip to content

Commit 787e8b2

Browse files
Lars Jørgen Solbergdpkp
Lars Jørgen Solberg
authored andcommitted
minor tweaks to get authentication working
1 parent 2b2c72f commit 787e8b2

File tree

1 file changed

+7
-8
lines changed

1 file changed

+7
-8
lines changed

kafka/conn.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ def _try_handshake(self):
299299
return False
300300

301301
def _try_authenticate(self):
302-
assert self.config['api_version'] >= (0, 10) or self.config['api_version'] is None
302+
assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10)
303303

304304
if self._sasl_auth_future is None:
305305
# Build a SaslHandShakeRequest message
@@ -311,7 +311,7 @@ def _try_authenticate(self):
311311
self._sasl_auth_future = future
312312
self._recv()
313313
if self._sasl_auth_future.failed():
314-
raise self._sasl_auth_future.exception
314+
raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
315315
return self._sasl_auth_future.succeeded()
316316

317317
def _handle_sasl_handshake_response(self, future, response):
@@ -345,25 +345,24 @@ def _try_authenticate_plain(self, future):
345345

346346
# The server will send a zero sized message (that is Int32(0)) on success.
347347
# The connection is closed on failure
348-
received_bytes = 0
349-
while received_bytes < 4:
350-
data += self._sock.recv(4 - received_bytes)
351-
received_bytes += len(data)
352-
if not data:
348+
while len(data) < 4:
349+
fragment = self._sock.recv(4 - len(data))
350+
if not fragment:
353351
log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
354352
error = Errors.AuthenticationFailedError(
355353
'Authentication failed for user {0}'.format(
356354
self.config['sasl_plain_username']))
357355
future.failure(error)
358356
raise error
357+
data += fragment
359358
self._sock.setblocking(False)
360359
except (AssertionError, ConnectionError) as e:
361360
log.exception("%s: Error receiving reply from server", self)
362361
error = Errors.ConnectionError("%s: %s" % (str(self), e))
363362
future.failure(error)
364363
self.close(error=error)
365364

366-
if data != '\x00\x00\x00\x00':
365+
if data != b'\x00\x00\x00\x00':
367366
return future.failure(Errors.AuthenticationFailedError())
368367

369368
return future.success(True)

0 commit comments

Comments
 (0)