diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index ac97403f9d..c457b44796 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -974,7 +974,12 @@ private Address address() { } private String label() { - return address.host() + ":" + address.port(); + Client c = this.nullableClient(); + if (c == null) { + return address.host() + ":" + address.port(); + } else { + return c.getHost() + ":" + c.getPort(); + } } @Override diff --git a/src/test/java/com/rabbitmq/stream/impl/AuthorisationTest.java b/src/test/java/com/rabbitmq/stream/impl/AuthorisationTest.java index 47d595da6b..8a86547fbf 100644 --- a/src/test/java/com/rabbitmq/stream/impl/AuthorisationTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/AuthorisationTest.java @@ -14,23 +14,32 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.Constants.*; import static com.rabbitmq.stream.Host.*; -import static com.rabbitmq.stream.impl.TestUtils.b; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static com.rabbitmq.stream.OffsetSpecification.first; +import static com.rabbitmq.stream.impl.TestUtils.*; +import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok; +import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import com.rabbitmq.stream.Constants; -import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.sasl.CredentialsProvider; +import com.rabbitmq.stream.sasl.DefaultUsernamePasswordCredentialsProvider; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Collections; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.IntStream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) @@ -74,11 +83,11 @@ void createStreamWithAuthorisedNameShouldSucceed() { String stream = "stream-authorized" + i; Client.Response response = client.create(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); response = deletionClient.delete(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); }); } @@ -105,11 +114,11 @@ void deleteStreamWithAuthorisedNameShouldSucceed() { String stream = "stream-authorized" + i; Client.Response response = creationClient.create(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); response = client.delete(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); }); } @@ -123,7 +132,7 @@ void deleteStreamWithUnauthorisedNameShouldFail() { String stream = "not-authorized" + i; Client.Response response = creationClient.create(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); response = client.delete(stream); assertThat(response.isOk()).isFalse(); @@ -132,7 +141,7 @@ void deleteStreamWithUnauthorisedNameShouldFail() { response = creationClient.delete(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); }); } @@ -146,15 +155,15 @@ void subscribeToAuthorisedStreamShouldSucceed() { String stream = "stream-authorized" + i; Client.Response response = configurationClient.create(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); - response = client.subscribe(b(1), stream, OffsetSpecification.first(), 10); + response = client.subscribe(b(1), stream, first(), 10); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); response = configurationClient.delete(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); }); } @@ -168,16 +177,16 @@ void subscribeToUnauthorisedStreamShouldFail() { String stream = "not-authorized" + i; Client.Response response = configurationClient.create(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); - response = client.subscribe(b(1), stream, OffsetSpecification.first(), 10); + response = client.subscribe(b(1), stream, first(), 10); assertThat(response.isOk()).isFalse(); assertThat(response.getResponseCode()) .isEqualTo(Constants.RESPONSE_CODE_ACCESS_REFUSED); response = configurationClient.delete(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); }); } @@ -191,7 +200,7 @@ void publishToAuthorisedStreamShouldSucceed() { String stream = "stream-authorized" + i; Client.Response response = configurationClient.create(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); int messageCount = 1000; CountDownLatch publishConfirmLatch = new CountDownLatch(messageCount); @@ -223,7 +232,7 @@ void publishToAuthorisedStreamShouldSucceed() { response = configurationClient.delete(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); }); } @@ -237,7 +246,7 @@ void publishToUnauthorisedStreamShouldFail() { String stream = "not-authorized" + i; Client.Response response = configurationClient.create(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); int messageCount = 1000; CountDownLatch publishErrorLatch = new CountDownLatch(messageCount); @@ -270,7 +279,7 @@ void publishToUnauthorisedStreamShouldFail() { response = configurationClient.delete(stream); assertThat(response.isOk()).isTrue(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK); }); } @@ -304,6 +313,142 @@ void storeQueryOffsetShouldSucceedOnAuthorisedStreamShouldFailOnUnauthorisedStre } } + @Test + @TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0) + void shouldReceiveMetadataUpdateAfterUpdateSecret(TestInfo info) throws Exception { + try { + String newPassword = "new-password"; + String prefix = "passthrough-"; + String pubSub = TestUtils.streamName(info); + String authorizedPubSub = prefix + TestUtils.streamName(info); + String pub = TestUtils.streamName(info); + String authorizedPub = prefix + TestUtils.streamName(info); + String sub = TestUtils.streamName(info); + String authorizedSub = prefix + TestUtils.streamName(info); + setPermissions(USERNAME, VH, ".*"); + Set metadataUpdates = ConcurrentHashMap.newKeySet(); + ConcurrentMap publishConfirms = new ConcurrentHashMap<>(); + ConcurrentMap creditNotifications = new ConcurrentHashMap<>(); + Set receivedMessages = ConcurrentHashMap.newKeySet(); + Client client = + cf.get( + parameters() + .virtualHost(VH) + .username(USERNAME) + .password(USERNAME) + .publishConfirmListener( + (publisherId, publishingId) -> + publishConfirms.put(publisherId, RESPONSE_CODE_OK)) + .publishErrorListener( + (publisherId, publishingId, errorCode) -> + publishConfirms.put(publisherId, errorCode)) + .creditNotification( + (subscriptionId, responseCode) -> + creditNotifications.put(subscriptionId, responseCode)) + .messageListener( + (subscriptionId, + offset, + chunkTimestamp, + committedChunkId, + chunkContext, + message) -> receivedMessages.add(subscriptionId)) + .metadataListener((stream, code) -> metadataUpdates.add(stream))); + assertThat(client.create(pubSub)).is(ok()); + assertThat(client.create(authorizedPubSub)).is(ok()); + assertThat(client.create(pub)).is(ok()); + assertThat(client.create(authorizedPub)).is(ok()); + assertThat(client.create(sub)).is(ok()); + assertThat(client.create(authorizedSub)).is(ok()); + + Map publishers = new HashMap<>(); + publishers.put(pubSub, b(0)); + publishers.put(authorizedPubSub, b(1)); + publishers.put(pub, b(2)); + publishers.put(authorizedPub, b(3)); + publishers.forEach((s, id) -> assertThat(client.declarePublisher(id, null, s)).is(ok())); + Map subscriptions = new HashMap<>(); + subscriptions.put(pubSub, b(0)); + subscriptions.put(authorizedPubSub, b(1)); + subscriptions.put(sub, b(2)); + subscriptions.put(authorizedSub, b(3)); + subscriptions.forEach((s, id) -> assertThat(client.subscribe(id, s, first(), 1)).is(ok())); + + Function toPub = publishers::get; + Function toSub = subscriptions::get; + + // change password and permissions and re-authenticate + changePassword(USERNAME, newPassword); + setPermissions(USERNAME, VH, "^passthrough.*$"); + client.authenticate(credentialsProvider(USERNAME, newPassword)); + + waitAtMost(() -> metadataUpdates.containsAll(asList(pubSub, pub, sub))); + + List message = Collections.singletonList(client.messageBuilder().build()); + + // publishers for unauthorized streams should be gone + asList(toPub.apply(pubSub), toPub.apply(pub)) + .forEach( + wrap( + pubId -> { + assertThat(publishConfirms).doesNotContainKey(pubId); + client.publish(pubId, message); + waitAtMost(() -> publishConfirms.containsKey(pubId)); + assertThat(publishConfirms) + .containsEntry(pubId, RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST); + })); + + // subscriptions for unauthorized streams should be gone + asList(toSub.apply(pubSub), toSub.apply(sub)) + .forEach( + wrap( + subId -> { + assertThat(creditNotifications).doesNotContainKey(subId); + client.credit(subId, 1); + waitAtMost(() -> creditNotifications.containsKey(subId)); + assertThat(creditNotifications) + .containsEntry(subId, RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST); + })); + + // subscriptions for authorized streams still work + asList(toSub.apply(authorizedPubSub), toSub.apply(authorizedSub)) + .forEach(subId -> client.credit(subId, 1)); + + assertThat(receivedMessages).isEmpty(); + // publishers for authorized streams should still work + asList(toPub.apply(authorizedPubSub), toPub.apply(authorizedPub)) + .forEach( + wrap( + pubId -> { + client.publish(pubId, message); + waitAtMost(() -> publishConfirms.containsKey(pubId)); + assertThat(publishConfirms).containsEntry(pubId, RESPONSE_CODE_OK); + })); + + waitAtMost(() -> receivedMessages.contains(b(1))); + + // send message to authorized subscription stream + assertThat(client.declarePublisher(b(5), null, authorizedSub)).is(ok()); + client.publish(b(5), message); + waitAtMost(() -> receivedMessages.contains(toSub.apply(authorizedSub))); + + // last checks to make sure nothing unexpected arrived late + assertThat(metadataUpdates).hasSize(3); + assertThat(creditNotifications).containsOnlyKeys(b(0), b(2)); + assertThat(publishConfirms) + .hasSize(4 + 1) + .containsEntry(toPub.apply(pubSub), RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST) + .containsEntry(toPub.apply(authorizedPubSub), RESPONSE_CODE_OK) + .containsEntry(toPub.apply(pub), RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST) + .containsEntry(toPub.apply(authorizedPub), RESPONSE_CODE_OK); + assertThat(receivedMessages).hasSize(2); + + client.close(); + } finally { + changePassword(USERNAME, PASSWORD); + setPermissions(USERNAME, VH, "^stream.*$"); + } + } + Client configurationClient() { return cf.get(new Client.ClientParameters().virtualHost(VH)); } @@ -315,4 +460,12 @@ Client client() { Client client(Client.ClientParameters parameters) { return cf.get(parameters.virtualHost(VH).username(USERNAME).password(PASSWORD)); } + + private static Client.ClientParameters parameters() { + return new Client.ClientParameters(); + } + + private static CredentialsProvider credentialsProvider(String username, String password) { + return new DefaultUsernamePasswordCredentialsProvider(username, password); + } } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 4bec720537..6931d5f21a 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -6,6 +6,7 @@ +