Commit f7b3133 (parent 2966a8e)
Fix python2.7 errors (#2578)

12 files changed: +70 lines, -14 lines

kafka/conn.py

Lines changed: 4 additions & 3 deletions
@@ -594,7 +594,8 @@ def _handle_api_versions_response(self, future, response):
             future.failure(error_type())
             if error_type is Errors.UnsupportedVersionError:
                 self._api_versions_idx -= 1
-                for api_key, min_version, max_version, *rest in response.api_versions:
+                for api_version_data in response.api_versions:
+                    api_key, min_version, max_version = api_version_data[:3]
                     # If broker provides a lower max_version, skip to that
                     if api_key == response.API_KEY:
                         self._api_versions_idx = min(self._api_versions_idx, max_version)
@@ -607,8 +608,8 @@ def _handle_api_versions_response(self, future, response):
             self.close(error=error_type())
             return
         self._api_versions = dict([
-            (api_key, (min_version, max_version))
-            for api_key, min_version, max_version, *rest in response.api_versions
+            (api_version_data[0], (api_version_data[1], api_version_data[2]))
+            for api_version_data in response.api_versions
         ])
         self._api_version = self._infer_broker_version_from_api_versions(self._api_versions)
         log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version)))
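Both changes replace Python 3's starred unpacking, which is a SyntaxError on 2.7 and kept the module from even importing. A minimal sketch of the portable pattern, using a made-up ApiVersions entry:

    entry = (18, 0, 3, [])   # hypothetical (api_key, min_version, max_version, tagged_fields)

    # Python 3 only -- a SyntaxError under 2.7:
    #   api_key, min_version, max_version, *rest = entry

    # Portable on both interpreters: slice off the leading fields
    api_key, min_version, max_version = entry[:3]
    assert (api_key, min_version, max_version) == (18, 0, 3)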

kafka/consumer/fetcher.py

Lines changed: 3 additions & 0 deletions
@@ -867,6 +867,9 @@ def _maybe_skip_record(self, record):
     def __bool__(self):
         return self.record_iterator is not None
 
+    # py2
+    __nonzero__ = __bool__
+
     def drain(self):
         if self.record_iterator is not None:
             self.record_iterator = None
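Python 2 decides truthiness through `__nonzero__`, Python 3 through `__bool__`; without the alias, `bool(obj)` on 2.7 falls back to the default and is always True. A self-contained sketch of the idiom, with an illustrative class rather than the fetcher's real one:

    class Records(object):
        def __init__(self, record_iterator=None):
            self.record_iterator = record_iterator

        def __bool__(self):          # consulted by Python 3
            return self.record_iterator is not None

        __nonzero__ = __bool__       # consulted by Python 2

    assert not Records()             # holds on both interpreters
    assert Records(iter([1, 2]))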

kafka/consumer/subscription_state.py

Lines changed: 5 additions & 1 deletion
@@ -381,7 +381,11 @@ def resume(self, partition):
 
     def move_partition_to_end(self, partition):
         if partition in self.assignment:
-            self.assignment.move_to_end(partition)
+            try:
+                self.assignment.move_to_end(partition)
+            except AttributeError:
+                state = self.assignment.pop(partition)
+                self.assignment[partition] = state
 
 
 class TopicPartitionState(object):
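`OrderedDict.move_to_end()` only exists on Python 3.2+, so 2.7 raises AttributeError and the fallback pops the entry and reinserts it, which lands it in the same final position. A standalone sketch:

    from collections import OrderedDict

    assignment = OrderedDict([('tp0', 'state0'), ('tp1', 'state1')])
    try:
        assignment.move_to_end('tp0')                # Python 3.2+
    except AttributeError:                           # Python 2.7 fallback
        assignment['tp0'] = assignment.pop('tp0')    # pop, then reinsert at the end
    assert list(assignment) == ['tp1', 'tp0']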

test/record/test_default_records.py

Lines changed: 5 additions & 0 deletions
@@ -11,6 +11,8 @@
 )
 from kafka.errors import UnsupportedCodecError
 
+from test.testutil import maybe_skip_unsupported_compression
+
 
 @pytest.mark.parametrize("compression_type", [
     DefaultRecordBatch.CODEC_NONE,
@@ -19,6 +21,7 @@
     DefaultRecordBatch.CODEC_LZ4
 ])
 def test_read_write_serde_v2(compression_type):
+    maybe_skip_unsupported_compression(compression_type)
     builder = DefaultRecordBatchBuilder(
         magic=2, compression_type=compression_type, is_transactional=1,
         producer_id=123456, producer_epoch=123, base_sequence=9999,
@@ -186,6 +189,8 @@ def test_default_batch_size_limit():
 ])
 @pytest.mark.parametrize("magic", [0, 1])
 def test_unavailable_codec(magic, compression_type, name, checker_name):
+    if not getattr(kafka.codec, checker_name)():
+        pytest.skip('%s compression_type not installed' % (compression_type,))
     builder = DefaultRecordBatchBuilder(
         magic=2, compression_type=compression_type, is_transactional=0,
         producer_id=-1, producer_epoch=-1, base_sequence=-1,

test/record/test_legacy_records.py

Lines changed: 4 additions & 0 deletions
@@ -10,6 +10,8 @@
 import kafka.codec
 from kafka.errors import UnsupportedCodecError
 
+from test.testutil import maybe_skip_unsupported_compression
+
 
 @pytest.mark.parametrize("magic", [0, 1])
 def test_read_write_serde_v0_v1_no_compression(magic):
@@ -39,6 +41,7 @@ def test_read_write_serde_v0_v1_no_compression(magic):
 ])
 @pytest.mark.parametrize("magic", [0, 1])
 def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
+    maybe_skip_unsupported_compression(compression_type)
     builder = LegacyRecordBatchBuilder(
         magic=magic, compression_type=compression_type, batch_size=9999999)
     for offset in range(10):
@@ -179,6 +182,7 @@ def test_legacy_batch_size_limit(magic):
 ])
 @pytest.mark.parametrize("magic", [0, 1])
 def test_unavailable_codec(magic, compression_type, name, checker_name):
+    maybe_skip_unsupported_compression(compression_type)
     builder = LegacyRecordBatchBuilder(
         magic=magic, compression_type=compression_type, batch_size=1024)
     builder.append(0, timestamp=None, key=None, value=b"M")

test/record/test_records.py

Lines changed: 3 additions & 0 deletions
@@ -4,6 +4,8 @@
 from kafka.record import MemoryRecords, MemoryRecordsBuilder
 from kafka.errors import CorruptRecordException
 
+from test.testutil import maybe_skip_unsupported_compression
+
 # This is real live data from Kafka 11 broker
 record_batch_data_v2 = [
     # First Batch value == "123"
@@ -179,6 +181,7 @@ def test_memory_records_corrupt():
 @pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
 @pytest.mark.parametrize("magic", [0, 1, 2])
 def test_memory_records_builder(magic, compression_type):
+    maybe_skip_unsupported_compression(compression_type)
     builder = MemoryRecordsBuilder(
         magic=magic, compression_type=compression_type, batch_size=1024 * 10)
     base_size = builder.size_in_bytes()  # V2 has a header before

test/test_conn.py

Lines changed: 4 additions & 1 deletion
@@ -11,6 +11,7 @@
 import pytest
 
 from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
+from kafka.future import Future
 from kafka.protocol.api import RequestHeader
 from kafka.protocol.group import HeartbeatResponse
 from kafka.protocol.metadata import MetadataRequest
@@ -69,8 +70,10 @@ def test_connect(_socket, conn, states):
     assert conn.state is state
 
 
-def test_api_versions_check(_socket):
+def test_api_versions_check(_socket, mocker):
     conn = BrokerConnection('localhost', 9092, socket.AF_INET)
+    mocker.patch.object(conn, '_send', return_value=Future())
+    mocker.patch.object(conn, 'recv', return_value=[])
     assert conn._api_versions_future is None
     conn.connect()
     assert conn._api_versions_future is not None
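Patching `_send` to return a pending `Future` and `recv` to return nothing lets `conn.connect()` start the ApiVersions handshake without touching a socket. A generic sketch of the `pytest-mock` pattern, using a hypothetical class rather than `BrokerConnection`:

    class Conn(object):                # hypothetical stand-in
        def _send(self, request):
            raise RuntimeError('would hit the network')

        def connect(self):
            self.pending = self._send('ApiVersionsRequest')

    def test_connect_offline(mocker):  # `mocker` comes from pytest-mock
        conn = Conn()
        mocker.patch.object(conn, '_send', return_value='sentinel-future')
        conn.connect()
        assert conn.pending == 'sentinel-future'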

test/test_consumer_group.py

Lines changed: 2 additions & 1 deletion
@@ -68,7 +68,8 @@ def consumer_thread(i):
 
     num_consumers = 4
     for i in range(num_consumers):
-        t = threading.Thread(target=consumer_thread, args=(i,), daemon=True)
+        t = threading.Thread(target=consumer_thread, args=(i,))
+        t.daemon = True
         t.start()
         threads[i] = t
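The `daemon` keyword on `threading.Thread` arrived in Python 3.3; under 2.7 it raises TypeError, so the flag has to be set as an attribute before `start()`. A minimal sketch:

    import threading

    def worker():
        pass

    # threading.Thread(target=worker, daemon=True) is Python 3.3+ only;
    # setting the attribute works on 2.7 and 3 alike:
    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()
    t.join()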

test/test_coordinator.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,25 @@
2525
from kafka.structs import OffsetAndMetadata, TopicPartition
2626
from kafka.util import WeakMethod
2727

28-
2928
@pytest.fixture
30-
def client(conn):
31-
return KafkaClient(api_version=(0, 9))
29+
def client(conn, mocker):
30+
cli = KafkaClient(api_version=(0, 9))
31+
mocker.patch.object(cli, '_init_connect', return_value=True)
32+
try:
33+
yield cli
34+
finally:
35+
cli._close()
3236

3337
@pytest.fixture
34-
def coordinator(client):
35-
return ConsumerCoordinator(client, SubscriptionState(), Metrics())
38+
def coordinator(client, mocker):
39+
metrics = Metrics()
40+
coord = ConsumerCoordinator(client, SubscriptionState(), metrics)
41+
try:
42+
yield coord
43+
finally:
44+
mocker.patch.object(coord, 'coordinator_unknown', return_value=True) # avoid attempting to leave group during close()
45+
coord.close(timeout_ms=0)
46+
metrics.close()
3647

3748

3849
def test_init(client, coordinator):
@@ -55,6 +66,7 @@ def test_autocommit_enable_api_version(conn, api_version):
5566
assert coordinator.config['enable_auto_commit'] is False
5667
else:
5768
assert coordinator.config['enable_auto_commit'] is True
69+
coordinator.close()
5870

5971

6072
def test_protocol_type(coordinator):
@@ -117,6 +129,7 @@ def test_pattern_subscription(conn, api_version):
117129
else:
118130
assert set(coordinator._subscription.assignment.keys()) == {TopicPartition('foo1', 0),
119131
TopicPartition('foo2', 0)}
132+
coordinator.close()
120133

121134

122135
def test_lookup_assignor(coordinator):
@@ -398,6 +411,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
398411
assert commit_sync.call_count == (1 if commit_offsets else 0)
399412
assert mock_warn.call_count == (1 if warn else 0)
400413
assert mock_exc.call_count == (1 if exc else 0)
414+
coordinator.close()
401415

402416

403417
@pytest.fixture
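Turning the fixtures into generators gives them teardown: everything after `yield` runs once the test finishes, so clients and metrics no longer leak between tests. A minimal, self-contained sketch of the pattern:

    import pytest

    @pytest.fixture
    def resource():
        data = [1, 2, 3]        # setup runs before the test
        try:
            yield data          # the test body executes here
        finally:
            del data[:]         # teardown runs even if the test fails

    def test_sum(resource):
        assert sum(resource) == 6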

test/test_producer.py

Lines changed: 3 additions & 1 deletion
@@ -8,7 +8,7 @@
 
 from kafka import KafkaConsumer, KafkaProducer, TopicPartition
 from kafka.producer.buffer import SimpleBufferPool
-from test.testutil import env_kafka_version, random_string
+from test.testutil import env_kafka_version, random_string, maybe_skip_unsupported_compression
 
 
 def test_buffer_pool():
@@ -44,6 +44,7 @@ def consumer_factory(**kwargs):
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_end_to_end(kafka_broker, compression):
+    maybe_skip_unsupported_compression(compression)
     if compression == 'lz4':
         if env_kafka_version() < (0, 8, 2):
             pytest.skip('LZ4 requires 0.8.2')
@@ -104,6 +105,7 @@ def test_kafka_producer_gc_cleanup():
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+    maybe_skip_unsupported_compression(compression)
     if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
         pytest.skip('zstd requires 2.1.0 or more')
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])

test/test_subscription_state.py

Lines changed: 2 additions & 2 deletions
@@ -44,8 +44,8 @@ def test_assign_from_subscribed():
 
     s.assign_from_subscribed([TopicPartition('foo', 0), TopicPartition('foo', 1)])
     assert set(s.assignment.keys()) == set([TopicPartition('foo', 0), TopicPartition('foo', 1)])
-    assert all([isinstance(s, TopicPartitionState) for s in six.itervalues(s.assignment)])
-    assert all([not s.has_valid_position for s in six.itervalues(s.assignment)])
+    assert all([isinstance(tps, TopicPartitionState) for tps in six.itervalues(s.assignment)])
+    assert all([not tps.has_valid_position for tps in six.itervalues(s.assignment)])
 
 
 def test_change_subscription_after_assignment():
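The rename matters because Python 2 leaks a list comprehension's loop variable into the enclosing scope: `for s in ...` rebound the `SubscriptionState` named `s`, so the second assertion was no longer looking at the subscription state at all. A quick demonstration of the leak:

    s = 'outer'
    values = [s for s in (1, 2, 3)]
    # Python 2.7: s is now 3 (the comprehension variable leaked);
    # Python 3:   s is still 'outer'.
    print(s)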

test/testutil.py

Lines changed: 16 additions & 0 deletions
@@ -6,6 +6,10 @@
 import string
 import time
 
+import pytest
+
+import kafka.codec
+
 
 def special_to_underscore(string, _matcher=re.compile(r'[^a-zA-Z0-9_]+')):
     return _matcher.sub('_', string)
@@ -36,6 +40,18 @@ def assert_message_count(messages, num_messages):
     assert len(unique_messages) == num_messages, 'Expected %d unique messages, got %d' % (num_messages, len(unique_messages))
 
 
+def maybe_skip_unsupported_compression(compression_type):
+    codecs = {1: 'gzip', 2: 'snappy', 3: 'lz4', 4: 'zstd'}
+    if not compression_type:
+        return
+    elif compression_type in codecs:
+        compression_type = codecs[compression_type]
+
+    checker = getattr(kafka.codec, 'has_' + compression_type, None)
+    if checker and not checker():
+        pytest.skip("Compression libraries not installed for %s" % (compression_type,))
+
+
 class Timer(object):
     def __enter__(self):
         self.start = time.time()
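The helper accepts either the integer codec ids used by the record-format tests (1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd) or the string names used by the producer tests, returns silently for no compression, and skips the test when the codec's support library is not installed. A hypothetical usage sketch mirroring the tests above:

    import pytest
    from test.testutil import maybe_skip_unsupported_compression

    @pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
    def test_roundtrip(compression_type):
        maybe_skip_unsupported_compression(compression_type)  # skip if the codec lib is absent
        assert True  # placeholder; a real test would build and re-read a record batch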
