Skip to content

Commit ad88942

Browse files
committed
Upgrade to Apache Cassandra 4.3.1
See spring-projectsgh-19588
1 parent 6774c05 commit ad88942

16 files changed

+322
-253
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfiguration.java

Lines changed: 133 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,24 @@
1616

1717
package org.springframework.boot.autoconfigure.cassandra;
1818

19+
import java.security.NoSuchAlgorithmException;
1920
import java.time.Duration;
21+
import java.util.LinkedHashMap;
22+
import java.util.List;
23+
import java.util.Map;
2024

21-
import com.datastax.driver.core.Cluster;
22-
import com.datastax.driver.core.PoolingOptions;
23-
import com.datastax.driver.core.QueryOptions;
24-
import com.datastax.driver.core.SocketOptions;
25+
import javax.net.ssl.SSLContext;
26+
27+
import com.datastax.oss.driver.api.core.CqlSession;
28+
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
29+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
30+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
31+
import com.datastax.oss.driver.api.core.config.DriverOption;
32+
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
33+
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
34+
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
35+
import com.typesafe.config.Config;
36+
import com.typesafe.config.ConfigFactory;
2537

2638
import org.springframework.beans.factory.ObjectProvider;
2739
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -31,7 +43,7 @@
3143
import org.springframework.boot.context.properties.PropertyMapper;
3244
import org.springframework.context.annotation.Bean;
3345
import org.springframework.context.annotation.Configuration;
34-
import org.springframework.util.StringUtils;
46+
import org.springframework.context.annotation.Lazy;
3547

3648
/**
3749
* {@link EnableAutoConfiguration Auto-configuration} for Cassandra.
@@ -44,64 +56,142 @@
4456
* @since 1.3.0
4557
*/
4658
@Configuration(proxyBeanMethods = false)
47-
@ConditionalOnClass({ Cluster.class })
59+
@ConditionalOnClass({ CqlSession.class })
4860
@EnableConfigurationProperties(CassandraProperties.class)
4961
public class CassandraAutoConfiguration {
5062

5163
@Bean
5264
@ConditionalOnMissingBean
53-
public Cluster cassandraCluster(CassandraProperties properties,
54-
ObjectProvider<ClusterBuilderCustomizer> builderCustomizers,
55-
ObjectProvider<ClusterFactory> clusterFactory) {
65+
@Lazy
66+
public CqlSession cqlSession(CqlSessionBuilder cqlSessionBuilder) {
67+
return cqlSessionBuilder.build();
68+
}
69+
70+
@Bean
71+
@ConditionalOnMissingBean
72+
public CqlSessionBuilder cqlSessionBuilder(CassandraProperties properties, DriverConfigLoader driverConfigLoader,
73+
ObjectProvider<CqlSessionBuilderCustomizer> builderCustomizers) {
74+
CqlSessionBuilder builder = CqlSession.builder().withConfigLoader(driverConfigLoader);
75+
configureSsl(properties, builder);
76+
builder.withKeyspace(properties.getKeyspaceName());
77+
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
78+
return builder;
79+
}
80+
81+
private void configureSsl(CassandraProperties properties, CqlSessionBuilder builder) {
82+
if (properties.isSsl()) {
83+
try {
84+
builder.withSslContext(SSLContext.getDefault());
85+
}
86+
catch (NoSuchAlgorithmException ex) {
87+
throw new IllegalStateException("Could not setup SSL default context for Cassandra", ex);
88+
}
89+
}
90+
}
91+
92+
@Bean
93+
@ConditionalOnMissingBean
94+
public DriverConfigLoader driverConfigLoader(CassandraProperties properties,
95+
ObjectProvider<DriverConfigLoaderBuilderCustomizer> builderCustomizers) {
96+
ProgrammaticDriverConfigLoaderBuilder builder = new DefaultProgrammaticDriverConfigLoaderBuilder(
97+
() -> cassandraConfiguration(properties), DefaultDriverConfigLoader.DEFAULT_ROOT_PATH);
98+
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customizer(builder));
99+
return builder.build();
100+
}
101+
102+
private Config cassandraConfiguration(CassandraProperties properties) {
103+
CassandraDriverOptions options = new CassandraDriverOptions();
56104
PropertyMapper map = PropertyMapper.get();
57-
Cluster.Builder builder = Cluster.builder().withClusterName(properties.getClusterName())
58-
.withPort(properties.getPort());
105+
map.from(properties.getClusterName()).whenHasText()
106+
.to((sessionName) -> options.add(DefaultDriverOption.SESSION_NAME, sessionName));
107+
// CONFIRMED: port is gone
59108
map.from(properties::getUsername).whenNonNull()
60-
.to((username) -> builder.withCredentials(username, properties.getPassword()));
61-
map.from(properties::getCompression).whenNonNull().to(builder::withCompression);
62-
QueryOptions queryOptions = getQueryOptions(properties);
63-
map.from(queryOptions).to(builder::withQueryOptions);
64-
SocketOptions socketOptions = getSocketOptions(properties);
65-
map.from(socketOptions).to(builder::withSocketOptions);
66-
map.from(properties::isSsl).whenTrue().toCall(builder::withSSL);
67-
PoolingOptions poolingOptions = getPoolingOptions(properties);
68-
map.from(poolingOptions).to(builder::withPoolingOptions);
69-
map.from(properties::getContactPoints).as(StringUtils::toStringArray).to(builder::addContactPoints);
70-
map.from(properties::isJmxEnabled).whenFalse().toCall(builder::withoutJMXReporting);
71-
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
72-
return clusterFactory.getIfAvailable(() -> Cluster::buildFrom).create(builder);
109+
.to((username) -> options.add(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, username)
110+
.add(DefaultDriverOption.AUTH_PROVIDER_PASSWORD, properties.getPassword()));
111+
map.from(properties::getCompression).whenNonNull()
112+
.to((compression) -> options.add(DefaultDriverOption.PROTOCOL_COMPRESSION, compression));
113+
mapQueryOptions(properties, options);
114+
mapSocketOptions(properties, options);
115+
mapPoolingOptions(properties, options);
116+
map.from(properties::getContactPoints)
117+
.to((contactPoints) -> options.add(DefaultDriverOption.CONTACT_POINTS, contactPoints));
118+
// TODO: JMX moved to dropwizard (metrics-jmx)
119+
ConfigFactory.invalidateCaches();
120+
return ConfigFactory.defaultOverrides().withFallback(options.build())
121+
.withFallback(ConfigFactory.defaultReference()).resolve();
73122
}
74123

75-
private QueryOptions getQueryOptions(CassandraProperties properties) {
124+
private void mapQueryOptions(CassandraProperties properties, CassandraDriverOptions options) {
76125
PropertyMapper map = PropertyMapper.get();
77-
QueryOptions options = new QueryOptions();
78-
map.from(properties::getConsistencyLevel).whenNonNull().to(options::setConsistencyLevel);
79-
map.from(properties::getSerialConsistencyLevel).whenNonNull().to(options::setSerialConsistencyLevel);
80-
map.from(properties::getFetchSize).to(options::setFetchSize);
81-
return options;
126+
map.from(properties::getConsistencyLevel).whenNonNull()
127+
.to(((consistency) -> options.add(DefaultDriverOption.REQUEST_CONSISTENCY, consistency)));
128+
map.from(properties::getSerialConsistencyLevel).whenNonNull().to(
129+
(serialConsistency) -> options.add(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, serialConsistency));
130+
map.from(properties::getFetchSize)
131+
.to((fetchSize) -> options.add(DefaultDriverOption.REQUEST_PAGE_SIZE, fetchSize));
82132
}
83133

84-
private SocketOptions getSocketOptions(CassandraProperties properties) {
134+
private void mapSocketOptions(CassandraProperties properties, CassandraDriverOptions options) {
85135
PropertyMapper map = PropertyMapper.get();
86-
SocketOptions options = new SocketOptions();
136+
// TODO: change of semantic?
87137
map.from(properties::getConnectTimeout).whenNonNull().asInt(Duration::toMillis)
88-
.to(options::setConnectTimeoutMillis);
89-
map.from(properties::getReadTimeout).whenNonNull().asInt(Duration::toMillis).to(options::setReadTimeoutMillis);
90-
return options;
138+
.to((connectTimeout) -> options.add(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, connectTimeout));
139+
map.from(properties::getReadTimeout).whenNonNull().asInt(Duration::toMillis)
140+
.to((readTimeout) -> options.add(DefaultDriverOption.REQUEST_TIMEOUT, readTimeout));
91141
}
92142

93-
private PoolingOptions getPoolingOptions(CassandraProperties properties) {
143+
// TODO: review option mapping
144+
private void mapPoolingOptions(CassandraProperties properties, CassandraDriverOptions options) {
94145
PropertyMapper map = PropertyMapper.get();
95146
CassandraProperties.Pool poolProperties = properties.getPool();
96-
PoolingOptions options = new PoolingOptions();
97147
map.from(poolProperties::getIdleTimeout).whenNonNull().asInt(Duration::getSeconds)
98-
.to(options::setIdleTimeoutSeconds);
99-
map.from(poolProperties::getPoolTimeout).whenNonNull().asInt(Duration::toMillis)
100-
.to(options::setPoolTimeoutMillis);
148+
.to((idleTimeout) -> options.add(DefaultDriverOption.HEARTBEAT_TIMEOUT, idleTimeout));
149+
/*
150+
* TODO: looks gone
151+
* map.from(poolProperties::getPoolTimeout).whenNonNull().asInt(Duration::
152+
* toMillis) .to((poolTimeout) ->
153+
* options.add(DefaultDriverOption.HEARTBEAT_TIMEOUT, poolTimeout));
154+
*
155+
*/
101156
map.from(poolProperties::getHeartbeatInterval).whenNonNull().asInt(Duration::getSeconds)
102-
.to(options::setHeartbeatIntervalSeconds);
103-
map.from(poolProperties::getMaxQueueSize).to(options::setMaxQueueSize);
104-
return options;
157+
.to((heartBeatInterval) -> options.add(DefaultDriverOption.HEARTBEAT_INTERVAL, heartBeatInterval));
158+
map.from(poolProperties::getMaxQueueSize)
159+
.to((maxQueueSize) -> options.add(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, maxQueueSize));
160+
}
161+
162+
private static class CassandraDriverOptions {
163+
164+
private final Map<String, String> options = new LinkedHashMap<>();
165+
166+
private CassandraDriverOptions add(DriverOption option, String value) {
167+
String key = createKeyFor(option);
168+
this.options.put(key, value);
169+
return this;
170+
}
171+
172+
private CassandraDriverOptions add(DriverOption option, int value) {
173+
return add(option, String.valueOf(value));
174+
}
175+
176+
private CassandraDriverOptions add(DriverOption option, Enum<?> value) {
177+
return add(option, value.name());
178+
}
179+
180+
private CassandraDriverOptions add(DriverOption option, List<String> values) {
181+
for (int i = 0; i < values.size(); i++) {
182+
this.options.put(String.format("%s.%s", createKeyFor(option), i), values.get(i));
183+
}
184+
return this;
185+
}
186+
187+
private Config build() {
188+
return ConfigFactory.parseMap(this.options, "Environment");
189+
}
190+
191+
private static String createKeyFor(DriverOption option) {
192+
return String.format("%s.%s", DefaultDriverConfigLoader.DEFAULT_ROOT_PATH, option.getPath());
193+
}
194+
105195
}
106196

107197
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraProperties.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@
2222
import java.util.Collections;
2323
import java.util.List;
2424

25-
import com.datastax.driver.core.ConsistencyLevel;
26-
import com.datastax.driver.core.ProtocolOptions;
27-
import com.datastax.driver.core.ProtocolOptions.Compression;
28-
import com.datastax.driver.core.QueryOptions;
25+
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
2926

3027
import org.springframework.boot.context.properties.ConfigurationProperties;
3128
import org.springframework.boot.convert.DurationUnit;
@@ -55,12 +52,12 @@ public class CassandraProperties {
5552
/**
5653
* Cluster node addresses.
5754
*/
58-
private final List<String> contactPoints = new ArrayList<>(Collections.singleton("localhost"));
55+
private final List<String> contactPoints = new ArrayList<>(Collections.singleton("127.0.0.1:9042"));
5956

6057
/**
6158
* Port of the Cassandra server.
6259
*/
63-
private int port = ProtocolOptions.DEFAULT_PORT;
60+
private int port = 9042;
6461

6562
/**
6663
* Login user of the server.
@@ -80,17 +77,17 @@ public class CassandraProperties {
8077
/**
8178
* Queries consistency level.
8279
*/
83-
private ConsistencyLevel consistencyLevel;
80+
private DefaultConsistencyLevel consistencyLevel;
8481

8582
/**
8683
* Queries serial consistency level.
8784
*/
88-
private ConsistencyLevel serialConsistencyLevel;
85+
private DefaultConsistencyLevel serialConsistencyLevel;
8986

9087
/**
9188
* Queries default fetch size.
9289
*/
93-
private int fetchSize = QueryOptions.DEFAULT_FETCH_SIZE;
90+
private int fetchSize = 5000;
9491

9592
/**
9693
* Socket option: connection time out.
@@ -175,19 +172,19 @@ public void setCompression(Compression compression) {
175172
this.compression = compression;
176173
}
177174

178-
public ConsistencyLevel getConsistencyLevel() {
175+
public DefaultConsistencyLevel getConsistencyLevel() {
179176
return this.consistencyLevel;
180177
}
181178

182-
public void setConsistencyLevel(ConsistencyLevel consistency) {
179+
public void setConsistencyLevel(DefaultConsistencyLevel consistency) {
183180
this.consistencyLevel = consistency;
184181
}
185182

186-
public ConsistencyLevel getSerialConsistencyLevel() {
183+
public DefaultConsistencyLevel getSerialConsistencyLevel() {
187184
return this.serialConsistencyLevel;
188185
}
189186

190-
public void setSerialConsistencyLevel(ConsistencyLevel serialConsistency) {
187+
public void setSerialConsistencyLevel(DefaultConsistencyLevel serialConsistency) {
191188
this.serialConsistencyLevel = serialConsistency;
192189
}
193190

@@ -307,4 +304,26 @@ public void setMaxQueueSize(int maxQueueSize) {
307304

308305
}
309306

307+
/**
308+
* Name of the algorithm used to compress protocol frames.
309+
*/
310+
public enum Compression {
311+
312+
/**
313+
* Requires 'net.jpountz.lz4:lz4'.
314+
*/
315+
LZ4,
316+
317+
/**
318+
* Requires org.xerial.snappy:snappy-java.
319+
*/
320+
SNAPPY,
321+
322+
/**
323+
* No compression.
324+
*/
325+
NONE;
326+
327+
}
328+
310329
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/ClusterFactory.java

Lines changed: 0 additions & 39 deletions
This file was deleted.
Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,24 @@
1616

1717
package org.springframework.boot.autoconfigure.cassandra;
1818

19-
import com.datastax.driver.core.Cluster;
20-
import com.datastax.driver.core.Cluster.Builder;
19+
import com.datastax.oss.driver.api.core.CqlSession;
20+
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
2121

2222
/**
2323
* Callback interface that can be implemented by beans wishing to customize the
24-
* {@link Cluster} via a {@link Builder Cluster.Builder} whilst retaining default
24+
* {@link CqlSession} via a {@link CqlSessionBuilder} whilst retaining default
2525
* auto-configuration.
2626
*
27-
* @author Eddú Meléndez
28-
* @since 1.5.0
27+
* @author Stephane Nicoll
28+
* @since 2.3.0
2929
*/
3030
@FunctionalInterface
31-
public interface ClusterBuilderCustomizer {
31+
public interface CqlSessionBuilderCustomizer {
3232

3333
/**
34-
* Customize the {@link Builder}.
35-
* @param clusterBuilder the builder to customize
34+
* Customize the {@link CqlSessionBuilder}.
35+
* @param cqlSessionBuilder the builder to customize
3636
*/
37-
void customize(Builder clusterBuilder);
37+
void customize(CqlSessionBuilder cqlSessionBuilder);
3838

3939
}

0 commit comments

Comments
 (0)