18
18
19
19
20
20
import java .util .Collections ;
21
- import java .util .List ;
22
21
import java .util .Map ;
23
22
24
23
import graphql .ErrorType ;
25
24
import graphql .ExecutionResult ;
26
25
import graphql .GraphQLError ;
27
- import org .apache .commons .logging .Log ;
28
- import org .apache .commons .logging .LogFactory ;
29
26
import org .reactivestreams .Publisher ;
30
27
import reactor .core .publisher .Flux ;
31
28
import reactor .core .publisher .Mono ;
32
29
33
- import org .springframework .graphql .ResponseError ;
34
30
import org .springframework .graphql .execution .SubscriptionPublisherException ;
35
31
import org .springframework .graphql .server .WebGraphQlHandler ;
36
- import org .springframework .graphql .server .WebGraphQlRequest ;
32
+ import org .springframework .graphql .server .WebGraphQlResponse ;
37
33
import org .springframework .http .MediaType ;
38
34
import org .springframework .http .codec .ServerSentEvent ;
39
- import org .springframework .util .CollectionUtils ;
40
35
import org .springframework .web .reactive .function .BodyInserters ;
41
36
import org .springframework .web .reactive .function .server .ServerRequest ;
42
37
import org .springframework .web .reactive .function .server .ServerResponse ;
53
48
*/
54
49
public class GraphQlSseHandler extends AbstractGraphQlHttpHandler {
55
50
56
- private static final Log logger = LogFactory .getLog (GraphQlSseHandler .class );
57
-
58
51
private static final Mono <ServerSentEvent <Map <String , Object >>> COMPLETE_EVENT = Mono .just (
59
52
ServerSentEvent .<Map <String , Object >>builder (Collections .emptyMap ()).event ("complete" ).build ());
60
53
@@ -63,59 +56,37 @@ public GraphQlSseHandler(WebGraphQlHandler graphQlHandler) {
63
56
super (graphQlHandler , null );
64
57
}
65
58
66
- /**
67
- * Handle GraphQL requests over HTTP using the Server-Sent Events protocol.
68
- * @param serverRequest the incoming HTTP request
69
- * @return the HTTP response
70
- */
59
+
71
60
@ SuppressWarnings ("unchecked" )
72
- public Mono <ServerResponse > handleRequest (ServerRequest serverRequest ) {
73
- return readRequest (serverRequest )
74
- .flatMap ((body ) -> {
75
- WebGraphQlRequest graphQlRequest = new WebGraphQlRequest (
76
- serverRequest .uri (), serverRequest .headers ().asHttpHeaders (),
77
- serverRequest .cookies (), serverRequest .remoteAddress ().orElse (null ),
78
- serverRequest .attributes (), body ,
79
- serverRequest .exchange ().getRequest ().getId (),
80
- serverRequest .exchange ().getLocaleContext ().getLocale ());
81
- if (logger .isDebugEnabled ()) {
82
- logger .debug ("Executing: " + graphQlRequest );
83
- }
84
- return this .graphQlHandler .handleRequest (graphQlRequest );
85
- })
86
- .flatMap ((response ) -> {
87
- if (logger .isDebugEnabled ()) {
88
- List <ResponseError > errors = response .getErrors ();
89
- logger .debug ("Execution result " +
90
- (!CollectionUtils .isEmpty (errors ) ? "has errors: " + errors : "is ready" ) + "." );
91
- }
92
- Flux <Map <String , Object >> resultFlux ;
93
- if (response .getData () instanceof Publisher ) {
94
- resultFlux = Flux .from ((Publisher <ExecutionResult >) response .getData ())
95
- .map (ExecutionResult ::toSpecification )
96
- .onErrorResume (SubscriptionPublisherException .class , (ex ) -> Mono .just (ex .toMap ()));
97
- }
98
- else {
99
- if (logger .isDebugEnabled ()) {
100
- logger .debug ("A subscription DataFetcher must return a Publisher: " + response .getData ());
101
- }
102
- resultFlux = Flux .just (ExecutionResult .newExecutionResult ()
103
- .addError (GraphQLError .newError ()
104
- .errorType (ErrorType .OperationNotSupported )
105
- .message ("SSE handler supports only subscriptions" )
106
- .build ())
107
- .build ()
108
- .toSpecification ());
109
- }
61
+ @ Override
62
+ protected Mono <ServerResponse > prepareResponse (ServerRequest request , WebGraphQlResponse response ) {
63
+
64
+ Flux <Map <String , Object >> resultFlux ;
65
+ if (response .getData () instanceof Publisher ) {
66
+ resultFlux = Flux .from ((Publisher <ExecutionResult >) response .getData ())
67
+ .map (ExecutionResult ::toSpecification )
68
+ .onErrorResume (SubscriptionPublisherException .class , (ex ) -> Mono .just (ex .toMap ()));
69
+ }
70
+ else {
71
+ if (this .logger .isDebugEnabled ()) {
72
+ this .logger .debug ("A subscription DataFetcher must return a Publisher: " + response .getData ());
73
+ }
74
+ resultFlux = Flux .just (ExecutionResult .newExecutionResult ()
75
+ .addError (GraphQLError .newError ()
76
+ .errorType (ErrorType .OperationNotSupported )
77
+ .message ("SSE handler supports only subscriptions" )
78
+ .build ())
79
+ .build ()
80
+ .toSpecification ());
81
+ }
110
82
111
- Flux <ServerSentEvent <Map <String , Object >>> sseFlux =
112
- resultFlux .map ((event ) -> ServerSentEvent .builder (event ).event ("next" ).build ());
83
+ Flux <ServerSentEvent <Map <String , Object >>> sseFlux =
84
+ resultFlux .map ((event ) -> ServerSentEvent .builder (event ).event ("next" ).build ());
113
85
114
- return ServerResponse .ok ()
115
- .contentType (MediaType .TEXT_EVENT_STREAM )
116
- .body (BodyInserters .fromServerSentEvents (sseFlux .concatWith (COMPLETE_EVENT )))
117
- .onErrorResume (Throwable .class , (ex ) -> ServerResponse .badRequest ().build ());
118
- });
86
+ return ServerResponse .ok ()
87
+ .contentType (MediaType .TEXT_EVENT_STREAM )
88
+ .body (BodyInserters .fromServerSentEvents (sseFlux .concatWith (COMPLETE_EVENT )))
89
+ .onErrorResume (Throwable .class , (ex ) -> ServerResponse .badRequest ().build ());
119
90
}
120
91
121
92
}
0 commit comments