Skip to content

Commit 35a14e1

Browse files
committed
Handle New Topic Creation
Adds ensure_topic_exists to KafkaClient, redirects test case to use that. Fixes dpkp#113 and fixes dpkp#150.
1 parent ae6b49a commit 35a14e1

File tree

4 files changed

+34
-23
lines changed

4 files changed

+34
-23
lines changed

kafka/client.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
import collections
12
import copy
3+
import functools
4+
import itertools
25
import logging
3-
import collections
4-
6+
import time
57
import kafka.common
68

7-
from functools import partial
8-
from itertools import count
99
from kafka.common import (TopicAndPartition,
1010
ConnectionError, FailedPayloadsError,
1111
PartitionUnavailableError,
@@ -21,7 +21,7 @@
2121
class KafkaClient(object):
2222

2323
CLIENT_ID = "kafka-python"
24-
ID_GEN = count()
24+
ID_GEN = itertools.count()
2525

2626
# NOTE: The timeout given to the client should always be greater than the
2727
# one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -213,6 +213,16 @@ def reset_all_metadata(self):
213213
def has_metadata_for_topic(self, topic):
214214
return topic in self.topic_partitions
215215

216+
def ensure_topic_exists(self, topic, timeout = 30):
217+
start_time = time.time()
218+
219+
self.load_metadata_for_topics(topic)
220+
while not self.has_metadata_for_topic(topic):
221+
if time.time() > start_time + timeout:
222+
raise KafkaTimeoutError("Unable to create topic {}".format(topic))
223+
self.load_metadata_for_topics(topic)
224+
time.sleep(.5)
225+
216226
def close(self):
217227
for conn in self.conns.values():
218228
conn.close()
@@ -289,7 +299,7 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
289299
order of input payloads
290300
"""
291301

292-
encoder = partial(
302+
encoder = functools.partial(
293303
KafkaProtocol.encode_produce_request,
294304
acks=acks,
295305
timeout=timeout)
@@ -321,7 +331,7 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
321331
to the same brokers.
322332
"""
323333

324-
encoder = partial(KafkaProtocol.encode_fetch_request,
334+
encoder = functools.partial(KafkaProtocol.encode_fetch_request,
325335
max_wait_time=max_wait_time,
326336
min_bytes=min_bytes)
327337

@@ -359,7 +369,7 @@ def send_offset_request(self, payloads=[], fail_on_error=True,
359369

360370
def send_offset_commit_request(self, group, payloads=[],
361371
fail_on_error=True, callback=None):
362-
encoder = partial(KafkaProtocol.encode_offset_commit_request,
372+
encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
363373
group=group)
364374
decoder = KafkaProtocol.decode_offset_commit_response
365375
resps = self._send_broker_aware_request(payloads, encoder, decoder)
@@ -378,7 +388,7 @@ def send_offset_commit_request(self, group, payloads=[],
378388
def send_offset_fetch_request(self, group, payloads=[],
379389
fail_on_error=True, callback=None):
380390

381-
encoder = partial(KafkaProtocol.encode_offset_fetch_request,
391+
encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
382392
group=group)
383393
decoder = KafkaProtocol.decode_offset_fetch_response
384394
resps = self._send_broker_aware_request(payloads, encoder, decoder)

kafka/producer.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from multiprocessing import Queue, Process
1111

1212
from kafka.common import (
13-
ProduceRequest, TopicAndPartition, UnsupportedCodecError
13+
ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
1414
)
1515
from kafka.partitioner import HashedPartitioner
1616
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
@@ -216,7 +216,10 @@ def _next_partition(self, topic):
216216
if topic not in self.partition_cycles:
217217
if topic not in self.client.topic_partitions:
218218
self.client.load_metadata_for_topics(topic)
219-
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
219+
try:
220+
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
221+
except KeyError:
222+
raise UnknownTopicOrPartitionError(topic)
220223

221224
# Randomize the initial partition that is returned
222225
if self.random_start:

test/test_producer_integration.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,15 @@ def test_simple_producer(self):
142142

143143
producer.stop()
144144

145+
@kafka_versions("all")
146+
def test_produce__new_topic_fails_with_reasonable_error(self):
147+
new_topic = 'new_topic_{}'.format(str(uuid.uuid4()))
148+
producer = SimpleProducer(self.client)
149+
150+
# At first it doesn't exist
151+
with self.assertRaises(UnknownTopicOrPartitionError):
152+
resp = producer.send_messages(new_topic, self.msg("one"))
153+
145154
@kafka_versions("all")
146155
def test_producer_random_order(self):
147156
producer = SimpleProducer(self.client, random_start = True)

test/testutil.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
__all__ = [
1515
'random_string',
16-
'ensure_topic_creation',
1716
'get_open_port',
1817
'kafka_versions',
1918
'KafkaIntegrationTestCase',
@@ -39,16 +38,6 @@ def wrapper(self):
3938
return wrapper
4039
return kafka_versions
4140

42-
def ensure_topic_creation(client, topic_name, timeout = 30):
43-
start_time = time.time()
44-
45-
client.load_metadata_for_topics(topic_name)
46-
while not client.has_metadata_for_topic(topic_name):
47-
if time.time() > start_time + timeout:
48-
raise Exception("Unable to create topic %s" % topic_name)
49-
client.load_metadata_for_topics(topic_name)
50-
time.sleep(1)
51-
5241
def get_open_port():
5342
sock = socket.socket()
5443
sock.bind(("", 0))
@@ -71,7 +60,7 @@ def setUp(self):
7160
if self.create_client:
7261
self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
7362

74-
ensure_topic_creation(self.client, self.topic)
63+
self.client.ensure_topic_exists(self.topic)
7564

7665
self._messages = {}
7766

0 commit comments

Comments
 (0)