Skip to content

Commit 4e53dad

Browse files
authored
Merge pull request #491 from rabbitmq/token-expiration-in-stream-connections
Add test for permission updates after re-authentication
2 parents b56fa1c + 2c79a0b commit 4e53dad

File tree

3 files changed

+181
-22
lines changed

3 files changed

+181
-22
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -974,7 +974,12 @@ private Address address() {
974974
}
975975

976976
private String label() {
977-
return address.host() + ":" + address.port();
977+
Client c = this.nullableClient();
978+
if (c == null) {
979+
return address.host() + ":" + address.port();
980+
} else {
981+
return c.getHost() + ":" + c.getPort();
982+
}
978983
}
979984

980985
@Override

src/test/java/com/rabbitmq/stream/impl/AuthorisationTest.java

+174-21
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,32 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.Constants.*;
1718
import static com.rabbitmq.stream.Host.*;
18-
import static com.rabbitmq.stream.impl.TestUtils.b;
19-
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
19+
import static com.rabbitmq.stream.OffsetSpecification.first;
20+
import static com.rabbitmq.stream.impl.TestUtils.*;
21+
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok;
22+
import static java.util.Arrays.asList;
2023
import static java.util.concurrent.TimeUnit.SECONDS;
2124
import static org.assertj.core.api.Assertions.assertThat;
2225

2326
import com.rabbitmq.stream.Constants;
24-
import com.rabbitmq.stream.OffsetSpecification;
27+
import com.rabbitmq.stream.Message;
28+
import com.rabbitmq.stream.sasl.CredentialsProvider;
29+
import com.rabbitmq.stream.sasl.DefaultUsernamePasswordCredentialsProvider;
2530
import java.nio.charset.StandardCharsets;
2631
import java.time.Duration;
27-
import java.util.Collections;
32+
import java.util.*;
33+
import java.util.concurrent.ConcurrentHashMap;
34+
import java.util.concurrent.ConcurrentMap;
2835
import java.util.concurrent.CountDownLatch;
2936
import java.util.concurrent.atomic.AtomicInteger;
37+
import java.util.function.Function;
3038
import java.util.stream.IntStream;
3139
import org.junit.jupiter.api.AfterAll;
3240
import org.junit.jupiter.api.BeforeAll;
3341
import org.junit.jupiter.api.Test;
42+
import org.junit.jupiter.api.TestInfo;
3443
import org.junit.jupiter.api.extension.ExtendWith;
3544

3645
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
@@ -74,11 +83,11 @@ void createStreamWithAuthorisedNameShouldSucceed() {
7483
String stream = "stream-authorized" + i;
7584
Client.Response response = client.create(stream);
7685
assertThat(response.isOk()).isTrue();
77-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
86+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
7887

7988
response = deletionClient.delete(stream);
8089
assertThat(response.isOk()).isTrue();
81-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
90+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
8291
});
8392
}
8493

@@ -105,11 +114,11 @@ void deleteStreamWithAuthorisedNameShouldSucceed() {
105114
String stream = "stream-authorized" + i;
106115
Client.Response response = creationClient.create(stream);
107116
assertThat(response.isOk()).isTrue();
108-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
117+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
109118

110119
response = client.delete(stream);
111120
assertThat(response.isOk()).isTrue();
112-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
121+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
113122
});
114123
}
115124

@@ -123,7 +132,7 @@ void deleteStreamWithUnauthorisedNameShouldFail() {
123132
String stream = "not-authorized" + i;
124133
Client.Response response = creationClient.create(stream);
125134
assertThat(response.isOk()).isTrue();
126-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
135+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
127136

128137
response = client.delete(stream);
129138
assertThat(response.isOk()).isFalse();
@@ -132,7 +141,7 @@ void deleteStreamWithUnauthorisedNameShouldFail() {
132141

133142
response = creationClient.delete(stream);
134143
assertThat(response.isOk()).isTrue();
135-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
144+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
136145
});
137146
}
138147

@@ -146,15 +155,15 @@ void subscribeToAuthorisedStreamShouldSucceed() {
146155
String stream = "stream-authorized" + i;
147156
Client.Response response = configurationClient.create(stream);
148157
assertThat(response.isOk()).isTrue();
149-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
158+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
150159

151-
response = client.subscribe(b(1), stream, OffsetSpecification.first(), 10);
160+
response = client.subscribe(b(1), stream, first(), 10);
152161
assertThat(response.isOk()).isTrue();
153-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
162+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
154163

155164
response = configurationClient.delete(stream);
156165
assertThat(response.isOk()).isTrue();
157-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
166+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
158167
});
159168
}
160169

@@ -168,16 +177,16 @@ void subscribeToUnauthorisedStreamShouldFail() {
168177
String stream = "not-authorized" + i;
169178
Client.Response response = configurationClient.create(stream);
170179
assertThat(response.isOk()).isTrue();
171-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
180+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
172181

173-
response = client.subscribe(b(1), stream, OffsetSpecification.first(), 10);
182+
response = client.subscribe(b(1), stream, first(), 10);
174183
assertThat(response.isOk()).isFalse();
175184
assertThat(response.getResponseCode())
176185
.isEqualTo(Constants.RESPONSE_CODE_ACCESS_REFUSED);
177186

178187
response = configurationClient.delete(stream);
179188
assertThat(response.isOk()).isTrue();
180-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
189+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
181190
});
182191
}
183192

@@ -191,7 +200,7 @@ void publishToAuthorisedStreamShouldSucceed() {
191200
String stream = "stream-authorized" + i;
192201
Client.Response response = configurationClient.create(stream);
193202
assertThat(response.isOk()).isTrue();
194-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
203+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
195204

196205
int messageCount = 1000;
197206
CountDownLatch publishConfirmLatch = new CountDownLatch(messageCount);
@@ -223,7 +232,7 @@ void publishToAuthorisedStreamShouldSucceed() {
223232

224233
response = configurationClient.delete(stream);
225234
assertThat(response.isOk()).isTrue();
226-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
235+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
227236
});
228237
}
229238

@@ -237,7 +246,7 @@ void publishToUnauthorisedStreamShouldFail() {
237246
String stream = "not-authorized" + i;
238247
Client.Response response = configurationClient.create(stream);
239248
assertThat(response.isOk()).isTrue();
240-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
249+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
241250

242251
int messageCount = 1000;
243252
CountDownLatch publishErrorLatch = new CountDownLatch(messageCount);
@@ -270,7 +279,7 @@ void publishToUnauthorisedStreamShouldFail() {
270279

271280
response = configurationClient.delete(stream);
272281
assertThat(response.isOk()).isTrue();
273-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
282+
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
274283
});
275284
}
276285

@@ -304,6 +313,142 @@ void storeQueryOffsetShouldSucceedOnAuthorisedStreamShouldFailOnUnauthorisedStre
304313
}
305314
}
306315

316+
@Test
317+
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
318+
void shouldReceiveMetadataUpdateAfterUpdateSecret(TestInfo info) throws Exception {
319+
try {
320+
String newPassword = "new-password";
321+
String prefix = "passthrough-";
322+
String pubSub = TestUtils.streamName(info);
323+
String authorizedPubSub = prefix + TestUtils.streamName(info);
324+
String pub = TestUtils.streamName(info);
325+
String authorizedPub = prefix + TestUtils.streamName(info);
326+
String sub = TestUtils.streamName(info);
327+
String authorizedSub = prefix + TestUtils.streamName(info);
328+
setPermissions(USERNAME, VH, ".*");
329+
Set<String> metadataUpdates = ConcurrentHashMap.newKeySet();
330+
ConcurrentMap<Byte, Short> publishConfirms = new ConcurrentHashMap<>();
331+
ConcurrentMap<Byte, Short> creditNotifications = new ConcurrentHashMap<>();
332+
Set<Byte> receivedMessages = ConcurrentHashMap.newKeySet();
333+
Client client =
334+
cf.get(
335+
parameters()
336+
.virtualHost(VH)
337+
.username(USERNAME)
338+
.password(USERNAME)
339+
.publishConfirmListener(
340+
(publisherId, publishingId) ->
341+
publishConfirms.put(publisherId, RESPONSE_CODE_OK))
342+
.publishErrorListener(
343+
(publisherId, publishingId, errorCode) ->
344+
publishConfirms.put(publisherId, errorCode))
345+
.creditNotification(
346+
(subscriptionId, responseCode) ->
347+
creditNotifications.put(subscriptionId, responseCode))
348+
.messageListener(
349+
(subscriptionId,
350+
offset,
351+
chunkTimestamp,
352+
committedChunkId,
353+
chunkContext,
354+
message) -> receivedMessages.add(subscriptionId))
355+
.metadataListener((stream, code) -> metadataUpdates.add(stream)));
356+
assertThat(client.create(pubSub)).is(ok());
357+
assertThat(client.create(authorizedPubSub)).is(ok());
358+
assertThat(client.create(pub)).is(ok());
359+
assertThat(client.create(authorizedPub)).is(ok());
360+
assertThat(client.create(sub)).is(ok());
361+
assertThat(client.create(authorizedSub)).is(ok());
362+
363+
Map<String, Byte> publishers = new HashMap<>();
364+
publishers.put(pubSub, b(0));
365+
publishers.put(authorizedPubSub, b(1));
366+
publishers.put(pub, b(2));
367+
publishers.put(authorizedPub, b(3));
368+
publishers.forEach((s, id) -> assertThat(client.declarePublisher(id, null, s)).is(ok()));
369+
Map<String, Byte> subscriptions = new HashMap<>();
370+
subscriptions.put(pubSub, b(0));
371+
subscriptions.put(authorizedPubSub, b(1));
372+
subscriptions.put(sub, b(2));
373+
subscriptions.put(authorizedSub, b(3));
374+
subscriptions.forEach((s, id) -> assertThat(client.subscribe(id, s, first(), 1)).is(ok()));
375+
376+
Function<String, Byte> toPub = publishers::get;
377+
Function<String, Byte> toSub = subscriptions::get;
378+
379+
// change password and permissions and re-authenticate
380+
changePassword(USERNAME, newPassword);
381+
setPermissions(USERNAME, VH, "^passthrough.*$");
382+
client.authenticate(credentialsProvider(USERNAME, newPassword));
383+
384+
waitAtMost(() -> metadataUpdates.containsAll(asList(pubSub, pub, sub)));
385+
386+
List<Message> message = Collections.singletonList(client.messageBuilder().build());
387+
388+
// publishers for unauthorized streams should be gone
389+
asList(toPub.apply(pubSub), toPub.apply(pub))
390+
.forEach(
391+
wrap(
392+
pubId -> {
393+
assertThat(publishConfirms).doesNotContainKey(pubId);
394+
client.publish(pubId, message);
395+
waitAtMost(() -> publishConfirms.containsKey(pubId));
396+
assertThat(publishConfirms)
397+
.containsEntry(pubId, RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST);
398+
}));
399+
400+
// subscriptions for unauthorized streams should be gone
401+
asList(toSub.apply(pubSub), toSub.apply(sub))
402+
.forEach(
403+
wrap(
404+
subId -> {
405+
assertThat(creditNotifications).doesNotContainKey(subId);
406+
client.credit(subId, 1);
407+
waitAtMost(() -> creditNotifications.containsKey(subId));
408+
assertThat(creditNotifications)
409+
.containsEntry(subId, RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST);
410+
}));
411+
412+
// subscriptions for authorized streams still work
413+
asList(toSub.apply(authorizedPubSub), toSub.apply(authorizedSub))
414+
.forEach(subId -> client.credit(subId, 1));
415+
416+
assertThat(receivedMessages).isEmpty();
417+
// publishers for authorized streams should still work
418+
asList(toPub.apply(authorizedPubSub), toPub.apply(authorizedPub))
419+
.forEach(
420+
wrap(
421+
pubId -> {
422+
client.publish(pubId, message);
423+
waitAtMost(() -> publishConfirms.containsKey(pubId));
424+
assertThat(publishConfirms).containsEntry(pubId, RESPONSE_CODE_OK);
425+
}));
426+
427+
waitAtMost(() -> receivedMessages.contains(b(1)));
428+
429+
// send message to authorized subscription stream
430+
assertThat(client.declarePublisher(b(5), null, authorizedSub)).is(ok());
431+
client.publish(b(5), message);
432+
waitAtMost(() -> receivedMessages.contains(toSub.apply(authorizedSub)));
433+
434+
// last checks to make sure nothing unexpected arrived late
435+
assertThat(metadataUpdates).hasSize(3);
436+
assertThat(creditNotifications).containsOnlyKeys(b(0), b(2));
437+
assertThat(publishConfirms)
438+
.hasSize(4 + 1)
439+
.containsEntry(toPub.apply(pubSub), RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST)
440+
.containsEntry(toPub.apply(authorizedPubSub), RESPONSE_CODE_OK)
441+
.containsEntry(toPub.apply(pub), RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST)
442+
.containsEntry(toPub.apply(authorizedPub), RESPONSE_CODE_OK);
443+
assertThat(receivedMessages).hasSize(2);
444+
445+
client.close();
446+
} finally {
447+
changePassword(USERNAME, PASSWORD);
448+
setPermissions(USERNAME, VH, "^stream.*$");
449+
}
450+
}
451+
307452
Client configurationClient() {
308453
return cf.get(new Client.ClientParameters().virtualHost(VH));
309454
}
@@ -315,4 +460,12 @@ Client client() {
315460
Client client(Client.ClientParameters parameters) {
316461
return cf.get(parameters.virtualHost(VH).username(USERNAME).password(PASSWORD));
317462
}
463+
464+
private static Client.ClientParameters parameters() {
465+
return new Client.ClientParameters();
466+
}
467+
468+
private static CredentialsProvider credentialsProvider(String username, String password) {
469+
return new DefaultUsernamePasswordCredentialsProvider(username, password);
470+
}
318471
}

src/test/resources/logback-test.xml

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
</appender>
77

88
<logger name="com.rabbitmq.stream" level="warn" />
9+
<!-- <logger name="com.rabbitmq.stream.impl.DefaultExecutorServiceFactory" level="warn" />-->
910

1011
<logger name="com.rabbitmq.stream.perf.Version" level="error" />
1112

0 commit comments

Comments
 (0)