Skip to content

Commit f84c6e2

Browse files
committed
refactor to handle GraphQL Subscription requests
issue spring-projects#3501
1 parent 332cf64 commit f84c6e2

File tree

5 files changed

+274
-13
lines changed

5 files changed

+274
-13
lines changed
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@
2525
import reactor.core.publisher.Mono;
2626

2727
/**
28-
* A <code>MessageHandler</code> capable of fielding GraphQL Query and Mutation requests.
28+
* A <code>MessageHandler</code> capable of fielding GraphQL Query, Mutation and Subscription requests.
2929
*
3030
* @author Daniel Frey
3131
*/
32-
public class GraphqlQueryMutationMessageHandler extends AbstractReplyProducingMessageHandler {
32+
public class GraphQlMessageHandler extends AbstractReplyProducingMessageHandler {
3333

3434
private final GraphQlService graphQlService;
3535

36-
public GraphqlQueryMutationMessageHandler(final GraphQlService graphQlService) {
36+
public GraphQlMessageHandler(final GraphQlService graphQlService) {
3737
this.graphQlService = graphQlService;
3838
setAsync(true);
3939
}
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@
6666
* @author Daniel Frey
6767
*
6868
*/
69-
@SpringJUnitConfig(GraphqlMutationMessageHandlerTests.TestConfig.class)
69+
@SpringJUnitConfig(GraphQlMutationMessageHandlerTests.TestConfig.class)
7070
@DirtiesContext
71-
public class GraphqlMutationMessageHandlerTests {
71+
public class GraphQlMutationMessageHandlerTests {
7272

7373
@Autowired
7474
private FluxMessageChannel inputChannel;
@@ -83,6 +83,7 @@ public class GraphqlMutationMessageHandlerTests {
8383
UpdateRepository updateRepository;
8484

8585
@Test
86+
@SuppressWarnings("unchecked")
8687
void testHandleMessageForMutation() {
8788

8889
String fakeId = UUID.randomUUID().toString();
@@ -179,13 +180,13 @@ Mono<Update> current() {
179180
static class TestConfig {
180181

181182
@Bean
182-
GraphqlQueryMutationMessageHandler handler(GraphQlService graphQlService) {
183+
GraphQlMessageHandler handler(GraphQlService graphQlService) {
183184

184-
return new GraphqlQueryMutationMessageHandler(graphQlService);
185+
return new GraphQlMessageHandler(graphQlService);
185186
}
186187

187188
@Bean
188-
IntegrationFlow graphqlQueryMessageHandlerFlow(GraphqlQueryMutationMessageHandler handler) {
189+
IntegrationFlow graphqlQueryMessageHandlerFlow(GraphQlMessageHandler handler) {
189190

190191
return IntegrationFlows.from(MessageChannels.flux("inputChannel"))
191192
.handle(handler)
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@
6262
* @author Daniel Frey
6363
*
6464
*/
65-
@SpringJUnitConfig(GraphqlQueryMessageHandlerTests.TestConfig.class)
65+
@SpringJUnitConfig(GraphQlQueryMessageHandlerTests.TestConfig.class)
6666
@DirtiesContext
67-
public class GraphqlQueryMessageHandlerTests {
67+
public class GraphQlQueryMessageHandlerTests {
6868

6969
@Autowired
7070
private FluxMessageChannel inputChannel;
@@ -76,6 +76,7 @@ public class GraphqlQueryMessageHandlerTests {
7676
private PollableChannel errorChannel;
7777

7878
@Test
79+
@SuppressWarnings("unchecked")
7980
void testHandleMessageForQuery() {
8081

8182
StepVerifier verifier = StepVerifier.create(
@@ -137,13 +138,13 @@ public Mono<QueryResult> testQuery() {
137138
static class TestConfig {
138139

139140
@Bean
140-
GraphqlQueryMutationMessageHandler handler(GraphQlService graphQlService) {
141+
GraphQlMessageHandler handler(GraphQlService graphQlService) {
141142

142-
return new GraphqlQueryMutationMessageHandler(graphQlService);
143+
return new GraphQlMessageHandler(graphQlService);
143144
}
144145

145146
@Bean
146-
IntegrationFlow graphqlQueryMessageHandlerFlow(GraphqlQueryMutationMessageHandler handler) {
147+
IntegrationFlow graphqlQueryMessageHandlerFlow(GraphQlMessageHandler handler) {
147148

148149
return IntegrationFlows.from(MessageChannels.flux("inputChannel"))
149150
.handle(handler)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.graphql.outbound;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.time.Duration;
22+
import java.util.Collections;
23+
import java.util.Map;
24+
import java.util.Objects;
25+
26+
import org.junit.jupiter.api.Test;
27+
28+
import org.springframework.beans.factory.annotation.Autowired;
29+
import org.springframework.context.annotation.Bean;
30+
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.core.io.ClassPathResource;
32+
import org.springframework.graphql.GraphQlService;
33+
import org.springframework.graphql.RequestInput;
34+
import org.springframework.graphql.data.method.annotation.SubscriptionMapping;
35+
import org.springframework.graphql.data.method.annotation.support.AnnotatedControllerConfigurer;
36+
import org.springframework.graphql.execution.BatchLoaderRegistry;
37+
import org.springframework.graphql.execution.DefaultBatchLoaderRegistry;
38+
import org.springframework.graphql.execution.ExecutionGraphQlService;
39+
import org.springframework.graphql.execution.GraphQlSource;
40+
import org.springframework.integration.channel.FluxMessageChannel;
41+
import org.springframework.integration.channel.QueueChannel;
42+
import org.springframework.integration.config.EnableIntegration;
43+
import org.springframework.integration.dsl.IntegrationFlow;
44+
import org.springframework.integration.dsl.IntegrationFlows;
45+
import org.springframework.integration.dsl.MessageChannels;
46+
import org.springframework.messaging.Message;
47+
import org.springframework.messaging.MessageHandlingException;
48+
import org.springframework.messaging.PollableChannel;
49+
import org.springframework.messaging.support.ErrorMessage;
50+
import org.springframework.messaging.support.MessageBuilder;
51+
import org.springframework.stereotype.Controller;
52+
import org.springframework.test.annotation.DirtiesContext;
53+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
54+
55+
import graphql.ExecutionResult;
56+
import graphql.execution.reactive.SubscriptionPublisher;
57+
import reactor.core.publisher.Flux;
58+
import reactor.test.StepVerifier;
59+
60+
/**
61+
*
62+
* @author Daniel Frey
63+
*
64+
*/
65+
@SpringJUnitConfig(GraphQlSubscriptionMessageHandlerTests.TestConfig.class)
66+
@DirtiesContext
67+
public class GraphQlSubscriptionMessageHandlerTests {
68+
69+
@Autowired
70+
private FluxMessageChannel inputChannel;
71+
72+
@Autowired
73+
private FluxMessageChannel resultChannel;
74+
75+
@Autowired
76+
private PollableChannel errorChannel;
77+
78+
@Test
79+
@SuppressWarnings("unchecked")
80+
void testHandleMessageForSubscription() {
81+
82+
StepVerifier verifier = StepVerifier.create(
83+
Flux.from(this.resultChannel)
84+
.map(Message::getPayload)
85+
.cast(ExecutionResult.class)
86+
.map(ExecutionResult::getData)
87+
.cast(SubscriptionPublisher.class)
88+
.map(Flux::from)
89+
.flatMap(data -> data)
90+
)
91+
.consumeNextWith(executionResult -> {
92+
Map<String, Object> results = (Map<String, Object>) executionResult.getData();
93+
assertThat(results).containsKey("results");
94+
95+
Map<String, Object> queryResult = (Map<String, Object>) results.get("results");
96+
assertThat(queryResult)
97+
.containsKey("id")
98+
.containsValue("test-data-01");
99+
100+
})
101+
.expectNextCount(9)
102+
.thenCancel()
103+
.verifyLater();
104+
105+
this.inputChannel.send(
106+
MessageBuilder
107+
.withPayload(new RequestInput("subscription { results { id } }", null, Collections.emptyMap()))
108+
.build()
109+
);
110+
111+
verifier.verify(Duration.ofSeconds(10));
112+
}
113+
114+
@Test
115+
void testHandleMessageForSubscriptionWithInvalidPayload() {
116+
117+
this.inputChannel.send(
118+
MessageBuilder
119+
.withPayload("subscription { results { id } }")
120+
.build()
121+
);
122+
123+
Message<?> errorMessage = errorChannel.receive(10_000);
124+
assertThat(errorMessage).isNotNull()
125+
.isInstanceOf(ErrorMessage.class)
126+
.extracting(Message::getPayload)
127+
.isInstanceOf(MessageHandlingException.class)
128+
.satisfies((ex) -> assertThat((Exception) ex)
129+
.hasMessageContaining(
130+
"Message payload needs to be 'org.springframework.graphql.RequestInput'"));
131+
132+
}
133+
134+
@Controller
135+
static class GraphqlSubscriptionController {
136+
137+
@SubscriptionMapping
138+
public Flux<QueryResult> results() {
139+
return Flux.just(
140+
new QueryResult("test-data-01"),
141+
new QueryResult("test-data-02"),
142+
new QueryResult("test-data-03"),
143+
new QueryResult("test-data-04"),
144+
new QueryResult("test-data-05"),
145+
new QueryResult("test-data-06"),
146+
new QueryResult("test-data-07"),
147+
new QueryResult("test-data-08"),
148+
new QueryResult("test-data-09"),
149+
new QueryResult("test-data-10")
150+
);
151+
}
152+
153+
}
154+
155+
@Configuration
156+
@EnableIntegration
157+
static class TestConfig {
158+
159+
@Bean
160+
GraphQlMessageHandler handler(GraphQlService graphQlService) {
161+
162+
return new GraphQlMessageHandler(graphQlService);
163+
}
164+
165+
@Bean
166+
IntegrationFlow graphqlQueryMessageHandlerFlow(GraphQlMessageHandler handler) {
167+
168+
return IntegrationFlows.from(MessageChannels.flux("inputChannel"))
169+
.handle(handler)
170+
.channel(c -> c.flux("resultChannel"))
171+
.get();
172+
}
173+
174+
@Bean
175+
PollableChannel errorChannel() {
176+
177+
return new QueueChannel();
178+
}
179+
180+
@Bean
181+
GraphqlSubscriptionController graphqlSubscriptionController() {
182+
183+
return new GraphqlSubscriptionController();
184+
}
185+
186+
@Bean
187+
GraphQlService graphQlService(GraphQlSource graphQlSource, BatchLoaderRegistry batchLoaderRegistry) {
188+
189+
ExecutionGraphQlService service = new ExecutionGraphQlService(graphQlSource);
190+
service.addDataLoaderRegistrar(batchLoaderRegistry);
191+
192+
return service;
193+
}
194+
195+
@Bean
196+
GraphQlSource graphQlSource(AnnotatedControllerConfigurer annotatedDataFetcherConfigurer) {
197+
198+
return GraphQlSource.builder()
199+
.schemaResources(new ClassPathResource("graphql/test-schema.graphqls"))
200+
.configureRuntimeWiring(annotatedDataFetcherConfigurer)
201+
.build();
202+
}
203+
204+
@Bean
205+
AnnotatedControllerConfigurer annotatedDataFetcherConfigurer() {
206+
207+
return new AnnotatedControllerConfigurer();
208+
}
209+
210+
@Bean
211+
BatchLoaderRegistry batchLoaderRegistry() {
212+
213+
return new DefaultBatchLoaderRegistry();
214+
}
215+
216+
}
217+
218+
static class QueryResult {
219+
220+
private final String id;
221+
222+
QueryResult(final String id) {
223+
this.id = id;
224+
}
225+
226+
String getId() {
227+
return this.id;
228+
}
229+
230+
@Override
231+
public boolean equals(Object o) {
232+
if (this == o) {
233+
return true;
234+
}
235+
if (!(o instanceof QueryResult)) {
236+
return false;
237+
}
238+
QueryResult that = (QueryResult) o;
239+
return getId().equals(that.getId());
240+
}
241+
242+
@Override
243+
public int hashCode() {
244+
return Objects.hash(getId());
245+
}
246+
247+
@Override
248+
public String toString() {
249+
return "QueryResult{" +
250+
"id='" + id + '\'' +
251+
'}';
252+
}
253+
}
254+
255+
}

spring-integration-graphql/src/test/resources/graphql/test-schema.graphqls

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ type Mutation {
66
update(id: String!): Update!
77
}
88

9+
type Subscription {
10+
results: QueryResult
11+
}
12+
913
type QueryResult {
1014
id: String
1115
}

0 commit comments

Comments
 (0)