Skip to content

Commit edef8d6

Browse files
committed
Fix partition calculation in hash routing strategy
Use Integer.remainderUnsigned(int, int) instead of % as the murmur 3 hash is an unsigned 32-bit. Note this can change the partition order for the same key before releases of the client library, this is considered OK as this is fixed before RabbitMQ 3.11 GA is announced. Fixes #199
1 parent 0f1a204 commit edef8d6

File tree

3 files changed

+188
-2
lines changed

3 files changed

+188
-2
lines changed

src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -36,6 +36,7 @@ public List<String> route(Message message, Metadata metadata) {
3636
String routingKey = routingKeyExtractor.apply(message);
3737
int hashValue = hash.applyAsInt(routingKey);
3838
List<String> partitions = metadata.partitions();
39-
return Collections.singletonList(partitions.get((hashValue & 0x7FFFFFFF) % partitions.size()));
39+
return Collections.singletonList(
40+
partitions.get(Integer.remainderUnsigned(hashValue, partitions.size())));
4041
}
4142
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.benchmark;
15+
16+
import java.nio.charset.StandardCharsets;
17+
import java.util.concurrent.ConcurrentHashMap;
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.stream.IntStream;
20+
import org.apache.commons.codec.digest.MurmurHash3;
21+
import org.openjdk.jmh.annotations.Benchmark;
22+
import org.openjdk.jmh.annotations.BenchmarkMode;
23+
import org.openjdk.jmh.annotations.Fork;
24+
import org.openjdk.jmh.annotations.Measurement;
25+
import org.openjdk.jmh.annotations.Mode;
26+
import org.openjdk.jmh.annotations.OutputTimeUnit;
27+
import org.openjdk.jmh.annotations.Param;
28+
import org.openjdk.jmh.annotations.Scope;
29+
import org.openjdk.jmh.annotations.Setup;
30+
import org.openjdk.jmh.annotations.State;
31+
import org.openjdk.jmh.annotations.Threads;
32+
import org.openjdk.jmh.annotations.Warmup;
33+
import org.openjdk.jmh.runner.Runner;
34+
import org.openjdk.jmh.runner.RunnerException;
35+
import org.openjdk.jmh.runner.options.Options;
36+
import org.openjdk.jmh.runner.options.OptionsBuilder;
37+
38+
@BenchmarkMode(Mode.Throughput)
39+
@OutputTimeUnit(TimeUnit.SECONDS)
40+
@State(Scope.Benchmark)
41+
@Warmup(iterations = 5, time = 5)
42+
@Measurement(iterations = 3, time = 5)
43+
@Fork(1)
44+
@Threads(1)
45+
public class ModuloBenchmark {
46+
47+
static final int partitionSize = 3;
48+
49+
static ConcurrentHashMap<String, IntToIntFunction> MODULOS =
50+
new ConcurrentHashMap<String, IntToIntFunction>() {
51+
{
52+
put("mod", i -> i % partitionSize);
53+
put("remainderUnsigned", i -> Integer.remainderUnsigned(i, partitionSize));
54+
}
55+
};
56+
57+
@Param({"mod", "remainderUnsigned"})
58+
String algorithm;
59+
60+
IntToIntFunction modulo;
61+
62+
int[] content;
63+
64+
public static void main(String[] args) throws RunnerException {
65+
Options opt =
66+
new OptionsBuilder()
67+
.include(ModuloBenchmark.class.getSimpleName())
68+
.warmupIterations(3)
69+
.measurementIterations(2)
70+
.forks(1)
71+
.build();
72+
73+
new Runner(opt).run();
74+
}
75+
76+
@Setup
77+
public void setUp() {
78+
modulo = MODULOS.get(algorithm);
79+
content =
80+
IntStream.range(0, 10)
81+
.mapToObj(i -> "hello-" + i)
82+
.mapToInt(
83+
s ->
84+
MurmurHash3.hash32x86(
85+
s.getBytes(StandardCharsets.UTF_8), 0, s.length(), 104729))
86+
.toArray();
87+
}
88+
89+
@Benchmark
90+
public void modulo() {
91+
for (int i : content) {
92+
modulo.applyAsLong(i);
93+
}
94+
}
95+
96+
@FunctionalInterface
97+
public interface IntToIntFunction {
98+
99+
int applyAsLong(int value);
100+
}
101+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
18+
import com.rabbitmq.stream.Message;
19+
import com.rabbitmq.stream.RoutingStrategy;
20+
import com.rabbitmq.stream.RoutingStrategy.Metadata;
21+
import java.util.Arrays;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.function.Function;
26+
import java.util.stream.Collectors;
27+
import java.util.stream.IntStream;
28+
import org.junit.jupiter.api.Test;
29+
30+
public class HashRoutingStrategyTest {
31+
32+
@Test
33+
void routeShouldComputeAppropriatePartitions() {
34+
List<String> keys =
35+
IntStream.range(1, 11).mapToObj(i -> "hello" + i).collect(Collectors.toList());
36+
RoutingStrategy routingStrategy =
37+
new HashRoutingStrategy(
38+
new Function<Message, String>() {
39+
int count = 0;
40+
41+
@Override
42+
public String apply(Message message) {
43+
return keys.get(count++);
44+
}
45+
},
46+
HashUtils.MURMUR3);
47+
48+
Map<String, String> expectedRoutes =
49+
new HashMap<String, String>() {
50+
{
51+
put("hello1", "invoices-02");
52+
put("hello2", "invoices-01");
53+
put("hello3", "invoices-02");
54+
put("hello4", "invoices-03");
55+
put("hello5", "invoices-01");
56+
put("hello6", "invoices-03");
57+
put("hello7", "invoices-01");
58+
put("hello8", "invoices-02");
59+
put("hello9", "invoices-01");
60+
put("hello10", "invoices-03");
61+
}
62+
};
63+
64+
Metadata metadata =
65+
new Metadata() {
66+
List<String> partitions = Arrays.asList("invoices-01", "invoices-02", "invoices-03");
67+
68+
@Override
69+
public List<String> partitions() {
70+
return partitions;
71+
}
72+
73+
@Override
74+
public List<String> route(String routingKey) {
75+
// not used here
76+
return null;
77+
}
78+
};
79+
for (String key : keys) {
80+
List<String> partitions = routingStrategy.route(null, metadata);
81+
assertThat(partitions).hasSize(1).element(0).isEqualTo(expectedRoutes.get(key));
82+
}
83+
}
84+
}

0 commit comments

Comments
 (0)