We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 09c053a commit 92fefb0Copy full SHA for 92fefb0
kafka/producer.py
@@ -2,6 +2,7 @@
2
3
import logging
4
import time
5
+import random
6
7
from Queue import Empty
8
from collections import defaultdict
@@ -197,7 +198,9 @@ def _next_partition(self, topic):
197
198
if topic not in self.partition_cycles:
199
if topic not in self.client.topic_partitions:
200
self.client.load_metadata_for_topics(topic)
- self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
201
+ randomly_ordered_partitions = self.client.topic_partitions[topic][:]
202
+ random.shuffle(randomly_ordered_partitions)
203
+ self.partition_cycles[topic] = cycle(randomly_ordered_partitions)
204
return self.partition_cycles[topic].next()
205
206
def send_messages(self, topic, *msg):
0 commit comments