Skip to content

Commit ca1710e

Browse files
committed
Upgrade to Apache Cassandra 4.3.1
See gh-19588
1 parent d282eb6 commit ca1710e

File tree

31 files changed

+406
-589
lines changed

31 files changed

+406
-589
lines changed

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/cassandra/CassandraHealthContributorAutoConfiguration.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
1818

1919
import java.util.Map;
2020

21-
import com.datastax.driver.core.Cluster;
21+
import com.datastax.oss.driver.api.core.CqlSession;
2222

2323
import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthContributorConfiguration;
2424
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
@@ -44,7 +44,7 @@
4444
* @since 2.1.0
4545
*/
4646
@Configuration(proxyBeanMethods = false)
47-
@ConditionalOnClass({ Cluster.class, CassandraOperations.class })
47+
@ConditionalOnClass({ CqlSession.class, CassandraOperations.class })
4848
@ConditionalOnBean(CassandraOperations.class)
4949
@ConditionalOnEnabledHealthIndicator("cassandra")
5050
@AutoConfigureAfter({ CassandraAutoConfiguration.class, CassandraDataAutoConfiguration.class,

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/cassandra/CassandraReactiveHealthContributorAutoConfiguration.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
1717

1818
import java.util.Map;
1919

20-
import com.datastax.driver.core.Cluster;
20+
import com.datastax.oss.driver.api.core.CqlSession;
2121
import reactor.core.publisher.Flux;
2222

2323
import org.springframework.boot.actuate.autoconfigure.health.CompositeReactiveHealthContributorConfiguration;
@@ -43,7 +43,7 @@
4343
* @since 2.1.0
4444
*/
4545
@Configuration(proxyBeanMethods = false)
46-
@ConditionalOnClass({ Cluster.class, ReactiveCassandraOperations.class, Flux.class })
46+
@ConditionalOnClass({ CqlSession.class, ReactiveCassandraOperations.class, Flux.class })
4747
@ConditionalOnBean(ReactiveCassandraOperations.class)
4848
@ConditionalOnEnabledHealthIndicator("cassandra")
4949
@AutoConfigureAfter(CassandraReactiveDataAutoConfiguration.class)

spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/cassandra/CassandraHealthIndicator.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,9 +16,8 @@
1616

1717
package org.springframework.boot.actuate.cassandra;
1818

19-
import com.datastax.driver.core.ResultSet;
20-
import com.datastax.driver.core.querybuilder.QueryBuilder;
21-
import com.datastax.driver.core.querybuilder.Select;
19+
import com.datastax.oss.driver.api.core.cql.ResultSet;
20+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
2221

2322
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
2423
import org.springframework.boot.actuate.health.Health;
@@ -53,9 +52,9 @@ public CassandraHealthIndicator(CassandraOperations cassandraOperations) {
5352

5453
@Override
5554
protected void doHealthCheck(Health.Builder builder) throws Exception {
56-
Select select = QueryBuilder.select("release_version").from("system", "local");
55+
SimpleStatement select = SimpleStatement.newInstance("SELECT release_version FROM system.local");
5756
ResultSet results = this.cassandraOperations.getCqlOperations().queryForResultSet(select);
58-
if (results.isExhausted()) {
57+
if (results.isFullyFetched()) {
5958
builder.up();
6059
return;
6160
}

spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/cassandra/CassandraReactiveHealthIndicator.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,8 +15,7 @@
1515
*/
1616
package org.springframework.boot.actuate.cassandra;
1717

18-
import com.datastax.driver.core.querybuilder.QueryBuilder;
19-
import com.datastax.driver.core.querybuilder.Select;
18+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
2019
import reactor.core.publisher.Mono;
2120

2221
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
@@ -47,7 +46,7 @@ public CassandraReactiveHealthIndicator(ReactiveCassandraOperations reactiveCass
4746

4847
@Override
4948
protected Mono<Health> doHealthCheck(Health.Builder builder) {
50-
Select select = QueryBuilder.select("release_version").from("system", "local");
49+
SimpleStatement select = SimpleStatement.newInstance("SELECT release_version FROM system.local");
5150
return this.reactiveCassandraOperations.getReactiveCqlOperations().queryForObject(select, String.class)
5251
.map((version) -> builder.up().withDetail("version", version).build()).single();
5352
}

spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/cassandra/CassandraHealthIndicatorTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,9 +16,9 @@
1616

1717
package org.springframework.boot.actuate.cassandra;
1818

19-
import com.datastax.driver.core.ResultSet;
20-
import com.datastax.driver.core.Row;
21-
import com.datastax.driver.core.querybuilder.Select;
19+
import com.datastax.oss.driver.api.core.cql.ResultSet;
20+
import com.datastax.oss.driver.api.core.cql.Row;
21+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
2222
import org.junit.jupiter.api.Test;
2323

2424
import org.springframework.boot.actuate.health.Health;
@@ -51,8 +51,8 @@ void verifyHealthStatusWhenExhausted() {
5151
ResultSet resultSet = mock(ResultSet.class);
5252
CassandraHealthIndicator healthIndicator = new CassandraHealthIndicator(cassandraOperations);
5353
given(cassandraOperations.getCqlOperations()).willReturn(cqlOperations);
54-
given(cqlOperations.queryForResultSet(any(Select.class))).willReturn(resultSet);
55-
given(resultSet.isExhausted()).willReturn(true);
54+
given(cqlOperations.queryForResultSet(any(SimpleStatement.class))).willReturn(resultSet);
55+
given(resultSet.isFullyFetched()).willReturn(true);
5656
Health health = healthIndicator.health();
5757
assertThat(health.getStatus()).isEqualTo(Status.UP);
5858
}
@@ -65,8 +65,8 @@ void verifyHealthStatusWithVersion() {
6565
Row row = mock(Row.class);
6666
CassandraHealthIndicator healthIndicator = new CassandraHealthIndicator(cassandraOperations);
6767
given(cassandraOperations.getCqlOperations()).willReturn(cqlOperations);
68-
given(cqlOperations.queryForResultSet(any(Select.class))).willReturn(resultSet);
69-
given(resultSet.isExhausted()).willReturn(false);
68+
given(cqlOperations.queryForResultSet(any(SimpleStatement.class))).willReturn(resultSet);
69+
given(resultSet.isFullyFetched()).willReturn(false);
7070
given(resultSet.one()).willReturn(row);
7171
String expectedVersion = "1.0.0";
7272
given(row.getString(0)).willReturn(expectedVersion);

spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/cassandra/CassandraReactiveHealthIndicatorTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
1515
*/
1616
package org.springframework.boot.actuate.cassandra;
1717

18-
import com.datastax.driver.core.querybuilder.Select;
18+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
1919
import org.junit.jupiter.api.Test;
2020
import reactor.core.publisher.Mono;
2121
import reactor.test.StepVerifier;
@@ -42,7 +42,8 @@ class CassandraReactiveHealthIndicatorTests {
4242
@Test
4343
void testCassandraIsUp() {
4444
ReactiveCqlOperations reactiveCqlOperations = mock(ReactiveCqlOperations.class);
45-
given(reactiveCqlOperations.queryForObject(any(Select.class), eq(String.class))).willReturn(Mono.just("6.0.0"));
45+
given(reactiveCqlOperations.queryForObject(any(SimpleStatement.class), eq(String.class)))
46+
.willReturn(Mono.just("6.0.0"));
4647
ReactiveCassandraOperations reactiveCassandraOperations = mock(ReactiveCassandraOperations.class);
4748
given(reactiveCassandraOperations.getReactiveCqlOperations()).willReturn(reactiveCqlOperations);
4849

Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,24 @@
1616

1717
package org.springframework.boot.autoconfigure.cassandra;
1818

19+
import java.security.NoSuchAlgorithmException;
1920
import java.time.Duration;
21+
import java.util.LinkedHashMap;
22+
import java.util.List;
23+
import java.util.Map;
2024

21-
import com.datastax.driver.core.Cluster;
22-
import com.datastax.driver.core.PoolingOptions;
23-
import com.datastax.driver.core.QueryOptions;
24-
import com.datastax.driver.core.SocketOptions;
25+
import javax.net.ssl.SSLContext;
26+
27+
import com.datastax.oss.driver.api.core.CqlSession;
28+
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
29+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
30+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
31+
import com.datastax.oss.driver.api.core.config.DriverOption;
32+
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
33+
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
34+
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
35+
import com.typesafe.config.Config;
36+
import com.typesafe.config.ConfigFactory;
2537

2638
import org.springframework.beans.factory.ObjectProvider;
2739
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -31,7 +43,7 @@
3143
import org.springframework.boot.context.properties.PropertyMapper;
3244
import org.springframework.context.annotation.Bean;
3345
import org.springframework.context.annotation.Configuration;
34-
import org.springframework.util.StringUtils;
46+
import org.springframework.context.annotation.Lazy;
3547

3648
/**
3749
* {@link EnableAutoConfiguration Auto-configuration} for Cassandra.
@@ -44,64 +56,131 @@
4456
* @since 1.3.0
4557
*/
4658
@Configuration(proxyBeanMethods = false)
47-
@ConditionalOnClass({ Cluster.class })
59+
@ConditionalOnClass({ CqlSession.class })
4860
@EnableConfigurationProperties(CassandraProperties.class)
4961
public class CassandraAutoConfiguration {
5062

5163
@Bean
5264
@ConditionalOnMissingBean
53-
public Cluster cassandraCluster(CassandraProperties properties,
54-
ObjectProvider<ClusterBuilderCustomizer> builderCustomizers,
55-
ObjectProvider<ClusterFactory> clusterFactory) {
65+
@Lazy
66+
public CqlSession cqlSession(CqlSessionBuilder cqlSessionBuilder) {
67+
return cqlSessionBuilder.build();
68+
}
69+
70+
@Bean
71+
@ConditionalOnMissingBean
72+
public CqlSessionBuilder cqlSessionBuilder(CassandraProperties properties, DriverConfigLoader driverConfigLoader,
73+
ObjectProvider<CqlSessionBuilderCustomizer> builderCustomizers) {
74+
CqlSessionBuilder builder = CqlSession.builder().withConfigLoader(driverConfigLoader);
75+
configureSsl(properties, builder);
76+
builder.withKeyspace(properties.getKeyspaceName());
77+
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
78+
return builder;
79+
}
80+
81+
private void configureSsl(CassandraProperties properties, CqlSessionBuilder builder) {
82+
if (properties.isSsl()) {
83+
try {
84+
builder.withSslContext(SSLContext.getDefault());
85+
}
86+
catch (NoSuchAlgorithmException ex) {
87+
throw new IllegalStateException("Could not setup SSL default context for Cassandra", ex);
88+
}
89+
}
90+
}
91+
92+
@Bean
93+
@ConditionalOnMissingBean
94+
public DriverConfigLoader driverConfigLoader(CassandraProperties properties,
95+
ObjectProvider<DriverConfigLoaderBuilderCustomizer> builderCustomizers) {
96+
ProgrammaticDriverConfigLoaderBuilder builder = new DefaultProgrammaticDriverConfigLoaderBuilder(
97+
() -> cassandraConfiguration(properties), DefaultDriverConfigLoader.DEFAULT_ROOT_PATH);
98+
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customizer(builder));
99+
return builder.build();
100+
}
101+
102+
private Config cassandraConfiguration(CassandraProperties properties) {
103+
CassandraDriverOptions options = new CassandraDriverOptions();
56104
PropertyMapper map = PropertyMapper.get();
57-
Cluster.Builder builder = Cluster.builder().withClusterName(properties.getClusterName())
58-
.withPort(properties.getPort());
105+
map.from(properties.getSessionName()).whenHasText()
106+
.to((sessionName) -> options.add(DefaultDriverOption.SESSION_NAME, sessionName));
59107
map.from(properties::getUsername).whenNonNull()
60-
.to((username) -> builder.withCredentials(username, properties.getPassword()));
61-
map.from(properties::getCompression).whenNonNull().to(builder::withCompression);
62-
QueryOptions queryOptions = getQueryOptions(properties);
63-
map.from(queryOptions).to(builder::withQueryOptions);
64-
SocketOptions socketOptions = getSocketOptions(properties);
65-
map.from(socketOptions).to(builder::withSocketOptions);
66-
map.from(properties::isSsl).whenTrue().toCall(builder::withSSL);
67-
PoolingOptions poolingOptions = getPoolingOptions(properties);
68-
map.from(poolingOptions).to(builder::withPoolingOptions);
69-
map.from(properties::getContactPoints).as(StringUtils::toStringArray).to(builder::addContactPoints);
70-
map.from(properties::isJmxEnabled).whenFalse().toCall(builder::withoutJMXReporting);
71-
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
72-
return clusterFactory.getIfAvailable(() -> Cluster::buildFrom).create(builder);
108+
.to((username) -> options.add(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, username)
109+
.add(DefaultDriverOption.AUTH_PROVIDER_PASSWORD, properties.getPassword()));
110+
map.from(properties::getCompression).whenNonNull()
111+
.to((compression) -> options.add(DefaultDriverOption.PROTOCOL_COMPRESSION, compression));
112+
mapQueryOptions(properties, options);
113+
mapSocketOptions(properties, options);
114+
mapPoolingOptions(properties, options);
115+
map.from(properties::getContactPoints)
116+
.to((contactPoints) -> options.add(DefaultDriverOption.CONTACT_POINTS, contactPoints));
117+
ConfigFactory.invalidateCaches();
118+
return ConfigFactory.defaultOverrides().withFallback(options.build())
119+
.withFallback(ConfigFactory.defaultReference()).resolve();
73120
}
74121

75-
private QueryOptions getQueryOptions(CassandraProperties properties) {
122+
private void mapQueryOptions(CassandraProperties properties, CassandraDriverOptions options) {
76123
PropertyMapper map = PropertyMapper.get();
77-
QueryOptions options = new QueryOptions();
78-
map.from(properties::getConsistencyLevel).whenNonNull().to(options::setConsistencyLevel);
79-
map.from(properties::getSerialConsistencyLevel).whenNonNull().to(options::setSerialConsistencyLevel);
80-
map.from(properties::getFetchSize).to(options::setFetchSize);
81-
return options;
124+
map.from(properties::getConsistencyLevel).whenNonNull()
125+
.to(((consistency) -> options.add(DefaultDriverOption.REQUEST_CONSISTENCY, consistency)));
126+
map.from(properties::getSerialConsistencyLevel).whenNonNull().to(
127+
(serialConsistency) -> options.add(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, serialConsistency));
128+
map.from(properties::getPageSize)
129+
.to((pageSize) -> options.add(DefaultDriverOption.REQUEST_PAGE_SIZE, pageSize));
82130
}
83131

84-
private SocketOptions getSocketOptions(CassandraProperties properties) {
132+
private void mapSocketOptions(CassandraProperties properties, CassandraDriverOptions options) {
85133
PropertyMapper map = PropertyMapper.get();
86-
SocketOptions options = new SocketOptions();
87134
map.from(properties::getConnectTimeout).whenNonNull().asInt(Duration::toMillis)
88-
.to(options::setConnectTimeoutMillis);
89-
map.from(properties::getReadTimeout).whenNonNull().asInt(Duration::toMillis).to(options::setReadTimeoutMillis);
90-
return options;
135+
.to((connectTimeout) -> options.add(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, connectTimeout));
136+
map.from(properties::getReadTimeout).whenNonNull().asInt(Duration::toMillis)
137+
.to((readTimeout) -> options.add(DefaultDriverOption.REQUEST_TIMEOUT, readTimeout));
91138
}
92139

93-
private PoolingOptions getPoolingOptions(CassandraProperties properties) {
140+
private void mapPoolingOptions(CassandraProperties properties, CassandraDriverOptions options) {
94141
PropertyMapper map = PropertyMapper.get();
95142
CassandraProperties.Pool poolProperties = properties.getPool();
96-
PoolingOptions options = new PoolingOptions();
97143
map.from(poolProperties::getIdleTimeout).whenNonNull().asInt(Duration::getSeconds)
98-
.to(options::setIdleTimeoutSeconds);
99-
map.from(poolProperties::getPoolTimeout).whenNonNull().asInt(Duration::toMillis)
100-
.to(options::setPoolTimeoutMillis);
144+
.to((idleTimeout) -> options.add(DefaultDriverOption.HEARTBEAT_TIMEOUT, idleTimeout));
101145
map.from(poolProperties::getHeartbeatInterval).whenNonNull().asInt(Duration::getSeconds)
102-
.to(options::setHeartbeatIntervalSeconds);
103-
map.from(poolProperties::getMaxQueueSize).to(options::setMaxQueueSize);
104-
return options;
146+
.to((heartBeatInterval) -> options.add(DefaultDriverOption.HEARTBEAT_INTERVAL, heartBeatInterval));
147+
map.from(poolProperties::getMaxQueueSize)
148+
.to((maxQueueSize) -> options.add(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, maxQueueSize));
149+
}
150+
151+
private static class CassandraDriverOptions {
152+
153+
private final Map<String, String> options = new LinkedHashMap<>();
154+
155+
private CassandraDriverOptions add(DriverOption option, String value) {
156+
String key = createKeyFor(option);
157+
this.options.put(key, value);
158+
return this;
159+
}
160+
161+
private CassandraDriverOptions add(DriverOption option, int value) {
162+
return add(option, String.valueOf(value));
163+
}
164+
165+
private CassandraDriverOptions add(DriverOption option, Enum<?> value) {
166+
return add(option, value.name());
167+
}
168+
169+
private CassandraDriverOptions add(DriverOption option, List<String> values) {
170+
for (int i = 0; i < values.size(); i++) {
171+
this.options.put(String.format("%s.%s", createKeyFor(option), i), values.get(i));
172+
}
173+
return this;
174+
}
175+
176+
private Config build() {
177+
return ConfigFactory.parseMap(this.options, "Environment");
178+
}
179+
180+
private static String createKeyFor(DriverOption option) {
181+
return String.format("%s.%s", DefaultDriverConfigLoader.DEFAULT_ROOT_PATH, option.getPath());
182+
}
183+
105184
}
106185

107186
}

0 commit comments

Comments
 (0)