Skip to content

Commit 57376da

Browse files
committed
Use "binding key" lingo for super stream creation
Instead of "routing key".
1 parent 6868fb3 commit 57376da

File tree

7 files changed

+52
-52
lines changed

7 files changed

+52
-52
lines changed

Diff for: src/docs/asciidoc/super-streams.adoc

+3-3
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@ The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-5
8383
We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to.
8484
This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order.
8585

86-
It is also possible to specify routing keys when creating a super stream:
86+
It is also possible to specify binding keys when creating a super stream:
8787

88-
.Creating a super stream by specifying the routing keys
88+
.Creating a super stream by specifying the binding keys
8989
[source,java,indent=0]
9090
--------
91-
include::{test-examples}/SuperStreamUsage.java[tag=creation-routing-keys]
91+
include::{test-examples}/SuperStreamUsage.java[tag=creation-binding-keys]
9292
--------
9393

9494
The super stream partitions will be `invoices-amer`, `invoices-emea` and `invoices-apac` in this case.

Diff for: src/main/java/com/rabbitmq/stream/StreamCreator.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -174,22 +174,22 @@ interface SuperStreamConfiguration {
174174
/**
175175
* The number of partitions of the super stream.
176176
*
177-
* <p>Mutually exclusive with {@link #routingKeys(String...)}. Default is 3.
177+
* <p>Mutually exclusive with {@link #bindingKeys(String...)}. Default is 3.
178178
*
179179
* @param partitions
180180
* @return this super stream configuration instance
181181
*/
182182
SuperStreamConfiguration partitions(int partitions);
183183

184184
/**
185-
* The routing keys to use when declaring the super stream partitions.
185+
* The binding keys to use when declaring the super stream partitions.
186186
*
187187
* <p>Mutually exclusive with {@link #partitions(int)}. Default is null.
188188
*
189-
* @param routingKeys
189+
* @param bindingKeys
190190
* @return this super stream configuration instance
191191
*/
192-
SuperStreamConfiguration routingKeys(String... routingKeys);
192+
SuperStreamConfiguration bindingKeys(String... bindingKeys);
193193

194194
/**
195195
* Go back to the creator.

Diff for: src/main/java/com/rabbitmq/stream/impl/Client.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -688,14 +688,14 @@ public Response create(String stream, Map<String, String> arguments) {
688688
Response createSuperStream(
689689
String superStream,
690690
List<String> partitions,
691-
List<String> routingKeys,
691+
List<String> bindingKeys,
692692
Map<String, String> arguments) {
693693
this.superStreamManagementCommandVersionsCheck.run();
694-
if (partitions.isEmpty() || routingKeys.isEmpty()) {
694+
if (partitions.isEmpty() || bindingKeys.isEmpty()) {
695695
throw new IllegalArgumentException(
696696
"Partitions and routing keys of a super stream cannot be empty");
697697
}
698-
if (partitions.size() != routingKeys.size()) {
698+
if (partitions.size() != bindingKeys.size()) {
699699
throw new IllegalArgumentException(
700700
"Partitions and routing keys of a super stream must have "
701701
+ "the same number of elements");
@@ -708,7 +708,7 @@ Response createSuperStream(
708708
+ 2
709709
+ superStream.length()
710710
+ collectionSize(partitions)
711-
+ collectionSize(routingKeys)
711+
+ collectionSize(bindingKeys)
712712
+ mapSize(arguments);
713713
int correlationId = correlationSequence.incrementAndGet();
714714
try {
@@ -720,7 +720,7 @@ Response createSuperStream(
720720
bb.writeShort(superStream.length());
721721
bb.writeBytes(superStream.getBytes(CHARSET));
722722
writeCollection(bb, partitions);
723-
writeCollection(bb, routingKeys);
723+
writeCollection(bb, bindingKeys);
724724
writeMap(bb, arguments);
725725
OutstandingRequest<Response> request = outstandingRequest();
726726
outstandingRequests.put(correlationId, request);

Diff for: src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -101,28 +101,28 @@ public void create() {
101101
Function<Client, Client.Response> function;
102102
boolean superStream = this.superStreamConfiguration != null;
103103
if (superStream) {
104-
List<String> partitions, routingKeys;
105-
if (this.superStreamConfiguration.routingKeys == null) {
104+
List<String> partitions, bindingKeys;
105+
if (this.superStreamConfiguration.bindingKeys == null) {
106106
partitions =
107107
IntStream.range(0, this.superStreamConfiguration.partitions)
108108
.mapToObj(i -> this.name + "-" + i)
109109
.collect(toList());
110-
routingKeys =
110+
bindingKeys =
111111
IntStream.range(0, this.superStreamConfiguration.partitions)
112112
.mapToObj(String::valueOf)
113113
.collect(toList());
114114
} else {
115115
partitions =
116-
this.superStreamConfiguration.routingKeys.stream()
116+
this.superStreamConfiguration.bindingKeys.stream()
117117
.map(rk -> this.name + "-" + rk)
118118
.collect(toList());
119-
routingKeys = this.superStreamConfiguration.routingKeys;
119+
bindingKeys = this.superStreamConfiguration.bindingKeys;
120120
}
121121
function =
122122
namedFunction(
123123
c ->
124124
c.createSuperStream(
125-
this.name, partitions, routingKeys, streamParametersBuilder.build()),
125+
this.name, partitions, bindingKeys, streamParametersBuilder.build()),
126126
"Creation of super stream '%s'",
127127
this.name);
128128
} else {
@@ -154,7 +154,7 @@ private static class DefaultSuperStreamConfiguration implements SuperStreamConfi
154154
private final StreamCreator creator;
155155

156156
private int partitions = 3;
157-
private List<String> routingKeys = null;
157+
private List<String> bindingKeys = null;
158158

159159
private DefaultSuperStreamConfiguration(StreamCreator creator) {
160160
this.creator = creator;
@@ -166,16 +166,16 @@ public SuperStreamConfiguration partitions(int partitions) {
166166
throw new IllegalArgumentException("The number of partitions must be greater than 0");
167167
}
168168
this.partitions = partitions;
169-
this.routingKeys = null;
169+
this.bindingKeys = null;
170170
return this;
171171
}
172172

173173
@Override
174-
public SuperStreamConfiguration routingKeys(String... routingKeys) {
175-
if (routingKeys == null || routingKeys.length == 0) {
176-
throw new IllegalArgumentException("There must be at least 1 routing key");
174+
public SuperStreamConfiguration bindingKeys(String... bindingKeys) {
175+
if (bindingKeys == null || bindingKeys.length == 0) {
176+
throw new IllegalArgumentException("There must be at least 1 binding key");
177177
}
178-
this.routingKeys = Arrays.asList(routingKeys);
178+
this.bindingKeys = Arrays.asList(bindingKeys);
179179
this.partitions = -1;
180180
return this;
181181
}

Diff for: src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ void creation() {
3434
.partitions(5).creator()
3535
.create();
3636
// end::creation-partitions[]
37-
// tag::creation-routing-keys[]
37+
// tag::creation-binding-keys[]
3838
environment.streamCreator().name("invoices")
3939
.superStream()
40-
.routingKeys("amer", "emea", "apac").creator()
40+
.bindingKeys("amer", "emea", "apac").creator()
4141
.create();
42-
// end::creation-routing-keys[]
42+
// end::creation-binding-keys[]
4343
}
4444

4545
void producerSimple() {

Diff for: src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -527,23 +527,23 @@ void superStreamCreationSetPartitions(int partitionCount, TestInfo info) {
527527

528528
@Test
529529
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0)
530-
void superStreamCreationSetRoutingKeys(TestInfo info) {
531-
List<String> routingKeys = Arrays.asList("a", "b", "c", "d", "e");
530+
void superStreamCreationSetBindingKeys(TestInfo info) {
531+
List<String> bindingKeys = Arrays.asList("a", "b", "c", "d", "e");
532532
String s = streamName(info);
533533
Client client = cf.get();
534534
Environment env = environmentBuilder.build();
535535
try {
536536
env.streamCreator()
537537
.name(s)
538538
.superStream()
539-
.routingKeys(routingKeys.toArray(new String[] {}))
539+
.bindingKeys(bindingKeys.toArray(new String[] {}))
540540
.creator()
541541
.create();
542542

543543
assertThat(client.partitions(s))
544-
.hasSize(routingKeys.size())
545-
.containsAll(routingKeys.stream().map(rk -> s + "-" + rk).collect(toList()));
546-
routingKeys.forEach(rk -> assertThat(client.route(rk, s)).hasSize(1).contains(s + "-" + rk));
544+
.hasSize(bindingKeys.size())
545+
.containsAll(bindingKeys.stream().map(rk -> s + "-" + rk).collect(toList()));
546+
bindingKeys.forEach(bk -> assertThat(client.route(bk, s)).hasSize(1).contains(s + "-" + bk));
547547
} finally {
548548
env.deleteSuperStream(s);
549549
env.close();

Diff for: src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java

+19-19
Original file line numberDiff line numberDiff line change
@@ -42,28 +42,28 @@ public class SuperStreamManagementTest {
4242
static final int partitionCount = 3;
4343
String s;
4444
List<String> partitions;
45-
List<String> routingKeys;
45+
List<String> bindingKeys;
4646

4747
@BeforeEach
4848
void init(TestInfo info) {
4949
s = streamName(info);
5050
partitions = partitions(s);
51-
routingKeys = routingKeys();
51+
bindingKeys = bindingKeys();
5252
}
5353

5454
@Test
5555
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
5656
void createDelete() {
5757
Client c = cf.get();
58-
Client.Response response = c.createSuperStream(s, partitions, routingKeys, null);
58+
Client.Response response = c.createSuperStream(s, partitions, bindingKeys, null);
5959
assertThat(response).is(ok());
6060
assertThat(c.metadata(partitions))
6161
.hasSameSizeAs(partitions)
6262
.allSatisfy((s, streamMetadata) -> assertThat(streamMetadata.isResponseOk()).isTrue());
6363
assertThat(c.partitions(s)).isEqualTo(partitions);
64-
routingKeys.forEach(rk -> assertThat(c.route(rk, s)).hasSize(1).contains(s + "-" + rk));
64+
bindingKeys.forEach(bk -> assertThat(c.route(bk, s)).hasSize(1).contains(s + "-" + bk));
6565

66-
response = c.createSuperStream(s, partitions, routingKeys, null);
66+
response = c.createSuperStream(s, partitions, bindingKeys, null);
6767
assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_STREAM_ALREADY_EXISTS));
6868

6969
response = c.deleteSuperStream(s);
@@ -75,7 +75,7 @@ void createDelete() {
7575
assertThat(streamMetadata.getResponseCode())
7676
.isEqualTo(RESPONSE_CODE_STREAM_DOES_NOT_EXIST));
7777
assertThat(c.partitions(s)).isEmpty();
78-
routingKeys.forEach(rk -> assertThat(c.route(rk, s)).isEmpty());
78+
bindingKeys.forEach(bk -> assertThat(c.route(bk, s)).isEmpty());
7979

8080
response = c.deleteSuperStream(s);
8181
assertThat(response).is(responseCode(RESPONSE_CODE_STREAM_DOES_NOT_EXIST));
@@ -85,7 +85,7 @@ void createDelete() {
8585
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
8686
void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exception {
8787
Client c = cf.get();
88-
Client.Response response = c.createSuperStream(s, partitions, routingKeys, null);
88+
Client.Response response = c.createSuperStream(s, partitions, bindingKeys, null);
8989
assertThat(response).is(ok());
9090
Map<String, Short> notifications = new ConcurrentHashMap<>(partitions.size());
9191
AtomicInteger notificationCount = new AtomicInteger();
@@ -114,37 +114,37 @@ void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exceptio
114114
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
115115
void authorisation() throws Exception {
116116
String user = "stream";
117-
// routing keys do not matter for authorisation
118-
routingKeys = asList("1", "2", "3");
117+
// binding keys do not matter for authorisation
118+
bindingKeys = asList("1", "2", "3");
119119
try {
120120
addUser(user, user);
121121
setPermissions(user, asList("stream|partition.*$", "partition.*$", "stream.*$"));
122122
Client c = cf.get(new Client.ClientParameters().username(user).password(user));
123-
Client.Response response = c.createSuperStream("not-allowed", partitions, routingKeys, null);
123+
Client.Response response = c.createSuperStream("not-allowed", partitions, bindingKeys, null);
124124
assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED));
125125

126126
s = name("stream");
127-
response = c.createSuperStream(s, asList("1", "2", "3"), routingKeys, null);
127+
response = c.createSuperStream(s, asList("1", "2", "3"), bindingKeys, null);
128128
assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED));
129129

130130
partitions = range(0, partitionCount).mapToObj(i -> s + "-" + i).collect(toList());
131131
// we can create the queues, but can't bind them, as it requires write permission
132-
response = c.createSuperStream(s, partitions, routingKeys, null);
132+
response = c.createSuperStream(s, partitions, bindingKeys, null);
133133
assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED));
134134

135135
String partitionName = name("partition");
136136
partitions =
137137
range(0, partitionCount).mapToObj(i -> partitionName + "-" + i).collect(toList());
138-
response = c.createSuperStream(s, partitions, routingKeys, null);
138+
response = c.createSuperStream(s, partitions, bindingKeys, null);
139139
assertThat(response).is(ok());
140140

141141
assertThat(c.metadata(partitions))
142142
.hasSameSizeAs(partitions)
143143
.allSatisfy((s, streamMetadata) -> assertThat(streamMetadata.isResponseOk()).isTrue());
144144
assertThat(c.partitions(s)).isEqualTo(partitions);
145-
for (int i = 0; i < routingKeys.size(); i++) {
146-
String rk = routingKeys.get(i);
147-
assertThat(c.route(rk, s)).hasSize(1).contains(partitions.get(i));
145+
for (int i = 0; i < bindingKeys.size(); i++) {
146+
String bk = bindingKeys.get(i);
147+
assertThat(c.route(bk, s)).hasSize(1).contains(partitions.get(i));
148148
}
149149

150150
response = c.deleteSuperStream(s);
@@ -154,11 +154,11 @@ void authorisation() throws Exception {
154154
}
155155
}
156156

157-
private static List<String> routingKeys() {
158-
return routingKeys(partitionCount);
157+
private static List<String> bindingKeys() {
158+
return bindingKeys(partitionCount);
159159
}
160160

161-
private static List<String> routingKeys(int partitions) {
161+
private static List<String> bindingKeys(int partitions) {
162162
return range(0, partitions).mapToObj(String::valueOf).collect(toList());
163163
}
164164

0 commit comments

Comments
 (0)