Skip to content

Commit 5ab4bbf

Browse files
committed
Update region resolution logic and refactor / add more tests
1 parent c120448 commit 5ab4bbf

File tree

2 files changed

+99
-13
lines changed

2 files changed

+99
-13
lines changed

kafka/msk.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ def __init__(self, host, boto_session):
3434
self.host = host
3535
self.boto_session = boto_session
3636

37+
# This will raise if the region can't be determined
38+
# Do this during init instead of waiting for failures downstream
39+
if self.region:
40+
pass
41+
3742
@property
3843
def access_key(self):
3944
return self.boto_session.get_credentials().access_key
@@ -48,11 +53,21 @@ def token(self):
4853

4954
@property
5055
def region(self):
51-
# TODO: This logic is not perfect and should be revisited
56+
# Try to get the region information from the broker hostname
5257
for host in self.host.split(','):
5358
if 'amazonaws.com' in host:
5459
return host.split('.')[-3]
55-
return 'us-west-2'
60+
61+
# If the region can't be determined from hostname, try the boto session
62+
# This will only have a value if:
63+
# - `AWS_DEFAULT_REGION` environment variable is set
64+
# - `~/.aws/config` region variable is set
65+
region = self.boto_session.get_config_variable('region')
66+
if region:
67+
return region
68+
69+
# Otherwise give up
70+
raise Exception('Could not determine region from broker host(s) or aws configuration')
5671

5772
@property
5873
def _credential(self):

test/test_msk.py

Lines changed: 82 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,92 @@
1313
import mock
1414

1515

16-
@pytest.fixture(params=[{'session_token': 'session_token', 'host': 'localhost'}, {'session_token': None, 'host': 'localhost.us-east-1.amazonaws.com'}])
17-
def msk_client(request):
16+
@pytest.fixture
17+
def boto_session():
1818
# To avoid a package dependency on the optional botocore library, we mock the module out
1919
sys.modules['botocore.session'] = mock.MagicMock()
2020
from botocore.session import Session # pylint: disable=import-error
2121

22-
session = Session()
23-
session.get_credentials = mock.MagicMock(return_value=mock.MagicMock(id='the_actual_credentials', access_key='akia', secret_key='secret', token=request.param['session_token']))
24-
yield AwsMskIamClient(
25-
host=request.param["host"],
26-
boto_session = session,
22+
boto_session = Session()
23+
boto_session.get_credentials = mock.MagicMock(return_value=mock.MagicMock(id='the_actual_credentials', access_key='akia', secret_key='secret', token=None))
24+
yield boto_session
25+
26+
27+
def test_aws_msk_iam_region_from_config(boto_session):
28+
# Region determined by configuration
29+
boto_session.get_config_variable = mock.MagicMock(return_value='us-west-2')
30+
msk_client = AwsMskIamClient(
31+
host='localhost',
32+
boto_session = boto_session,
2733
)
34+
msg = msk_client.first_message()
35+
assert msg
36+
assert isinstance(msg, bytes)
37+
actual = json.loads(msg.decode('utf-8'))
38+
39+
expected = {
40+
'version': '2020_10_22',
41+
'host': msk_client.host,
42+
'user-agent': 'kafka-python',
43+
'action': 'kafka-cluster:Connect',
44+
'x-amz-algorithm': 'AWS4-HMAC-SHA256',
45+
'x-amz-credential': '{}/{}/us-west-2/kafka-cluster/aws4_request'.format(msk_client.access_key, datetime.datetime.utcnow().strftime('%Y%m%d')),
46+
'x-amz-date': mock.ANY,
47+
'x-amz-signedheaders': 'host',
48+
'x-amz-expires': '900',
49+
'x-amz-signature': mock.ANY,
50+
}
51+
TestCase().assertEqual(actual, expected)
2852

2953

30-
def test_aws_msk_iam(msk_client):
54+
def test_aws_msk_iam_region_from_hostname(boto_session):
55+
# Region determined by hostname
56+
msk_client = AwsMskIamClient(
57+
host='localhost.us-east-1.amazonaws.com',
58+
boto_session = boto_session,
59+
)
60+
msg = msk_client.first_message()
61+
assert msg
62+
assert isinstance(msg, bytes)
63+
actual = json.loads(msg.decode('utf-8'))
64+
65+
expected = {
66+
'version': '2020_10_22',
67+
'host': msk_client.host,
68+
'user-agent': 'kafka-python',
69+
'action': 'kafka-cluster:Connect',
70+
'x-amz-algorithm': 'AWS4-HMAC-SHA256',
71+
'x-amz-credential': '{}/{}/us-east-1/kafka-cluster/aws4_request'.format(msk_client.access_key, datetime.datetime.utcnow().strftime('%Y%m%d')),
72+
'x-amz-date': mock.ANY,
73+
'x-amz-signedheaders': 'host',
74+
'x-amz-expires': '900',
75+
'x-amz-signature': mock.ANY,
76+
}
77+
TestCase().assertEqual(actual, expected)
78+
79+
80+
def test_aws_msk_iam_no_region(boto_session):
81+
# No region from config
82+
boto_session.get_config_variable = mock.MagicMock(return_value=None)
83+
84+
with TestCase().assertRaises(Exception) as e:
85+
# No region from hostname
86+
msk_client = AwsMskIamClient(
87+
host='localhost',
88+
boto_session = boto_session,
89+
)
90+
assert 'Could not determine region from broker host(s) or aws configuration' == str(e.exception)
91+
92+
93+
@pytest.mark.parametrize('session_token', [(None), ('the_token')])
94+
def test_aws_msk_iam_permanent_and_temporary_credentials(session_token, request):
95+
boto_session = request.getfixturevalue('boto_session')
96+
if session_token:
97+
boto_session.get_credentials.return_value.token = session_token
98+
msk_client = AwsMskIamClient(
99+
host='localhost.us-east-1.amazonaws.com',
100+
boto_session = boto_session,
101+
)
31102
msg = msk_client.first_message()
32103
assert msg
33104
assert isinstance(msg, bytes)
@@ -39,12 +110,12 @@ def test_aws_msk_iam(msk_client):
39110
'user-agent': 'kafka-python',
40111
'action': 'kafka-cluster:Connect',
41112
'x-amz-algorithm': 'AWS4-HMAC-SHA256',
42-
'x-amz-credential': '{}/{}/{}/kafka-cluster/aws4_request'.format(msk_client.access_key, datetime.datetime.utcnow().strftime('%Y%m%d'), 'us-west-2' if msk_client.host == 'localhost' else 'us-east-1'),
113+
'x-amz-credential': '{}/{}/us-east-1/kafka-cluster/aws4_request'.format(msk_client.access_key, datetime.datetime.utcnow().strftime('%Y%m%d')),
43114
'x-amz-date': mock.ANY,
44115
'x-amz-signedheaders': 'host',
45116
'x-amz-expires': '900',
46117
'x-amz-signature': mock.ANY,
47118
}
48-
if msk_client.token:
49-
expected['x-amz-security-token'] = msk_client.token
119+
if session_token:
120+
expected['x-amz-security-token'] = session_token
50121
TestCase().assertEqual(actual, expected)

0 commit comments

Comments
 (0)