Skip to content

Commit 7758ba3

Browse files
committed
Refactor MonoToListenableFutureAdapter
Closes gh-25561
1 parent a7f71f4 commit 7758ba3

File tree

1 file changed

+6
-67
lines changed

1 file changed

+6
-67
lines changed
Lines changed: 6 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,83 +16,22 @@
1616

1717
package org.springframework.util.concurrent;
1818

19-
import java.time.Duration;
20-
import java.util.concurrent.TimeUnit;
21-
2219
import reactor.core.publisher.Mono;
23-
import reactor.core.publisher.MonoProcessor;
24-
25-
import org.springframework.lang.Nullable;
26-
import org.springframework.util.Assert;
2720

2821
/**
29-
* Adapts a {@link Mono} into a {@link ListenableFuture}.
22+
* Adapts a {@link Mono} into a {@link ListenableFuture} by obtaining a
23+
* {@code CompletableFuture} from the {@code Mono} via {@link Mono#toFuture()}
24+
* and then adapting it with {@link CompletableToListenableFutureAdapter}.
3025
*
3126
* @author Rossen Stoyanchev
3227
* @author Stephane Maldini
3328
* @since 5.1
3429
* @param <T> the object type
3530
*/
36-
@SuppressWarnings("deprecation")
37-
public class MonoToListenableFutureAdapter<T> implements ListenableFuture<T> {
38-
39-
private final MonoProcessor<T> processor;
40-
41-
private final ListenableFutureCallbackRegistry<T> registry = new ListenableFutureCallbackRegistry<>();
42-
31+
public class MonoToListenableFutureAdapter<T> extends CompletableToListenableFutureAdapter<T> {
4332

4433
public MonoToListenableFutureAdapter(Mono<T> mono) {
45-
Assert.notNull(mono, "Mono must not be null");
46-
this.processor = mono
47-
.doOnSuccess(this.registry::success)
48-
.doOnError(this.registry::failure)
49-
.toProcessor();
50-
}
51-
52-
53-
@Override
54-
@Nullable
55-
public T get() {
56-
return this.processor.block();
57-
}
58-
59-
@Override
60-
@Nullable
61-
public T get(long timeout, TimeUnit unit) {
62-
Assert.notNull(unit, "TimeUnit must not be null");
63-
Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit));
64-
return this.processor.block(duration);
65-
}
66-
67-
@Override
68-
public boolean cancel(boolean mayInterruptIfRunning) {
69-
if (isCancelled()) {
70-
return false;
71-
}
72-
this.processor.cancel();
73-
// isCancelled may still return false, if mono completed before the cancel
74-
return this.processor.isCancelled();
75-
}
76-
77-
@Override
78-
public boolean isCancelled() {
79-
return this.processor.isCancelled();
80-
}
81-
82-
@Override
83-
public boolean isDone() {
84-
return this.processor.isTerminated();
85-
}
86-
87-
@Override
88-
public void addCallback(ListenableFutureCallback<? super T> callback) {
89-
this.registry.addCallback(callback);
90-
}
91-
92-
@Override
93-
public void addCallback(SuccessCallback<? super T> success, FailureCallback failure) {
94-
this.registry.addSuccessCallback(success);
95-
this.registry.addFailureCallback(failure);
34+
super(mono.toFuture());
9635
}
9736

9837
}

0 commit comments

Comments
 (0)