Skip to content

Commit 4d0722a

Browse files
committed
Changed randomization to simply randomize the initial starting partition of the sorted list of partition rather than completely randomizing the initial ordering before round-robin cycling the partitions
1 parent a81be57 commit 4d0722a

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

kafka/producer.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,13 @@ def _next_partition(self, topic):
198198
if topic not in self.partition_cycles:
199199
if topic not in self.client.topic_partitions:
200200
self.client.load_metadata_for_topics(topic)
201-
randomly_ordered_partitions = self.client.topic_partitions[topic][:]
202-
random.shuffle(randomly_ordered_partitions)
203-
self.partition_cycles[topic] = cycle(randomly_ordered_partitions)
201+
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
202+
203+
# Randomize the initial partition that is returned
204+
num_partitions = len(self.client.topic_partitions[topic])
205+
for _ in xrange(random.randint(0, num_partitions-1)):
206+
self.partition_cycles[topic].next()
207+
204208
return self.partition_cycles[topic].next()
205209

206210
def send_messages(self, topic, *msg):

0 commit comments

Comments
 (0)