Skip to content

Commit 2801d8a

Browse files
committed
Mono support and ReactiveAdapter integration
1 parent 5ed933d commit 2801d8a

File tree

6 files changed

+309
-210
lines changed

6 files changed

+309
-210
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ project("spring-context") {
515515
optional("org.codehaus.groovy:groovy-all:${groovyVersion}")
516516
optional("org.beanshell:bsh:2.0b4")
517517
optional("io.reactivex:rxjava:${rxjavaVersion}")
518+
optional("io.projectreactor:reactor-core:${reactorCoreVersion}")
518519
testCompile("javax.inject:javax.inject-tck:1")
519520
testCompile("javax.el:javax.el-api:${elApiVersion}")
520521
testCompile("org.glassfish:javax.el:3.0.1-b08")

spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -384,12 +384,10 @@ public Object call() throws Exception {
384384
updateCache(cacheValue, contexts, cachePutRequests);
385385
}
386386
else {
387-
// TODO: cacheValue = ObjectUtils.unwrapOptional(returnValue); use this to wrap/unwrap
388-
389387
// Invoke the method if we don't have a cache hit
390388
Object originalReturnValue = invokeOperation(invoker);
391389

392-
returnValue = cacheResultWrapperManager.asyncUnwrap(originalReturnValue, new AsyncWrapResult(new AsyncWrapResult.CallBack() {
390+
returnValue = cacheResultWrapperManager.asyncUnwrap(originalReturnValue, method.getReturnType(), new AsyncWrapResult(new AsyncWrapResult.CallBack() {
393391
@Override
394392
public void onValue(Object cacheValue) {
395393
updateCache(cacheValue, contexts, cachePutRequests);

spring-context/src/main/java/org/springframework/cache/interceptor/CacheResultWrapper.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package org.springframework.cache.interceptor;
1717

1818
/**
19-
* Wrapper/Unwrapper, it allows to unwrap values to be cached and wraps it back
19+
* Wrapper/Unwrapper, it allows to notifyResult values to be cached and wraps it back
2020
* in order to be consumed.
2121
* @author Pablo Diaz-Lopez
2222
*/
@@ -36,10 +36,6 @@ interface CacheResultWrapper {
3636
* @param asyncResult it will call it when the value it's available
3737
* @return the same value wrapped or a version decorated.
3838
*/
39-
Object unwrap(Object valueWrapped, AsyncWrapResult asyncResult);
39+
Object notifyResult(Object valueWrapped, AsyncWrapResult asyncResult);
4040

41-
/**
42-
* @return The target class the Wrapper can handle
43-
*/
44-
Class<?> getWrapClass();
4541
}

spring-context/src/main/java/org/springframework/cache/interceptor/CacheResultWrapperManager.java

Lines changed: 80 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@
1515
*/
1616
package org.springframework.cache.interceptor;
1717

18-
import org.springframework.util.Assert;
18+
import org.springframework.core.ReactiveAdapter;
19+
import org.springframework.core.ReactiveAdapterRegistry;
1920
import org.springframework.util.ClassUtils;
2021
import org.springframework.util.ObjectUtils;
21-
import rx.Observable;
22-
import rx.functions.Action0;
23-
import rx.functions.Action1;
22+
import reactor.core.publisher.Mono;
23+
import rx.Single;
2424

25-
import java.util.*;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Optional;
28+
import java.util.function.Supplier;
2629

2730
/**
2831
* Manages the {@link CacheResultWrapper} instances to apply only the appropriate.
@@ -32,29 +35,41 @@ public class CacheResultWrapperManager {
3235
private Map<Class<?>, CacheResultWrapper> unwrapperByClass;
3336

3437
public CacheResultWrapperManager() {
35-
unwrapperByClass = new HashMap<Class<?>, CacheResultWrapper>();
38+
unwrapperByClass = new HashMap<>();
3639

37-
List<CacheResultWrapper> unwrapperList = new ArrayList<CacheResultWrapper>();
40+
unwrapperByClass.put(Optional.class, new OptionalUnWrapper());
41+
registerReactiveWrappers();
42+
}
3843

39-
unwrapperList.add(new OptionalUnWrapper());
44+
private void registerReactiveWrappers() {
45+
if(tryRegisterResultWrapper("reactor.core.publisher.Mono", MonoReactiveWrapper::new)) {
46+
ReactiveAdapterRegistry adapterRegistry = new ReactiveAdapterRegistry();
4047

41-
if(ClassUtils.isPresent("rx.Observable", CacheAspectSupport.class.getClassLoader())) {
42-
unwrapperList.add(new ObservableWrapper());
48+
tryRegisterResultWrapper("rx.Single", () -> new SingleReactiveWrapper(adapterRegistry));
4349
}
50+
}
4451

45-
for(CacheResultWrapper unwrapper: unwrapperList) {
46-
unwrapperByClass.put(unwrapper.getWrapClass(), unwrapper);
52+
private boolean tryRegisterResultWrapper(String className, Supplier<CacheResultWrapper> cacheResultWrapperSupplier) {
53+
try {
54+
Class<?> clazz = ClassUtils.forName(className, CacheAspectSupport.class.getClassLoader());
55+
CacheResultWrapper cacheResultWrapper = cacheResultWrapperSupplier.get();
56+
unwrapperByClass.put(clazz, cacheResultWrapper);
57+
return true;
58+
} catch (ClassNotFoundException e) {
59+
// Cannot register wrapper
60+
return false;
4761
}
4862
}
4963

5064
/**
5165
* Wraps a value
66+
*
5267
* @param clazz the target class wanted
5368
* @param value the value to be wrapped
5469
* @return the value wrapped if it can, or the same value if it cannot handle it
5570
*/
5671
public Object wrap(Class<?> clazz, Object value) {
57-
if(value != null) {
72+
if (value != null) {
5873
CacheResultWrapper unwrapper = unwrapperByClass.get(clazz);
5974

6075
if (unwrapper != null) {
@@ -67,16 +82,17 @@ public Object wrap(Class<?> clazz, Object value) {
6782

6883
/**
6984
* Unwraps the value asynchronously
85+
*
7086
* @param valueWrapped the value wrapped to be unwrapped
71-
* @param asyncResult where the result will be notified
87+
* @param asyncResult where the result will be notified
7288
* @return the same value wrapped or decorated in order to notify when it finish.
7389
*/
74-
public Object asyncUnwrap(Object valueWrapped, AsyncWrapResult asyncResult) {
75-
if(valueWrapped != null) {
76-
CacheResultWrapper unwrapper = unwrapperByClass.get(valueWrapped.getClass());
90+
public Object asyncUnwrap(Object valueWrapped, Class<?> classWrapped, AsyncWrapResult asyncResult) {
91+
if (valueWrapped != null) {
92+
CacheResultWrapper unwrapper = unwrapperByClass.get(classWrapped);
7793

7894
if (unwrapper != null) {
79-
return unwrapper.unwrap(valueWrapped, asyncResult);
95+
return unwrapper.notifyResult(valueWrapped, asyncResult);
8096
}
8197
}
8298

@@ -85,77 +101,73 @@ public Object asyncUnwrap(Object valueWrapped, AsyncWrapResult asyncResult) {
85101
return valueWrapped;
86102
}
87103

104+
private class SingleReactiveWrapper extends MonoReactiveAdapterWrapper {
105+
public SingleReactiveWrapper(ReactiveAdapterRegistry registry) {
106+
super(registry, Single.class);
107+
}
108+
}
88109

89-
/**
90-
* Inner class to avoid a hard dependency on Java 8.
91-
*/
92-
private class OptionalUnWrapper implements CacheResultWrapper {
110+
private class MonoReactiveWrapper implements CacheResultWrapper {
111+
@Override
112+
public Mono<?> wrap(Object value) {
113+
return Mono.justOrEmpty(value);
114+
}
93115

94116
@Override
95-
public Object unwrap(Object optionalObject, AsyncWrapResult asyncResult) {
96-
Optional<?> optional = (Optional<?>) optionalObject;
117+
public Mono<?> notifyResult(Object objectWrapped, AsyncWrapResult asyncResult) {
118+
Mono<?> monoWrapped = (Mono<?>) objectWrapped;
97119

98-
Object value = ObjectUtils.unwrapOptional(optional);
120+
return monoWrapped
121+
.doOnSuccess(asyncResult::complete)
122+
.doOnError(asyncResult::error);
123+
}
124+
}
99125

100-
asyncResult.complete(value);
101126

102-
return optionalObject;
127+
private abstract class MonoReactiveAdapterWrapper implements CacheResultWrapper {
128+
private ReactiveAdapter adapter;
129+
private MonoReactiveWrapper monoReactiveWrapper;
130+
131+
MonoReactiveAdapterWrapper(ReactiveAdapterRegistry registry, Class<?> wrapperClass) {
132+
this.adapter = registry.getAdapterFrom(wrapperClass);
133+
this.monoReactiveWrapper = new MonoReactiveWrapper();
103134
}
104135

136+
@SuppressWarnings("unchecked")
105137
@Override
106-
public Class<?> getWrapClass() {
107-
return Optional.class;
138+
public Object wrap(Object value) {
139+
return adapter.fromPublisher(monoReactiveWrapper.wrap(value));
108140
}
109141

142+
@SuppressWarnings("unchecked")
110143
@Override
111-
public Object wrap(Object value) {
112-
return Optional.ofNullable(value);
144+
public Object notifyResult(Object valueWrapped, AsyncWrapResult asyncResult) {
145+
Mono<?> monoWrapped = adapter.toMono(valueWrapped);
146+
Mono<?> monoCacheWrapped = monoReactiveWrapper.notifyResult(monoWrapped, asyncResult);
147+
148+
return adapter.fromPublisher(monoCacheWrapped);
113149
}
114150
}
115151

116-
private class ObservableWrapper implements CacheResultWrapper {
152+
/**
153+
* Inner class to avoid a hard dependency on Java 8.
154+
*/
155+
private class OptionalUnWrapper implements CacheResultWrapper {
117156

118157
@Override
119-
public Object wrap(Object value) {
120-
if(value instanceof Iterable) {
121-
return Observable.from((Iterable<?>) value);
122-
}
123-
else {
124-
// Not sure if it's a good idea... At least a warning maybe be a good idea
125-
return Observable.just(value);
126-
}
127-
}
158+
public Optional<?> notifyResult(Object optionalObject, AsyncWrapResult asyncResult) {
159+
Optional<?> optional = (Optional<?>) optionalObject;
128160

129-
@Override
130-
public Object unwrap(Object valueWrapped, final AsyncWrapResult asyncResult) {
131-
Observable<?> valueObservable = (Observable<?>) valueWrapped;
132-
133-
final List<Object> values = new ArrayList<Object>();
134-
135-
return valueObservable
136-
.doOnNext(new Action1<Object>() {
137-
@Override
138-
public void call(Object o) {
139-
values.add(o);
140-
}
141-
})
142-
.doOnCompleted(new Action0() {
143-
@Override
144-
public void call() {
145-
asyncResult.complete(values);
146-
}
147-
})
148-
.doOnError(new Action1<Throwable>() {
149-
@Override
150-
public void call(Throwable throwable) {
151-
asyncResult.error(throwable);
152-
}
153-
});
161+
Object value = ObjectUtils.unwrapOptional(optional);
162+
163+
asyncResult.complete(value);
164+
165+
return optional;
154166
}
155167

156168
@Override
157-
public Class<?> getWrapClass() {
158-
return Observable.class;
169+
public Optional<?> wrap(Object value) {
170+
return Optional.ofNullable(value);
159171
}
160172
}
161173
}

0 commit comments

Comments
 (0)