Can't use thenCompose with FutureConvertersImpl - doThenCompose calls toCompletableFuture #29

Closed
jasongoodwin opened this issue Feb 21, 2015 · 16 comments

@jasongoodwin

Hey I found an issue with this.
The FutureConvertersImpl throws an exception when toCompletableFuture is called:

override def toCompletableFuture(): CompletableFuture[T] = {
  throw new UnsupportedOperationException("this CompletionStage represents a read-only Scala Future")
}

Which is fine. However, if you do something like this:

@Test
public void testToJavaThenComposeWithToJavaThenAccept() throws InterruptedException,
        ExecutionException, TimeoutException {
    final Promise<String> p1 = promise();
    final CompletableFuture<String> future = new CompletableFuture<>();

    final CompletionStage<String> second =
            CompletableFuture.supplyAsync(() -> "Hello").
                    thenCompose(x -> toJava(p1.future()));

    second.handle((x, t) -> {
        if (t != null) {
            t.printStackTrace();
            future.completeExceptionally(t);
        } else {
            future.complete(x);
        }
        return null;
    });

    p1.success("Hello");
    assertEquals("Hello", future.get(1000, MILLISECONDS));
}

thenCompose calls doThenCompose:

public <U> CompletableFuture<U> thenCompose
    (Function<? super T, ? extends CompletionStage<U>> fn) {
    return doThenCompose(fn, null);
}

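If the preceding stage completed without an exception, doThenCompose calls toCompletableFuture on the stage returned by the function, and that call throws.
With the exception removed from FutureConvertersImpl, everything completes just fine.
Is there another way this can be handled other than throwing an exception? The relevant branch, followed by the full method: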

if (ex == null) {
    if (e != null) {
        if (dst == null)
            dst = new CompletableFuture<U>();
        execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
    }
    else {
        try {
            CompletionStage<U> cs = fn.apply(t);
            if (cs == null ||
                (dst = cs.toCompletableFuture()) == null)
                ex = new NullPointerException();
        } catch (Throwable rex) {
            ex = rex;
        }
    }
}



private <U> CompletableFuture<U> doThenCompose
    (Function<? super T, ? extends CompletionStage<U>> fn,
     Executor e) {
    if (fn == null) throw new NullPointerException();
    CompletableFuture<U> dst = null;
    ThenCompose<T,U> d = null;
    Object r;
    if ((r = result) == null) {
        dst = new CompletableFuture<U>();
        CompletionNode p = new CompletionNode
            (d = new ThenCompose<T,U>(this, fn, dst, e));
        while ((r = result) == null) {
            if (UNSAFE.compareAndSwapObject
                (this, COMPLETIONS, p.next = completions, p))
                break;
        }
    }
    if (r != null && (d == null || d.compareAndSet(0, 1))) {
        T t; Throwable ex;
        if (r instanceof AltResult) {
            ex = ((AltResult)r).ex;
            t = null;
        }
        else {
            ex = null;
            @SuppressWarnings("unchecked") T tr = (T) r;
            t = tr;
        }
        if (ex == null) {
            if (e != null) {
                if (dst == null)
                    dst = new CompletableFuture<U>();
                execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
            }
            else {
                try {
                    CompletionStage<U> cs = fn.apply(t);
                    if (cs == null ||
                        (dst = cs.toCompletableFuture()) == null)
                        ex = new NullPointerException();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
        }
        if (dst == null)
            dst = new CompletableFuture<U>();
        if (ex != null)
            dst.internalComplete(null, ex);
    }
    helpPostComplete();
    dst.helpPostComplete();
    return dst;
}
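
The failure described above can be reproduced without the library. The following is a minimal sketch, not the library's code: ReadOnlyStage is a hypothetical stand-in for the converter's stage (an ordinary CompletableFuture whose toCompletableFuture throws), and ReadOnlyStageDemo and composeFailure are names made up for illustration. It assumes, as in the JDK sources quoted in this thread, that thenCompose catches the thrown exception and completes the result exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ReadOnlyStageDemo {
    // Hypothetical stand-in for the converter's stage: a CompletableFuture
    // that refuses toCompletableFuture(), like the override quoted above.
    static class ReadOnlyStage<T> extends CompletableFuture<T> {
        @Override
        public CompletableFuture<T> toCompletableFuture() {
            throw new UnsupportedOperationException("read-only stage (demo)");
        }
    }

    // Composes onto a ReadOnlyStage and returns the cause of the failure,
    // or null if the composed future completed normally.
    static Throwable composeFailure() {
        ReadOnlyStage<String> inner = new ReadOnlyStage<>();
        inner.complete("Hello");
        CompletableFuture<String> composed =
            CompletableFuture.completedFuture("x").thenCompose(x -> inner);
        try {
            composed.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(composeFailure());
    }
}
```

Even though the inner stage already holds "Hello", the composed future fails, because thenCompose asks the returned stage for a CompletableFuture before it looks at the result.
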
@retronym
Member

Hey @rkuhn or @bantonsson, could I ask one of you to take a look at this?

@jasongoodwin
Author

I believe the exception should be removed and toCompletableFuture should return 'this', but I don't understand all of the impacts. The implementation needs to work with thenCompose to be useful, though. I'm writing a book at the moment (Learning Akka, Packt) and I'm trying to introduce this library as the mechanism for handling futures from Java 8, which it otherwise seems to be a good fit for.

Also, where I'm working we're starting to lift code out of a Play application into libraries, and I'd like to replace Play Promises with CompletableFutures to break the Play2 dependency.

If there is another approach I should look at, or if you need any help, please let me know. I can make a PR and test cases if desired.

@fehmicansaglam

I am not sure if I am having the same problem. But maybe you can point me in the right direction.

I have a Scala API which returns scala.concurrent.Future. Besides the Scala API, I want to provide a Java API which returns java.util.concurrent.CompletionStage. I have implemented the API using scala-java8-compat, but when I try to compose the returned CompletionStages I get an exception: java.lang.UnsupportedOperationException: this CompletionStage represents a read-only Scala Future. I understand that the provided CompletionStage implementation is read-only. But what is the correct way to compose with this implementation?

@jasongoodwin
Author

Right - it throws the UnsupportedOperationException because the Java 8 CompletableFuture implementation of thenCompose calls toCompletableFuture on the returned stage:

                if (cs == null ||
                    (dst = cs.toCompletableFuture()) == null)
                    ex = new NullPointerException();

thenCompose should handle the CompletionStage returned from toJava, but it can't as long as the exception is thrown. toJava in scala-java8-compat returns a CompletableFuture implementation, but it refuses toCompletableFuture in order to hide the future's completion methods (the Scala future is what completes it). You can still cast to CompletableFuture to use get etc., because the CompletionStage you are handed is actually a CompletableFuture. I think the only option is to replace the toCompletableFuture method with one that returns this. Otherwise this library is not very useful.
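
As a rough illustration of the proposal to return this, here is a minimal sketch. ComposableStage, ReturnThisDemo and composeAndJoin are hypothetical names, not the library's classes; the stage is completed after composition to mimic a Scala Promise being fulfilled later.

```java
import java.util.concurrent.CompletableFuture;

class ReturnThisDemo {
    // Hypothetical stage: toCompletableFuture() hands back the stage itself
    // instead of throwing.
    static class ComposableStage<T> extends CompletableFuture<T> {
        @Override
        public CompletableFuture<T> toCompletableFuture() {
            return this;
        }
    }

    // Composes onto a not-yet-completed stage, completes it afterwards,
    // and returns the value seen by the composed future.
    static String composeAndJoin() {
        ComposableStage<String> inner = new ComposableStage<>();
        CompletableFuture<String> composed =
            CompletableFuture.completedFuture("x").thenCompose(x -> inner);
        inner.complete("Hello");
        return composed.join();
    }

    public static void main(String[] args) {
        System.out.println(composeAndJoin());
    }
}
```

The trade-off is the one already mentioned: once toCompletableFuture returns the stage itself, callers can reach its completion methods, so the stage is no longer strictly read-only.
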

@jasongoodwin
Author

Any word on this? Can I make a PR to stop throwing the exception?

@rkuhn
Contributor

rkuhn commented Jun 3, 2015

Sorry for the delay, this flew under my radar. I have not fully worked through the whole problem, but from what you write it seems that returning this is indeed reasonable. It is a shame that we cannot just remain within the CompletionStage API, though.

@rkuhn
Contributor

rkuhn commented Jun 15, 2015

Another possibility would be to override doThenCompose to not call toCompletableFuture: we don’t need to return this more specific type, in particular if our CompletionStage is not supposed to implement the CompletableFuture contract (which I think is the point).
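
One way to picture composition that never calls toCompletableFuture is a helper built only on CompletionStage.whenComplete. This is a sketch under assumptions, not the override discussed here: compose, StageOnlyCompose and ReadOnlyStage are hypothetical names, and the helper is a free-standing function rather than a replacement for the JDK-internal doThenCompose.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

class StageOnlyCompose {
    // Hypothetical helper: flat-maps one stage into another using only
    // whenComplete, so the inner stage is never asked for a CompletableFuture.
    static <T, U> CompletionStage<U> compose(
            CompletionStage<T> stage,
            Function<? super T, ? extends CompletionStage<U>> fn) {
        CompletableFuture<U> dst = new CompletableFuture<>();
        stage.whenComplete((t, ex) -> {
            if (ex != null) {
                dst.completeExceptionally(ex);
                return;
            }
            try {
                fn.apply(t).whenComplete((u, ex2) -> {
                    if (ex2 != null) dst.completeExceptionally(ex2);
                    else dst.complete(u);
                });
            } catch (Throwable thrown) {
                // Covers a throwing function and a null returned stage.
                dst.completeExceptionally(thrown);
            }
        });
        return dst;
    }

    // Hypothetical read-only stage whose toCompletableFuture() throws.
    static class ReadOnlyStage<T> extends CompletableFuture<T> {
        @Override
        public CompletableFuture<T> toCompletableFuture() {
            throw new UnsupportedOperationException("read-only stage (demo)");
        }
    }

    static String demo() {
        ReadOnlyStage<String> inner = new ReadOnlyStage<>();
        CompletionStage<String> composed =
            compose(CompletableFuture.completedFuture("x"), x -> inner);
        inner.complete("Hello");
        // The returned stage is a plain CompletableFuture, so this is safe.
        return composed.toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that this only helps when the caller controls the composing side; a plain JDK CompletableFuture on the outside would still use its own thenCompose.
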

@jasongoodwin
Author

Oh ya - I don't know why I didn't think of that. The Oracle code wasn't exactly clear to me when I first read it, but I don't think this would be too difficult. I'll look at doing this through the week after hours. Thanks Roland.

@rkuhn
Contributor

rkuhn commented Jun 15, 2015

Yeah, somehow I also needed two weeks to figure this out ;-) BTW: the reason behind the slightly unorthodox code layout is that Doug’s goal was to make individual methods overridable without affecting the others—for better or worse.

@jasongoodwin
Author

Hey Roland,
I'm looking at a version of Java8 on a new laptop, trying to prove this will work - I'm running into a few hiccups.

  1. The internal APIs appear different since I last looked - uniComposeStage is called instead of doThenCompose. This indicates that overriding some internal methods may not be the safest approach.

I thought this was the same version I was using before but not sure - other laptop is long gone.
$ java -version
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

private <V> CompletableFuture<V> uniComposeStage(
    Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
    if (f == null) throw new NullPointerException();
    Object r; Throwable x;
    if (e == null && (r = result) != null) {
        // try to return function result directly
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                return new CompletableFuture<V>(encodeThrowable(x, r));
            }
            r = null;
        }
        try {
            @SuppressWarnings("unchecked") T t = (T) r;
            return f.apply(t).toCompletableFuture();
        } catch (Throwable ex) {
            return new CompletableFuture<V>(encodeThrowable(ex));
        }
    }
    CompletableFuture<V> d = new CompletableFuture<V>();
    UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
    push(c);
    c.tryFire(SYNC);
    return d;
}
  2. The result field is default scope - to try to implement the methods the CompletableFuture implementation has to be in java.util.concurrent which is not so pretty.

  3. The CompletableFuture constructor is private.

Maybe you have some ideas - I only looked briefly at this. Looks a bit scary though.

retronym added a commit to retronym/scala-java8-compat that referenced this issue Jul 28, 2015
Since `toCompletableFuture` stopped throwing an
`UnsupportedOperationException` in the previous commit, this use
case is now supported.

Fixes scala#29
@retronym retronym self-assigned this Jul 28, 2015
@retronym retronym added the bug label Jul 28, 2015
@retronym retronym added this to the 0.5.0 milestone Jul 28, 2015
@retronym
Member

My proposal in #49 (related to issue #43) would also fix this bug.

@jasongoodwin
Author

Confirmed with the example in the soon-to-be-published "Learning Akka": this seems to be resolved in 0.6.0-SNAPSHOT. Just in time for it to go to print too :) Thanks. I mention this library in the book for use with Java 8. I've worked with Java 8 for a while, and this is a very useful library for working with futures across the languages.

@jasongoodwin
Author

I built and published 0.6.0-SNAPSHOT locally and confirmed the issue is fixed in my use cases.

@jasongoodwin
Author

Hey, sorry to harass you around the holidays - are you able to publish 0.6.0 in the next couple weeks?

@retronym
Member

@jasongoodwin 0.7.0 is the latest release; see the badge at the top of the README for a link.

@jasongoodwin
Author

Sorry! I was looking at the 2.10 releases published in maven central.
