Skip to content

Commit 1967830

Browse files
committed
More efficient round trip conversions for Futures
Makes `toJava.toScala` and `toScala.toJava` a wrap-and-unwrap operation, rather than a wrap-and-rewrap. Once 2.12.0-M3 is released, we can reduce the overhead by extending `DefaultPromise` rather than `Promise`. See: retronym@4faeac5 scala/scala#4690
1 parent f9d8a9d commit 1967830

File tree

3 files changed

+70
-14
lines changed

3 files changed

+70
-14
lines changed

src/main/scala/scala/compat/java8/FutureConverters.scala

+14-12
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,14 @@ object FutureConverters {
5454
* not support the CompletableFuture interface
5555
*/
5656
def toJava[T](f: Future[T]): CompletionStage[T] = {
57-
val cf = new CF[T]
58-
implicit val ec = InternalCallbackExecutor
59-
f onComplete cf
60-
cf
57+
f match {
58+
case p: P[T] => p.wrapped
59+
case _ =>
60+
val cf = new CF[T](f)
61+
implicit val ec = InternalCallbackExecutor
62+
f onComplete cf
63+
cf
64+
}
6165
}
6266

6367
/**
@@ -71,15 +75,13 @@ object FutureConverters {
7175
* @return a Scala Future that represents the CompletionStage's completion
7276
*/
7377
def toScala[T](cs: CompletionStage[T]): Future[T] = {
74-
val p = Promise[T]()
75-
val bc = new BiConsumer[T, Throwable] {
76-
override def accept(v: T, e: Throwable): Unit = {
77-
if (e == null) p.complete(Success(v))
78-
else p.complete(Failure(e))
79-
}
78+
cs match {
79+
case cf: CF[T] => cf.wrapped
80+
case _ =>
81+
val p = new P[T](cs)
82+
cs whenComplete p
83+
p.future
8084
}
81-
cs whenComplete bc
82-
p.future
8385
}
8486

8587
/**

src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala

+37-2
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,18 @@ package scala.concurrent.java8
55

66
// Located in this package to access private[concurrent] members
77

8-
import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutorService, ExecutionContextExecutor, impl }
8+
import scala.concurrent._
99
import java.util.concurrent.{ CompletionStage, Executor, ExecutorService, CompletableFuture }
10+
import scala.concurrent.duration.Duration
11+
import scala.concurrent.impl.Promise.DefaultPromise
1012
import scala.util.{ Try, Success, Failure }
1113
import java.util.function.{ BiConsumer, Function JF, Consumer, BiFunction }
1214

1315
// TODO: make thie private[scala] when genjavadoc allows for that.
1416
object FuturesConvertersImpl {
1517
def InternalCallbackExecutor = Future.InternalCallbackExecutor
1618

17-
class CF[T] extends CompletableFuture[T] with (Try[T] => Unit) {
19+
class CF[T](val wrapped: Future[T]) extends CompletableFuture[T] with (Try[T] => Unit) {
1820
override def apply(t: Try[T]): Unit = t match {
1921
case Success(v) complete(v)
2022
case Failure(e) completeExceptionally(e)
@@ -78,4 +80,37 @@ object FuturesConvertersImpl {
7880

7981
override def toString: String = super[CompletableFuture].toString
8082
}
83+
84+
// TODO Extend DefaultPromise once 2.12.0-M3 is released, and remove `delegate` field and methods that
85+
// refer to it.
86+
class P[T](val wrapped: CompletionStage[T]) extends Promise[T] with Future[T] with BiConsumer[T, Throwable] {
87+
private val delegate = Promise[T]().asInstanceOf[DefaultPromise[T]]
88+
89+
override def future: Future[T] = this
90+
91+
override def tryComplete(result: Try[T]): Boolean = delegate.tryComplete(result)
92+
93+
override def isCompleted: Boolean = delegate.isCompleted
94+
95+
96+
override def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = super.onSuccess(pf)
97+
98+
99+
override def accept(v: T, e: Throwable): Unit = {
100+
if (e == null) complete(Success(v))
101+
else complete(Failure(e))
102+
}
103+
104+
override def onComplete[U](f: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = ???
105+
106+
override def value: Option[Try[T]] = delegate.value
107+
108+
@throws[Exception](classOf[Exception])
109+
override def result(atMost: Duration)(implicit permit: CanAwait): T = delegate.result(atMost)(permit)
110+
111+
@throws[InterruptedException](classOf[InterruptedException])
112+
@throws[TimeoutException](classOf[TimeoutException])
113+
override def ready(atMost: Duration)(implicit permit: CanAwait): P.this.type = {delegate.ready(atMost)(permit); this}
114+
}
115+
81116
}

src/test/java/scala/compat/java8/FutureConvertersTest.java

+19
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import static java.util.concurrent.TimeUnit.SECONDS;
1616
import static org.junit.Assert.*;
17+
import static org.junit.Assert.assertSame;
1718
import static scala.compat.java8.FutureConverters.*;
1819

1920
public class FutureConvertersTest {
@@ -341,4 +342,22 @@ public void testToJavaToCompletableFutureDoesNotMutateUnderlyingPromise() throws
341342
assertFalse(sf.isCompleted());
342343
assertFalse(p.isCompleted());
343344
}
345+
346+
@Test
347+
public void testToJavaAndBackAvoidsWrappers() {
348+
final Promise<String> p = promise();
349+
final Future<String> sf = p.future();
350+
final CompletionStage<String> cs = toJava(sf);
351+
Future<String> sf1 = toScala(cs);
352+
assertSame(sf, sf1);
353+
}
354+
355+
@Test
356+
public void testToScalaAndBackAvoidsWrappers() {
357+
final CompletableFuture<String> cf = new CompletableFuture<>();
358+
final Future<String> f = toScala(cf);
359+
CompletionStage<String> cs1 = toJava(f);
360+
assertSame(cf, cs1);
361+
362+
}
344363
}

0 commit comments

Comments
 (0)