Skip to content

Commit 988f363

Browse files
committed
Avoid duplicate upstream subscription during reactive cache put
This commit fixes an issue where a Cacheable method which returns a Flux (or multi-value publisher) will be invoked once, but the returned publisher is actually subscribed twice. By using the Reactor `tap` operator, we ensure that we can emit values downstream AND accumulate emitted values into the List with a single subscription. The SignalListener additionally handles scenarios involving cancel, for instance in case of a `take(1)` in the chain. In that case values emitted up until that point will have been stored into the List buffer, so we can still put it in the cache. In case of error, no caching occurs and the internal buffer is cleared. This implementation also protects against competing onComplete/onError signals and cancel signals. Closes gh-32370
1 parent a0ae849 commit 988f363

File tree

2 files changed

+42
-26
lines changed

2 files changed

+42
-26
lines changed

spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
import java.util.Optional;
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.atomic.AtomicReference;
2930
import java.util.function.Supplier;
3031

3132
import org.apache.commons.logging.Log;
3233
import org.apache.commons.logging.LogFactory;
33-
import org.reactivestreams.Subscriber;
34-
import org.reactivestreams.Subscription;
34+
import reactor.core.observability.DefaultSignalListener;
3535
import reactor.core.publisher.Flux;
3636
import reactor.core.publisher.Mono;
3737

@@ -90,6 +90,7 @@
9090
* @author Sam Brannen
9191
* @author Stephane Nicoll
9292
* @author Sebastien Deleuze
93+
* @author Simon Baslé
9394
* @since 3.1
9495
*/
9596
public abstract class CacheAspectSupport extends AbstractCacheInvoker
@@ -1036,32 +1037,45 @@ public void performCachePut(@Nullable Object value) {
10361037

10371038

10381039
/**
1039-
* Reactive Streams Subscriber collection for collecting a List to cache.
1040+
* Reactor stateful SignalListener for collecting a List to cache.
10401041
*/
1041-
private class CachePutListSubscriber implements Subscriber<Object> {
1042+
private class CachePutSignalListener extends DefaultSignalListener<Object> {
10421043

1043-
private final CachePutRequest request;
1044+
private final AtomicReference<CachePutRequest> request;
10441045

10451046
private final List<Object> cacheValue = new ArrayList<>();
10461047

1047-
public CachePutListSubscriber(CachePutRequest request) {
1048-
this.request = request;
1048+
public CachePutSignalListener(CachePutRequest request) {
1049+
this.request = new AtomicReference<>(request);
10491050
}
10501051

10511052
@Override
1052-
public void onSubscribe(Subscription s) {
1053-
s.request(Integer.MAX_VALUE);
1053+
public void doOnNext(Object o) {
1054+
this.cacheValue.add(o);
10541055
}
1056+
10551057
@Override
1056-
public void onNext(Object o) {
1057-
this.cacheValue.add(o);
1058+
public void doOnComplete() {
1059+
CachePutRequest r = this.request.get();
1060+
if (this.request.compareAndSet(r, null)) {
1061+
r.performCachePut(this.cacheValue);
1062+
}
10581063
}
1064+
10591065
@Override
1060-
public void onError(Throwable t) {
1066+
public void doOnCancel() {
1067+
// Note: we don't use doFinally as we want to propagate the signal after cache put, not before
1068+
CachePutRequest r = this.request.get();
1069+
if (this.request.compareAndSet(r, null)) {
1070+
r.performCachePut(this.cacheValue);
1071+
}
10611072
}
1073+
10621074
@Override
1063-
public void onComplete() {
1064-
this.request.performCachePut(this.cacheValue);
1075+
public void doOnError(Throwable error) {
1076+
if (this.request.getAndSet(null) != null) {
1077+
this.cacheValue.clear();
1078+
}
10651079
}
10661080
}
10671081

@@ -1145,9 +1159,8 @@ public Object processPutRequest(CachePutRequest request, @Nullable Object result
11451159
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
11461160
if (adapter != null) {
11471161
if (adapter.isMultiValue()) {
1148-
Flux<?> source = Flux.from(adapter.toPublisher(result));
1149-
source.subscribe(new CachePutListSubscriber(request));
1150-
return adapter.fromPublisher(source);
1162+
return adapter.fromPublisher(Flux.from(adapter.toPublisher(result))
1163+
.tap(() -> new CachePutSignalListener(request)));
11511164
}
11521165
else {
11531166
return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))

spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ void cacheHitDetermination(Class<?> configClass) {
6161
Long r3 = service.cacheFuture(key).join();
6262

6363
assertThat(r1).isNotNull();
64-
assertThat(r1).isSameAs(r2).isSameAs(r3);
64+
assertThat(r1).as("cacheFuture").isSameAs(r2).isSameAs(r3);
6565

6666
key = new Object();
6767

@@ -70,7 +70,7 @@ void cacheHitDetermination(Class<?> configClass) {
7070
r3 = service.cacheMono(key).block();
7171

7272
assertThat(r1).isNotNull();
73-
assertThat(r1).isSameAs(r2).isSameAs(r3);
73+
assertThat(r1).as("cacheMono").isSameAs(r2).isSameAs(r3);
7474

7575
key = new Object();
7676

@@ -79,7 +79,7 @@ void cacheHitDetermination(Class<?> configClass) {
7979
r3 = service.cacheFlux(key).blockFirst();
8080

8181
assertThat(r1).isNotNull();
82-
assertThat(r1).isSameAs(r2).isSameAs(r3);
82+
assertThat(r1).as("cacheFlux blockFirst").isSameAs(r2).isSameAs(r3);
8383

8484
key = new Object();
8585

@@ -88,7 +88,7 @@ void cacheHitDetermination(Class<?> configClass) {
8888
List<Long> l3 = service.cacheFlux(key).collectList().block();
8989

9090
assertThat(l1).isNotNull();
91-
assertThat(l1).isEqualTo(l2).isEqualTo(l3);
91+
assertThat(l1).as("cacheFlux collectList").isEqualTo(l2).isEqualTo(l3);
9292

9393
key = new Object();
9494

@@ -97,7 +97,7 @@ void cacheHitDetermination(Class<?> configClass) {
9797
r3 = service.cacheMono(key).block();
9898

9999
assertThat(r1).isNotNull();
100-
assertThat(r1).isSameAs(r2).isSameAs(r3);
100+
assertThat(r1).as("cacheMono common key").isSameAs(r2).isSameAs(r3);
101101

102102
// Same key as for Mono, reusing its cached value
103103

@@ -106,12 +106,11 @@ void cacheHitDetermination(Class<?> configClass) {
106106
r3 = service.cacheFlux(key).blockFirst();
107107

108108
assertThat(r1).isNotNull();
109-
assertThat(r1).isSameAs(r2).isSameAs(r3);
109+
assertThat(r1).as("cacheFlux blockFirst common key").isSameAs(r2).isSameAs(r3);
110110

111111
ctx.close();
112112
}
113113

114-
115114
@CacheConfig(cacheNames = "first")
116115
static class ReactiveCacheableService {
117116

@@ -124,12 +123,16 @@ CompletableFuture<Long> cacheFuture(Object arg) {
124123

125124
@Cacheable
126125
Mono<Long> cacheMono(Object arg) {
127-
return Mono.just(this.counter.getAndIncrement());
126+
// here counter not only reflects invocations of cacheMono but subscriptions to
127+
// the returned Mono as well. See https://github.com/spring-projects/spring-framework/issues/32370
128+
return Mono.defer(() -> Mono.just(this.counter.getAndIncrement()));
128129
}
129130

130131
@Cacheable
131132
Flux<Long> cacheFlux(Object arg) {
132-
return Flux.just(this.counter.getAndIncrement(), 0L);
133+
// here counter not only reflects invocations of cacheFlux but subscriptions to
134+
// the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370
135+
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L));
133136
}
134137
}
135138

0 commit comments

Comments
 (0)