Skip to content

Commit 4cfeaca

Browse files
asdaraujodpkp
authored andcommitted
Add security layer negotiation to the GSSAPI authentication. (#1283)
When trying to establish a connection with Kafka using SASL with the GSSAPI authentication mechanism the connection was hanging an timing out after 60 secons. On the Kafka broker side I noticed that the SaslServerAuthenticator was going from the AUTHENTICATE to the FAILED state. The GSSAPI auth implementation was missing the second handshake defined in RFC 2222, which happens after the security context is established. This handshake is used by the client and server to negotiate the security layer (QoP) to be used for the connection. Kafka currently only support the "auth" QoP, so the implementation in this commit doesn't make it configurable, but this can be extended later. With this change I was able to successfully connect to a Kerberos-enabled Kafka broker using the SASL_PLAINTEXT protocol and the GSSAPI mechanism.
1 parent c49ae90 commit 4cfeaca

File tree

1 file changed

+43
-22
lines changed

1 file changed

+43
-22
lines changed

kafka/conn.py

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import collections
44
import copy
55
import errno
6+
import io
67
import logging
78
from random import shuffle, uniform
89

@@ -27,7 +28,7 @@
2728
from kafka.protocol.commit import OffsetFetchRequest
2829
from kafka.protocol.metadata import MetadataRequest
2930
from kafka.protocol.parser import KafkaProtocol
30-
from kafka.protocol.types import Int32
31+
from kafka.protocol.types import Int32, Int8
3132
from kafka.version import __version__
3233

3334

@@ -39,6 +40,10 @@
3940

4041
DEFAULT_KAFKA_PORT = 9092
4142

43+
SASL_QOP_AUTH = 1
44+
SASL_QOP_AUTH_INT = 2
45+
SASL_QOP_AUTH_CONF = 4
46+
4247
try:
4348
import ssl
4449
ssl_available = True
@@ -517,43 +522,59 @@ def _try_authenticate_plain(self, future):
517522
return future.success(True)
518523

519524
def _try_authenticate_gssapi(self, future):
525+
auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
520526
gssapi_name = gssapi.Name(
521-
self.config['sasl_kerberos_service_name'] + '@' + self.hostname,
527+
auth_id,
522528
name_type=gssapi.NameType.hostbased_service
523529
).canonicalize(gssapi.MechType.kerberos)
524530
log.debug('%s: GSSAPI name: %s', self, gssapi_name)
525531

526-
# Exchange tokens until authentication either succeeds or fails
527-
client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
528-
received_token = None
532+
# Establish security context and negotiate protection level
533+
# For reference RFC 2222, section 7.2.1
529534
try:
535+
# Exchange tokens until authentication either succeeds or fails
536+
client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
537+
received_token = None
530538
while not client_ctx.complete:
531539
# calculate an output token from kafka token (or None if first iteration)
532540
output_token = client_ctx.step(received_token)
533541

542+
# pass output token to kafka, or send empty response if the security
543+
# context is complete (output token is None in that case)
534544
if output_token is None:
535-
continue
536-
537-
# pass output token to kafka
538-
try:
545+
self._send_bytes_blocking(Int32.encode(0))
546+
else:
539547
msg = output_token
540548
size = Int32.encode(len(msg))
541549
self._send_bytes_blocking(size + msg)
542550

543-
# The server will send a token back. Processing of this token either
544-
# establishes a security context, or it needs further token exchange.
545-
# The gssapi will be able to identify the needed next step.
546-
# The connection is closed on failure.
547-
header = self._recv_bytes_blocking(4)
548-
(token_size,) = struct.unpack('>i', header)
549-
received_token = self._recv_bytes_blocking(token_size)
550-
551-
except ConnectionError as e:
552-
log.exception("%s: Error receiving reply from server", self)
553-
error = Errors.ConnectionError("%s: %s" % (self, e))
554-
self.close(error=error)
555-
return future.failure(error)
551+
# The server will send a token back. Processing of this token either
552+
# establishes a security context, or it needs further token exchange.
553+
# The gssapi will be able to identify the needed next step.
554+
# The connection is closed on failure.
555+
header = self._recv_bytes_blocking(4)
556+
(token_size,) = struct.unpack('>i', header)
557+
received_token = self._recv_bytes_blocking(token_size)
558+
559+
# Process the security layer negotiation token, sent by the server
560+
# once the security context is established.
561+
562+
# unwraps message containing supported protection levels and msg size
563+
msg = client_ctx.unwrap(received_token).message
564+
# Kafka currently doesn't support integrity or confidentiality security layers, so we
565+
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
566+
# by the server
567+
msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0]))) + msg[1:]
568+
# add authorization identity to the response, GSS-wrap and send it
569+
msg = client_ctx.wrap(msg + auth_id, False).message
570+
size = Int32.encode(len(msg))
571+
self._send_bytes_blocking(size + msg)
556572

573+
except ConnectionError as e:
574+
log.exception("%s: Error receiving reply from server", self)
575+
error = Errors.ConnectionError("%s: %s" % (self, e))
576+
self.close(error=error)
577+
return future.failure(error)
557578
except Exception as e:
558579
return future.failure(e)
559580

0 commit comments

Comments
 (0)