Skip to content

Defer support #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 29 additions & 24 deletions src/main/java/graphql/servlet/AbstractGraphQLHttpServlet.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.google.common.io.ByteStreams;
import com.google.common.io.CharStreams;
import graphql.ExecutionResult;
import graphql.GraphQL;
import graphql.execution.reactive.SingleSubscriberPublisher;
import graphql.introspection.IntrospectionQuery;
import graphql.schema.GraphQLFieldDefinition;
import graphql.servlet.config.GraphQLConfiguration;
Expand All @@ -13,11 +15,7 @@
import graphql.servlet.core.GraphQLServletListener;
import graphql.servlet.core.internal.GraphQLRequest;
import graphql.servlet.core.internal.VariableMapper;
import graphql.servlet.input.BatchInputPreProcessResult;
import graphql.servlet.input.BatchInputPreProcessor;
import graphql.servlet.input.GraphQLBatchedInvocationInput;
import graphql.servlet.input.GraphQLInvocationInputFactory;
import graphql.servlet.input.GraphQLSingleInvocationInput;
import graphql.servlet.input.*;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -28,24 +26,12 @@
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.Part;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.io.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -354,13 +340,13 @@ private void doRequest(HttpServletRequest request, HttpServletResponse response,
}

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
init();
doRequestAsync(req, resp, getHandler);
}

@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
protected void doPost(HttpServletRequest req, HttpServletResponse resp) {
init();
doRequestAsync(req, resp, postHandler);
}
Expand All @@ -373,7 +359,9 @@ private void query(GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQL
HttpServletRequest req, HttpServletResponse resp) throws IOException {
ExecutionResult result = queryInvoker.query(invocationInput);

if (!(result.getData() instanceof Publisher)) {
boolean isDeferred = Objects.nonNull(result.getExtensions()) && result.getExtensions().containsKey(GraphQL.DEFERRED_RESULTS);

if (!(result.getData() instanceof Publisher || isDeferred)) {
resp.setContentType(APPLICATION_JSON_UTF8);
resp.setStatus(STATUS_OK);
resp.getWriter().write(graphQLObjectMapper.serializeResultAsJson(result));
Expand All @@ -390,7 +378,16 @@ private void query(GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQL
AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
asyncContext.addListener(new SubscriptionAsyncListener(subscriptionRef));
ExecutionResultSubscriber subscriber = new ExecutionResultSubscriber(subscriptionRef, asyncContext, graphQLObjectMapper);
((Publisher<ExecutionResult>) result.getData()).subscribe(subscriber);
List<Publisher<ExecutionResult>> publishers = new ArrayList<>();
if (result.getData() instanceof Publisher) {
publishers.add(result.getData());
} else {
publishers.add(new StaticDataPublisher<>(result));
final Publisher<ExecutionResult> deferredResultsPublisher = (Publisher<ExecutionResult>) result.getExtensions().get(GraphQL.DEFERRED_RESULTS);
publishers.add(deferredResultsPublisher);
}
publishers.forEach(it -> it.subscribe(subscriber));

if (isInAsyncThread) {
// We need to delay the completion of async context until after the subscription has terminated, otherwise the AsyncContext is prematurely closed.
try {
Expand Down Expand Up @@ -537,7 +534,6 @@ public void onStartAsync(AsyncEvent event) {
}
}


private static class ExecutionResultSubscriber implements Subscriber<ExecutionResult> {

private final AtomicReference<Subscription> subscriptionRef;
Expand Down Expand Up @@ -584,4 +580,13 @@ public void await() throws InterruptedException {
completedLatch.await();
}
}

private static class StaticDataPublisher<T> extends SingleSubscriberPublisher<T> implements Publisher<T> {
StaticDataPublisher(T data) {
super();
super.offer(data);
super.noMoreData();
}
}

}
20 changes: 15 additions & 5 deletions src/main/java/graphql/servlet/core/GraphQLObjectMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import graphql.ExecutionResult;
import graphql.ExecutionResultImpl;
import graphql.GraphQLError;
import graphql.*;
import graphql.execution.ExecutionPath;
import graphql.servlet.config.ConfiguringObjectMapperProvider;
import graphql.servlet.config.ObjectMapperConfigurer;
import graphql.servlet.config.ObjectMapperProvider;
Expand Down Expand Up @@ -117,12 +116,19 @@ public ExecutionResult sanitizeErrors(ExecutionResult executionResult) {
} else {
errors = null;
}

return new ExecutionResultImpl(data, errors, extensions);
}

public Map<String, Object> createResultFromExecutionResult(ExecutionResult executionResult) {
return convertSanitizedExecutionResult(sanitizeErrors(executionResult));
ExecutionResult sanitizedExecutionResult = sanitizeErrors(executionResult);
if (executionResult instanceof DeferredExecutionResult) {
sanitizedExecutionResult = DeferredExecutionResultImpl
.newDeferredExecutionResult()
.from(executionResult)
.path(ExecutionPath.fromList(((DeferredExecutionResult) executionResult).getPath()))
.build();
}
return convertSanitizedExecutionResult(sanitizedExecutionResult);
}

public Map<String, Object> convertSanitizedExecutionResult(ExecutionResult executionResult) {
Expand All @@ -144,6 +150,10 @@ public Map<String, Object> convertSanitizedExecutionResult(ExecutionResult execu
result.put("data", executionResult.getData());
}

if (executionResult instanceof DeferredExecutionResult) {
result.put("path", ((DeferredExecutionResult) executionResult).getPath());
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,28 @@ class AbstractGraphQLHttpServletSpec extends Specification {
getBatchedResponseContent()[1].data.echo == "test"
}


def "deferred query over HTTP GET"() {
setup:
request.addParameter('query', 'query { echo(arg:"test") @defer }')

when:
servlet.doGet(request, response)

then:
response.getStatus() == STATUS_OK
response.getContentType() == CONTENT_TYPE_SERVER_SENT_EVENTS
getSubscriptionResponseContent()[0].data.echo == null

when:
subscriptionLatch.await(1, TimeUnit.SECONDS)

then:
def content = getSubscriptionResponseContent()
content[1].data == "test"
content[1].path == ["echo"]
}

def "Batch Execution Handler allows limiting batches and sending error messages."() {
setup:
servlet = TestUtils.createBatchCustomizedServlet({ env -> env.arguments.arg }, { env -> env.arguments.arg }, { env ->
Expand Down Expand Up @@ -1030,6 +1052,61 @@ class AbstractGraphQLHttpServletSpec extends Specification {
getSubscriptionResponseContent()[1].data.echo == "Second\n\ntest"
}

def "defer query over HTTP POST"() {
setup:
request.setContent('{"query": "subscription Subscription($arg: String!) { echo(arg: $arg) }", "operationName": "Subscription", "variables": {"arg": "test"}}'.bytes)
request.setAsyncSupported(true)

when:
servlet.doPost(request, response)
then:
response.getStatus() == STATUS_OK
response.getContentType() == CONTENT_TYPE_SERVER_SENT_EVENTS

when:
subscriptionLatch.await(1, TimeUnit.SECONDS)
then:
getSubscriptionResponseContent()[0].data.echo == "First\n\ntest"
getSubscriptionResponseContent()[1].data.echo == "Second\n\ntest"
}

def "deferred query that takes longer than initial results, should still be sent second"() {
setup:
servlet = TestUtils.createDefaultServlet({ env ->
if (env.getField().name == "a") {
Thread.sleep(1000)
}
env.arguments.arg
})
request.setContent(mapper.writeValueAsBytes([
query: '''
{
object {
a(arg: "Hello")
b(arg: "World") @defer
}
}
'''
]))
request.setAsyncSupported(true)

when:
servlet.doPost(request, response)

then:
response.getStatus() == STATUS_OK
response.getContentType() == CONTENT_TYPE_SERVER_SENT_EVENTS
getSubscriptionResponseContent()[0].data.object.a == "Hello" // a has a Thread.sleep

when:
subscriptionLatch.await(1, TimeUnit.SECONDS)

then:
def content = getSubscriptionResponseContent()
content[1].data == "World"
content[1].path == ["object", "b"]
}

def "errors before graphql schema execution return internal server error"() {
setup:
servlet = SimpleGraphQLHttpServlet.newBuilder(GraphQLInvocationInputFactory.newBuilder {
Expand Down
31 changes: 30 additions & 1 deletion src/test/groovy/graphql/servlet/TestUtils.groovy
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package graphql.servlet

import com.google.common.io.ByteStreams
import graphql.Directives
import graphql.Scalars
import graphql.execution.reactive.SingleSubscriberPublisher
import graphql.schema.*
Expand All @@ -15,6 +16,7 @@ import graphql.servlet.core.ApolloScalars
import graphql.servlet.input.BatchInputPreProcessor
import graphql.servlet.context.ContextSetting

import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference

class TestUtils {
Expand Down Expand Up @@ -95,7 +97,7 @@ class TestUtils {
static def createGraphQlSchema(DataFetcher queryDataFetcher = { env -> env.arguments.arg },
DataFetcher mutationDataFetcher = { env -> env.arguments.arg },
DataFetcher subscriptionDataFetcher = { env ->
AtomicReference<SingleSubscriberPublisher<String>> publisherRef = new AtomicReference<>();
AtomicReference<SingleSubscriberPublisher<String>> publisherRef = new AtomicReference<>()
publisherRef.set(new SingleSubscriberPublisher<>({ subscription ->
publisherRef.get().offer(env.arguments.arg)
publisherRef.get().noMoreData()
Expand All @@ -113,6 +115,32 @@ class TestUtils {
}
field.dataFetcher(queryDataFetcher)
}
.field { GraphQLFieldDefinition.Builder field ->
field.name("object")
field.type(
GraphQLObjectType.newObject()
.name("NestedObject")
.field { nested ->
nested.name("a")
nested.type(Scalars.GraphQLString)
nested.argument { argument ->
argument.name("arg")
argument.type(Scalars.GraphQLString)
}
nested.dataFetcher(queryDataFetcher)
}
.field { nested ->
nested.name("b")
nested.type(Scalars.GraphQLString)
nested.argument { argument ->
argument.name("arg")
argument.type(Scalars.GraphQLString)
}
nested.dataFetcher(queryDataFetcher)
}
)
field.dataFetcher(new StaticDataFetcher([:]))
}
.field { GraphQLFieldDefinition.Builder field ->
field.name("returnsNullIncorrectly")
field.type(new GraphQLNonNull(Scalars.GraphQLString))
Expand Down Expand Up @@ -174,6 +202,7 @@ class TestUtils {
.mutation(mutation)
.subscription(subscription)
.additionalType(ApolloScalars.Upload)
.additionalDirective(Directives.DeferDirective)
.build()
}

Expand Down