Skip to content

Commit cb0536d

Browse files
committed
Create new client properties map on client parameters duplication
To avoid duplicates use the same map instance and so interfere. Fixes #234
1 parent bd33ca1 commit cb0536d

File tree

6 files changed

+56
-31
lines changed

6 files changed

+56
-31
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -2028,11 +2028,11 @@ public int hashCode() {
20282028

20292029
public static class ClientParameters {
20302030

2031-
private final Map<String, String> clientProperties = new HashMap<>();
2031+
private final Map<String, String> clientProperties = new ConcurrentHashMap<>();
20322032
EventLoopGroup eventLoopGroup;
2033-
Codec codec;
2034-
String host = "localhost";
2035-
int port = DEFAULT_PORT;
2033+
private Codec codec;
2034+
private String host = "localhost";
2035+
private int port = DEFAULT_PORT;
20362036
CompressionCodecFactory compressionCodecFactory;
20372037
private String virtualHost = "/";
20382038
private Duration requestedHeartbeat = Duration.ofSeconds(60);
@@ -2227,12 +2227,32 @@ public ClientParameters rpcTimeout(Duration rpcTimeout) {
22272227
return this;
22282228
}
22292229

2230+
String host() {
2231+
return this.host;
2232+
}
2233+
2234+
int port() {
2235+
return this.port;
2236+
}
2237+
2238+
Map<String, String> clientProperties() {
2239+
return Collections.unmodifiableMap(this.clientProperties);
2240+
}
2241+
2242+
Codec codec() {
2243+
return this.codec;
2244+
}
2245+
22302246
ClientParameters duplicate() {
22312247
ClientParameters duplicate = new ClientParameters();
22322248
for (Field field : ClientParameters.class.getDeclaredFields()) {
22332249
field.setAccessible(true);
22342250
try {
2235-
field.set(duplicate, field.get(this));
2251+
Object value = field.get(this);
2252+
if (value instanceof Map) {
2253+
value = new ConcurrentHashMap<>((Map<?, ?>) value);
2254+
}
2255+
field.set(duplicate, value);
22362256
} catch (IllegalAccessException e) {
22372257
throw new StreamException("Error while duplicating client parameters", e);
22382258
}

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -594,12 +594,11 @@ private ClientSubscriptionsManager(
594594
}
595595
return result;
596596
};
597+
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.CONSUMER);
597598
ClientFactoryContext clientFactoryContext =
598599
ClientFactoryContext.fromParameters(
599600
clientParameters
600-
.clientProperty(
601-
"connection_name",
602-
connectionNamingStrategy.apply(ClientConnectionType.CONSUMER))
601+
.clientProperty("connection_name", connectionName)
603602
.chunkListener(chunkListener)
604603
.creditNotification(creditNotification)
605604
.messageListener(messageListener)
@@ -608,6 +607,7 @@ private ClientSubscriptionsManager(
608607
.consumerUpdateListener(consumerUpdateListener))
609608
.key(owner.name);
610609
this.client = clientFactory.client(clientFactoryContext);
610+
LOGGER.debug("Created consumer connection '{}'", connectionName);
611611
maybeExchangeCommandVersions(client);
612612
clientInitializedInManager.set(true);
613613
}

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -490,18 +490,18 @@ private ClientProducersManager(
490490
}
491491
}
492492
};
493+
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.PRODUCER);
493494
ClientFactoryContext connectionFactoryContext =
494495
ClientFactoryContext.fromParameters(
495496
clientParameters
496497
.publishConfirmListener(publishConfirmListener)
497498
.publishErrorListener(publishErrorListener)
498499
.shutdownListener(shutdownListener)
499500
.metadataListener(metadataListener)
500-
.clientProperty(
501-
"connection_name",
502-
connectionNamingStrategy.apply(ClientConnectionType.PRODUCER)))
501+
.clientProperty("connection_name", connectionName))
503502
.key(owner.name);
504503
this.client = cf.client(connectionFactoryContext);
504+
LOGGER.debug("Created producer connection '{}'", connectionName);
505505
clientInitializedInManager.set(true);
506506
ref.set(this.client);
507507
}

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ class StreamEnvironment implements Environment {
175175
if (uris.isEmpty()) {
176176
this.addresses =
177177
Collections.singletonList(
178-
new Address(clientParametersPrototype.host, clientParametersPrototype.port));
178+
new Address(clientParametersPrototype.host(), clientParametersPrototype.port()));
179179
} else {
180180
int defaultPort = tls ? Client.DEFAULT_TLS_PORT : Client.DEFAULT_PORT;
181181
this.addresses =
@@ -241,15 +241,15 @@ class StreamEnvironment implements Environment {
241241
: addresses.get(random.nextInt(addresses.size()));
242242
address = addressResolver.resolve(address);
243243
LOGGER.debug("Trying to reconnect locator on {}", address);
244+
String connectionName =
245+
connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
244246
Client newLocator =
245247
clientFactory.apply(
246248
newLocatorParameters
247249
.host(address.host())
248250
.port(address.port())
249-
.clientProperty(
250-
"connection_name",
251-
connectionNamingStrategy.apply(
252-
ClientConnectionType.LOCATOR)));
251+
.clientProperty("connection_name", connectionName));
252+
LOGGER.debug("Created locator connection '{}'", connectionName);
253253
LOGGER.debug("Locator connected on {}", address);
254254
return newLocator;
255255
})
@@ -267,17 +267,17 @@ class StreamEnvironment implements Environment {
267267
RuntimeException lastException = null;
268268
for (Address address : addresses) {
269269
address = addressResolver.resolve(address);
270+
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
270271
Client.ClientParameters locatorParameters =
271272
clientParametersForInit
272273
.duplicate()
273274
.host(address.host())
274275
.port(address.port())
275-
.clientProperty(
276-
"connection_name",
277-
connectionNamingStrategy.apply(ClientConnectionType.LOCATOR))
276+
.clientProperty("connection_name", connectionName)
278277
.shutdownListener(shutdownListenerReference.get());
279278
try {
280279
this.locator = clientFactory.apply(locatorParameters);
280+
LOGGER.debug("Created locator connection '{}'", connectionName);
281281
LOGGER.debug("Locator connected to {}", address);
282282
break;
283283
} catch (RuntimeException e) {
@@ -297,7 +297,9 @@ class StreamEnvironment implements Environment {
297297
this.locatorInitializationSequence = () -> {};
298298
}
299299
this.codec =
300-
clientParametersPrototype.codec == null ? Codecs.DEFAULT : clientParametersPrototype.codec;
300+
clientParametersPrototype.codec() == null
301+
? Codecs.DEFAULT
302+
: clientParametersPrototype.codec();
301303
this.clockRefreshFuture =
302304
this.scheduledExecutorService.scheduleAtFixedRate(
303305
() -> this.clock.refresh(), 1, 1, SECONDS);

src/main/java/com/rabbitmq/stream/impl/Utils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ static short encodeResponseCode(Short code) {
114114
static ClientFactory coordinatorClientFactory(StreamEnvironment environment) {
115115
return context -> {
116116
ClientParameters parametersCopy = context.parameters().duplicate();
117-
Address address = new Address(parametersCopy.host, parametersCopy.port);
117+
Address address = new Address(parametersCopy.host(), parametersCopy.port());
118118
address = environment.addressResolver().resolve(address);
119119
parametersCopy.host(address.host()).port(address.port());
120120

src/test/java/com/rabbitmq/stream/impl/ClientParametersTest.java

+13-10
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,24 @@
1515

1616
import static org.assertj.core.api.Assertions.assertThat;
1717

18-
import java.lang.reflect.Field;
18+
import com.rabbitmq.stream.impl.Client.ClientParameters;
1919
import org.junit.jupiter.api.Test;
2020

2121
public class ClientParametersTest {
2222

2323
@Test
24-
void duplicate() throws Exception {
25-
Client.ClientParameters clientParameters =
24+
void duplicate() {
25+
Client.ClientParameters clientParameters1 =
2626
new Client.ClientParameters().host("rabbitmq").port(5556);
27-
clientParameters = clientParameters.duplicate();
28-
Field hostField = Client.ClientParameters.class.getDeclaredField("host");
29-
hostField.setAccessible(true);
30-
Field portField = Client.ClientParameters.class.getDeclaredField("port");
31-
portField.setAccessible(true);
32-
assertThat(hostField.get(clientParameters)).isEqualTo("rabbitmq");
33-
assertThat(portField.get(clientParameters)).isEqualTo(5556);
27+
clientParameters1.clientProperty("connection_name", "producer");
28+
ClientParameters clientParameters2 = clientParameters1.duplicate();
29+
assertThat(clientParameters2.host()).isEqualTo("rabbitmq");
30+
assertThat(clientParameters2.port()).isEqualTo(5556);
31+
32+
// same as original
33+
assertThat(clientParameters2.clientProperties()).containsEntry("connection_name", "producer");
34+
// changing the copy should not change the original
35+
clientParameters2.clientProperty("connection_name", "consumer");
36+
assertThat(clientParameters1.clientProperties()).containsEntry("connection_name", "producer");
3437
}
3538
}

0 commit comments

Comments
 (0)