1
1
from __future__ import absolute_import
2
2
3
+ import pytest
4
+
3
5
from kafka .partitioner import DefaultPartitioner , Murmur2Partitioner , RoundRobinPartitioner
4
6
from kafka .partitioner .hashed import murmur2
5
7
6
8
7
9
def test_default_partitioner ():
8
10
partitioner = DefaultPartitioner ()
9
- all_partitions = list (range (100 ))
10
- available = all_partitions
11
+ all_partitions = available = list (range (100 ))
11
12
# partitioner should return the same partition for the same key
12
13
p1 = partitioner (b'foo' , all_partitions , available )
13
14
p2 = partitioner (b'foo' , all_partitions , available )
@@ -23,8 +24,7 @@ def test_default_partitioner():
23
24
24
25
def test_roundrobin_partitioner ():
25
26
partitioner = RoundRobinPartitioner ()
26
- all_partitions = list (range (100 ))
27
- available = all_partitions
27
+ all_partitions = available = list (range (100 ))
28
28
# partitioner should cycle between partitions
29
29
i = 0
30
30
max_partition = all_partitions [len (all_partitions ) - 1 ]
@@ -53,15 +53,14 @@ def test_roundrobin_partitioner():
53
53
i += 1
54
54
55
55
56
- def test_murmur2_java_compatibility ():
56
+ @pytest .mark .parametrize ("bytes_payload,partition_number" , [
57
+ (b'' , 681 ), (b'a' , 524 ), (b'ab' , 434 ), (b'abc' , 107 ), (b'123456789' , 566 ),
58
+ (b'\x00 ' , 742 )
59
+ ])
60
+ def test_murmur2_java_compatibility (bytes_payload , partition_number ):
57
61
p = Murmur2Partitioner (range (1000 ))
58
62
# compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
59
- assert p .partition (b'' ) == 681
60
- assert p .partition (b'a' ) == 524
61
- assert p .partition (b'ab' ) == 434
62
- assert p .partition (b'abc' ) == 107
63
- assert p .partition (b'123456789' ) == 566
64
- assert p .partition (b'\x00 ' ) == 742
63
+ assert p .partition (bytes_payload ) == partition_number
65
64
66
65
67
66
def test_murmur2_not_ascii ():
0 commit comments