Skip to content

Commit a5eaf2f

Browse files
committed
Accept CqlSessionBuilder and unwrap instrumented session for reactive observability.
We now accept `CqlSessionBuilder` in ObservableReactiveSessionFactoryBean to use the session directly to avoid duplicate instrumentation. Also, DefaultBridgedReactiveSession tries to unwrap any observability-proxied sessions. Closes #1366
1 parent 24380a9 commit a5eaf2f

File tree

7 files changed

+177
-4
lines changed

7 files changed

+177
-4
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/aot/CassandraRuntimeHints.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.springframework.data.cassandra.observability.CassandraObservationSupplier;
3232
import org.springframework.data.cassandra.repository.support.SimpleCassandraRepository;
3333
import org.springframework.data.cassandra.repository.support.SimpleReactiveCassandraRepository;
34-
import org.springframework.data.repository.util.ReactiveWrappers;
34+
import org.springframework.data.util.ReactiveWrappers;
3535
import org.springframework.lang.Nullable;
3636
import org.springframework.util.ClassUtils;
3737

@@ -91,6 +91,19 @@ public void registerHints(org.springframework.aot.hint.RuntimeHints hints, @Null
9191
}
9292

9393
hints.proxies().registerJdkProxy(CqlSession.class, SpringProxy.class, Advised.class, DecoratingProxy.class);
94+
Class<?> observationDecorated;
95+
try {
96+
observationDecorated = Class.forName(
97+
"org.springframework.data.cassandra.observability.CqlSessionObservationInterceptor.ObservationDecoratedProxy",
98+
false, classLoader);
99+
} catch (Exception e) {
100+
observationDecorated = null;
101+
}
102+
103+
if (observationDecorated != null) {
104+
hints.proxies().registerJdkProxy(CqlSession.class, SpringProxy.class, Advised.class, DecoratingProxy.class,
105+
observationDecorated);
106+
}
94107
}
95108
}
96109
}

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/session/DefaultBridgedReactiveSession.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import org.apache.commons.logging.Log;
2929
import org.apache.commons.logging.LogFactory;
30+
import org.springframework.aop.TargetSource;
31+
import org.springframework.aop.framework.AopProxyUtils;
3032
import org.springframework.data.cassandra.ReactiveResultSet;
3133
import org.springframework.data.cassandra.ReactiveSession;
3234
import org.springframework.util.Assert;
@@ -54,7 +56,7 @@
5456
* Calls are deferred until a subscriber subscribes to the resulting {@link org.reactivestreams.Publisher}. The calls
5557
* are executed by subscribing to {@link CompletionStage} and returning the result as calls complete.
5658
* <p>
57-
* Elements are emitted on netty EventLoop threads. {@link AsyncResultSet} allows {@link AsyncResultSet#fetchNextPage()}
59+
* Elements are emitted on netty EventLoop threads. {@link AsyncResultSet} allows {@link AsyncResultSet#fetchNextPage()
5860
* asynchronous requesting} of subsequent pages. The next page is requested after emitting all elements of the previous
5961
* page. However, this is an intermediate solution until Datastax can provide a fully reactive driver.
6062
* <p>
@@ -85,6 +87,18 @@ public DefaultBridgedReactiveSession(CqlSession session) {
8587

8688
Assert.notNull(session, "Session must not be null");
8789

90+
// potentially unwrap a ObservationDecoratedProxy as reactive observability
91+
// requires its own approach to span creation. We do not want to participate in
92+
// async API spans but rather drive our own spans.
93+
if (session instanceof TargetSource) {
94+
Class<?>[] interfaces = session.getClass().getInterfaces();
95+
for (Class<?> anInterface : interfaces) {
96+
if (anInterface.getName().endsWith("ObservationDecoratedProxy")) {
97+
session = (CqlSession) AopProxyUtils.getSingletonTarget(session);
98+
}
99+
}
100+
}
101+
88102
this.session = session;
89103
}
90104

@@ -239,7 +253,6 @@ public Flux<Row> availableRows() {
239253
return Flux.fromIterable(resultSet.currentPage());
240254
}
241255

242-
243256
@Override
244257
public ColumnDefinitions getColumnDefinitions() {
245258
return this.resultSet.getColumnDefinitions();

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/observability/CqlSessionObservationInterceptor.java

+8
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.aopalliance.intercept.MethodInterceptor;
2727
import org.aopalliance.intercept.MethodInvocation;
28+
import org.springframework.aop.TargetSource;
2829

2930
import com.datastax.oss.driver.api.core.CqlIdentifier;
3031
import com.datastax.oss.driver.api.core.CqlSession;
@@ -183,4 +184,11 @@ private Observation startObservation(Statement<?> statement, boolean prepare, St
183184
return observation.start();
184185
}
185186

187+
/**
188+
* Marker interface for components that want to participate in observation but do not want to work with a
189+
* {@code CqlSession} that is already decorated for observation.
190+
*/
191+
public interface ObservationDecoratedProxy extends TargetSource {
192+
193+
}
186194
}

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/observability/ObservableCqlSessionFactory.java

+6
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
import io.micrometer.observation.ObservationRegistry;
1919

20+
import org.springframework.aop.RawTargetAccess;
21+
import org.springframework.aop.TargetSource;
2022
import org.springframework.aop.framework.ProxyFactory;
23+
import org.springframework.data.cassandra.observability.CqlSessionObservationInterceptor.ObservationDecoratedProxy;
2124
import org.springframework.util.Assert;
2225

2326
import com.datastax.oss.driver.api.core.CqlSession;
@@ -65,7 +68,10 @@ public static CqlSession wrap(CqlSession session, String remoteServiceName, Obse
6568
proxyFactory.setTarget(session);
6669
proxyFactory.addAdvice(new CqlSessionObservationInterceptor(session, remoteServiceName, observationRegistry));
6770
proxyFactory.addInterface(CqlSession.class);
71+
proxyFactory.addInterface(ObservationDecoratedProxy.class);
6872

6973
return (CqlSession) proxyFactory.getProxy();
7074
}
75+
76+
7177
}

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/observability/ObservableReactiveSession.java

+1
Original file line numberDiff line numberDiff line change
@@ -180,4 +180,5 @@ private Observation startObservation(@Nullable Observation parent, Statement<?>
180180
private static Observation getParentObservation(ContextView contextView) {
181181
return contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
182182
}
183+
183184
}

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/observability/ObservableReactiveSessionFactoryBean.java

+34-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import io.micrometer.observation.ObservationRegistry;
1919

20+
import org.springframework.aop.TargetSource;
21+
import org.springframework.aop.framework.AopProxyUtils;
2022
import org.springframework.beans.factory.config.AbstractFactoryBean;
2123
import org.springframework.data.cassandra.ReactiveSession;
2224
import org.springframework.data.cassandra.core.cql.session.DefaultBridgedReactiveSession;
@@ -25,6 +27,7 @@
2527
import org.springframework.util.ObjectUtils;
2628

2729
import com.datastax.oss.driver.api.core.CqlSession;
30+
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
2831

2932
/**
3033
* Factory bean to construct a {@link ReactiveSession} integrated with given {@link ObservationRegistry}. The required
@@ -41,10 +44,30 @@ public class ObservableReactiveSessionFactoryBean extends AbstractFactoryBean<Re
4144

4245
private final CqlSession cqlSession;
4346

47+
private final boolean requiresDestroy;
48+
4449
private final ObservationRegistry observationRegistry;
4550

4651
private @Nullable String remoteServiceName;
4752

53+
/**
54+
* Construct a new {@link ObservableReactiveSessionFactoryBean}.
55+
*
56+
* @param cqlSessionBuilder must not be {@literal null}.
57+
* @param observationRegistry must not be {@literal null}.
58+
* @since 4.0.5
59+
*/
60+
public ObservableReactiveSessionFactoryBean(CqlSessionBuilder cqlSessionBuilder,
61+
ObservationRegistry observationRegistry) {
62+
63+
Assert.notNull(cqlSessionBuilder, "CqlSessionBuilder must not be null");
64+
Assert.notNull(observationRegistry, "ObservationRegistry must not be null");
65+
66+
this.cqlSession = cqlSessionBuilder.build();
67+
this.requiresDestroy = true;
68+
this.observationRegistry = observationRegistry;
69+
}
70+
4871
/**
4972
* Construct a new {@link ObservableReactiveSessionFactoryBean}.
5073
*
@@ -56,7 +79,9 @@ public ObservableReactiveSessionFactoryBean(CqlSession cqlSession, ObservationRe
5679
Assert.notNull(cqlSession, "CqlSession must not be null");
5780
Assert.notNull(observationRegistry, "ObservationRegistry must not be null");
5881

59-
this.cqlSession = cqlSession;
82+
this.cqlSession = cqlSession instanceof TargetSource c ? (CqlSession) AopProxyUtils.getSingletonTarget(c)
83+
: cqlSession;
84+
this.requiresDestroy = false;
6085
this.observationRegistry = observationRegistry;
6186
}
6287

@@ -81,6 +106,14 @@ public String getRemoteServiceName() {
81106
return remoteServiceName;
82107
}
83108

109+
@Override
110+
public void destroy() {
111+
112+
if (requiresDestroy) {
113+
cqlSession.close();
114+
}
115+
}
116+
84117
/**
85118
* Set the remote service name.
86119
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.cassandra.observability;
17+
18+
import static org.assertj.core.api.Assertions.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import io.micrometer.observation.ObservationRegistry;
22+
23+
import org.junit.jupiter.api.Test;
24+
import org.springframework.data.cassandra.ReactiveSession;
25+
import org.springframework.data.cassandra.core.cql.session.DefaultBridgedReactiveSession;
26+
import org.springframework.test.util.ReflectionTestUtils;
27+
28+
import com.datastax.oss.driver.api.core.CqlSession;
29+
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
30+
31+
/**
32+
* Unit tests for {@link ObservableReactiveSessionFactoryBean}.
33+
*
34+
* @author Mark Paluch
35+
*/
36+
class ObservableReactiveSessionFactoryBeanUnitTests {
37+
38+
@Test // GH-1366
39+
void sessionFactoryBeanUnwrapsObservationProxy() throws Exception {
40+
41+
CqlSession sessionMock = mock(CqlSession.class);
42+
ObservationRegistry registry = ObservationRegistry.NOOP;
43+
44+
CqlSession wrapped = ObservableCqlSessionFactory.wrap(sessionMock, registry);
45+
46+
ObservableReactiveSessionFactoryBean bean = new ObservableReactiveSessionFactoryBean(wrapped, registry);
47+
bean.afterPropertiesSet();
48+
49+
ReactiveSession object = bean.getObject();
50+
Object cqlSession = ReflectionTestUtils.getField(ReflectionTestUtils.getField(object, "delegate"), "session");
51+
52+
assertThat(cqlSession).isSameAs(sessionMock);
53+
}
54+
55+
@Test // GH-1366
56+
void reactiveSessionDelegateBeanUnwrapsObservationProxy() throws Exception {
57+
58+
CqlSession sessionMock = mock(CqlSession.class);
59+
ObservationRegistry registry = ObservationRegistry.NOOP;
60+
61+
CqlSession wrapped = ObservableCqlSessionFactory.wrap(sessionMock, registry);
62+
63+
ReactiveSession object = new DefaultBridgedReactiveSession(wrapped);
64+
Object cqlSession = ReflectionTestUtils.getField(object, "session");
65+
66+
assertThat(cqlSession).isSameAs(sessionMock);
67+
}
68+
69+
@Test // GH-1366
70+
void closesCqlSessionViaBuilder() throws Exception {
71+
72+
CqlSessionBuilder builderMock = mock(CqlSessionBuilder.class);
73+
CqlSession sessionMock = mock(CqlSession.class);
74+
75+
doReturn(sessionMock).when(builderMock).build();
76+
77+
ObservationRegistry registry = ObservationRegistry.NOOP;
78+
79+
ObservableReactiveSessionFactoryBean bean = new ObservableReactiveSessionFactoryBean(builderMock, registry);
80+
bean.afterPropertiesSet();
81+
bean.destroy();
82+
83+
verify(sessionMock).close();
84+
}
85+
86+
@Test // GH-1366
87+
void doesNotCloseCqlSession() throws Exception {
88+
89+
CqlSession sessionMock = mock(CqlSession.class);
90+
91+
ObservationRegistry registry = ObservationRegistry.NOOP;
92+
93+
ObservableReactiveSessionFactoryBean bean = new ObservableReactiveSessionFactoryBean(sessionMock, registry);
94+
bean.afterPropertiesSet();
95+
bean.destroy();
96+
97+
verifyNoInteractions(sessionMock);
98+
}
99+
}

0 commit comments

Comments
 (0)