Skip to content

Commit 1fd35a5

Browse files
committed
added random_start param to SimpleProducer to enable/disable randomization of the initial partition messages are published to
1 parent 4d0722a commit 1fd35a5

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

kafka/producer.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,14 +181,20 @@ class SimpleProducer(Producer):
181181
batch_send - If True, messages are send in batches
182182
batch_send_every_n - If set, messages are send in batches of this size
183183
batch_send_every_t - If set, messages are send after this timeout
184+
random_start - If true, randomize the initial partition which the
185+
the first message block will be published to, otherwise
186+
if false, the first message block will always publish
187+
to partition 0 before cycling through each partition
184188
"""
185189
def __init__(self, client, async=False,
186190
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
187191
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
188192
batch_send=False,
189193
batch_send_every_n=BATCH_SEND_MSG_COUNT,
190-
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
194+
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
195+
random_start=False):
191196
self.partition_cycles = {}
197+
self.random_start = random_start
192198
super(SimpleProducer, self).__init__(client, async, req_acks,
193199
ack_timeout, batch_send,
194200
batch_send_every_n,
@@ -201,9 +207,10 @@ def _next_partition(self, topic):
201207
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
202208

203209
# Randomize the initial partition that is returned
204-
num_partitions = len(self.client.topic_partitions[topic])
205-
for _ in xrange(random.randint(0, num_partitions-1)):
206-
self.partition_cycles[topic].next()
210+
if self.random_start:
211+
num_partitions = len(self.client.topic_partitions[topic])
212+
for _ in xrange(random.randint(0, num_partitions-1)):
213+
self.partition_cycles[topic].next()
207214

208215
return self.partition_cycles[topic].next()
209216

0 commit comments

Comments
 (0)