@@ -5,16 +5,18 @@ package scala.concurrent.java8
5
5
6
6
// Located in this package to access private[concurrent] members
7
7
8
- import scala .concurrent .{ Future , Promise , ExecutionContext , ExecutionContextExecutorService , ExecutionContextExecutor , impl }
8
+ import scala .concurrent ._
9
9
import java .util .concurrent .{ CompletionStage , Executor , ExecutorService , CompletableFuture }
10
+ import scala .concurrent .duration .Duration
11
+ import scala .concurrent .impl .Promise .DefaultPromise
10
12
import scala .util .{ Try , Success , Failure }
11
13
import java .util .function .{ BiConsumer , Function ⇒ JF , Consumer , BiFunction }
12
14
13
15
// TODO: make thie private[scala] when genjavadoc allows for that.
14
16
object FuturesConvertersImpl {
15
17
def InternalCallbackExecutor = Future .InternalCallbackExecutor
16
18
17
- class CF [T ] extends CompletableFuture [T ] with (Try [T ] => Unit ) {
19
+ class CF [T ]( val wrapped : Future [ T ]) extends CompletableFuture [T ] with (Try [T ] => Unit ) {
18
20
override def apply (t : Try [T ]): Unit = t match {
19
21
case Success (v) ⇒ complete(v)
20
22
case Failure (e) ⇒ completeExceptionally(e)
@@ -78,4 +80,37 @@ object FuturesConvertersImpl {
78
80
79
81
override def toString : String = super [CompletableFuture ].toString
80
82
}
83
+
84
+ // TODO Extend DefaultPromise once 2.12.0-M3 is released, and remove `delegate` field and methods that
85
+ // refer to it.
86
+ class P [T ](val wrapped : CompletionStage [T ]) extends Promise [T ] with Future [T ] with BiConsumer [T , Throwable ] {
87
+ private val delegate = Promise [T ]().asInstanceOf [DefaultPromise [T ]]
88
+
89
+ override def future : Future [T ] = this
90
+
91
+ override def tryComplete (result : Try [T ]): Boolean = delegate.tryComplete(result)
92
+
93
+ override def isCompleted : Boolean = delegate.isCompleted
94
+
95
+
96
+ override def onSuccess [U ](pf : PartialFunction [T , U ])(implicit executor : ExecutionContext ): Unit = super .onSuccess(pf)
97
+
98
+
99
+ override def accept (v : T , e : Throwable ): Unit = {
100
+ if (e == null ) complete(Success (v))
101
+ else complete(Failure (e))
102
+ }
103
+
104
+ override def onComplete [U ](f : (Try [T ]) => U )(implicit executor : ExecutionContext ): Unit = ???
105
+
106
+ override def value : Option [Try [T ]] = delegate.value
107
+
108
+ @ throws[Exception ](classOf [Exception ])
109
+ override def result (atMost : Duration )(implicit permit : CanAwait ): T = delegate.result(atMost)(permit)
110
+
111
+ @ throws[InterruptedException ](classOf [InterruptedException ])
112
+ @ throws[TimeoutException ](classOf [TimeoutException ])
113
+ override def ready (atMost : Duration )(implicit permit : CanAwait ): P .this .type = {delegate.ready(atMost)(permit); this }
114
+ }
115
+
81
116
}
0 commit comments