Skip to content

Commit 854ce65

Browse files
authored
feat(url-loader): add support for legacy subscription client (#2894)
* feat(url-loader): add support for legacy subscription client * fix(url-loader): pass headers to subscription client * feat(url-loader): pass connectionParams into ws client creation instead of headers only * chore: cleanup * chore(url-loader): added unit test for legacy ws subscriber * chore: added changeset
1 parent 6810454 commit 854ce65

File tree

4 files changed

+138
-13
lines changed

4 files changed

+138
-13
lines changed

.changeset/lemon-days-drop.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@graphql-tools/url-loader': minor
3+
---
4+
5+
- Added support for legacy ws protocol
6+
- Ensured that headers are passed into ws connection params

packages/loaders/url/package.json

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,23 @@
3333
},
3434
"dependencies": {
3535
"@graphql-tools/delegate": "^7.0.1",
36-
"@graphql-tools/wrap": "^7.0.4",
3736
"@graphql-tools/utils": "^7.1.5",
37+
"@graphql-tools/wrap": "^7.0.4",
3838
"@types/websocket": "1.0.2",
3939
"cross-fetch": "3.1.4",
40-
"extract-files": "9.0.0",
4140
"eventsource": "1.1.0",
41+
"extract-files": "9.0.0",
42+
"form-data": "4.0.0",
4243
"graphql-upload": "^11.0.0",
44+
"graphql-ws": "^4.4.1",
4345
"is-promise": "4.0.0",
4446
"isomorphic-ws": "4.0.1",
45-
"form-data": "4.0.0",
47+
"sse-z": "0.3.0",
48+
"subscriptions-transport-ws": "^0.9.18",
49+
"sync-fetch": "0.3.0",
4650
"tslib": "~2.2.0",
4751
"valid-url": "1.0.9",
48-
"graphql-ws": "^4.4.1",
49-
"ws": "7.4.5",
50-
"sync-fetch": "0.3.0",
51-
"sse-z": "0.3.0"
52+
"ws": "7.4.5"
5253
},
5354
"publishConfig": {
5455
"access": "public",

packages/loaders/url/src/index.ts

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
/* eslint-disable no-case-declarations */
22
/// <reference lib="dom" />
3-
import { print, IntrospectionOptions, DocumentNode, GraphQLResolveInfo, Kind, parse, buildASTSchema } from 'graphql';
3+
import {
4+
print,
5+
IntrospectionOptions,
6+
DocumentNode,
7+
GraphQLResolveInfo,
8+
Kind,
9+
parse,
10+
buildASTSchema,
11+
ExecutionResult,
12+
} from 'graphql';
413
import {
514
AsyncExecutor,
615
Executor,
@@ -17,7 +26,7 @@ import { isWebUri } from 'valid-url';
1726
import { fetch as crossFetch } from 'cross-fetch';
1827
import { SubschemaConfig } from '@graphql-tools/delegate';
1928
import { introspectSchema, wrapSchema } from '@graphql-tools/wrap';
20-
import { createClient } from 'graphql-ws';
29+
import { ClientOptions, createClient } from 'graphql-ws';
2130
import WebSocket from 'isomorphic-ws';
2231
import syncFetch from 'sync-fetch';
2332
import isPromise from 'is-promise';
@@ -26,6 +35,7 @@ import FormData from 'form-data';
2635
import 'eventsource/lib/eventsource-polyfill';
2736
import { Subscription, SubscriptionOptions } from 'sse-z';
2837
import { URL } from 'url';
38+
import { ConnectionParamsOptions, SubscriptionClient as LegacySubscriptionClient } from 'subscriptions-transport-ws';
2939

3040
export type AsyncFetchFn = typeof import('cross-fetch').fetch;
3141
export type SyncFetchFn = (input: RequestInfo, init?: RequestInit) => SyncResponse;
@@ -86,6 +96,10 @@ export interface LoadFromUrlOptions extends SingleFileOptions, Partial<Introspec
8696
* Use SSE for subscription instead of WebSocket
8797
*/
8898
useSSEForSubscription?: boolean;
99+
/**
100+
* Use legacy web socket protocol `graphql-ws` instead of the more current standard `graphql-transport-ws`
101+
*/
102+
useWebSocketLegacyProtocol?: boolean;
89103
/**
90104
* Additional options to pass to the constructor of the underlying EventSource instance.
91105
*/
@@ -254,14 +268,19 @@ export class UrlLoader implements DocumentLoader<LoadFromUrlOptions> {
254268
return executor;
255269
}
256270

257-
buildWSSubscriber(pointer: string, webSocketImpl: typeof WebSocket): Subscriber {
271+
buildWSSubscriber(
272+
pointer: string,
273+
webSocketImpl: typeof WebSocket,
274+
connectionParams: ClientOptions['connectionParams']
275+
): Subscriber {
258276
const WS_URL = switchProtocols(pointer, {
259277
https: 'wss',
260278
http: 'ws',
261279
});
262280
const subscriptionClient = createClient({
263281
url: WS_URL,
264282
webSocketImpl,
283+
connectionParams,
265284
});
266285
return async ({ document, variables }: { document: DocumentNode; variables: any }) => {
267286
const query = print(document);
@@ -282,6 +301,33 @@ export class UrlLoader implements DocumentLoader<LoadFromUrlOptions> {
282301
};
283302
}
284303

304+
buildWSLegacySubscriber(
305+
pointer: string,
306+
webSocketImpl: typeof WebSocket,
307+
connectionParams: ConnectionParamsOptions
308+
): Subscriber {
309+
const WS_URL = switchProtocols(pointer, {
310+
https: 'wss',
311+
http: 'ws',
312+
});
313+
const subscriptionClient = new LegacySubscriptionClient(
314+
WS_URL,
315+
{
316+
connectionParams,
317+
},
318+
webSocketImpl
319+
);
320+
321+
return async <TReturn, TArgs>({ document, variables }: { document: DocumentNode; variables: TArgs }) => {
322+
return observableToAsyncIterable(
323+
subscriptionClient.request({
324+
query: document,
325+
variables,
326+
})
327+
) as AsyncIterator<ExecutionResult<TReturn>>;
328+
};
329+
}
330+
285331
buildSSESubscriber(pointer: string, eventSourceOptions?: SubscriptionOptions['eventSourceOptions']): Subscriber {
286332
return async ({ document, variables }: { document: DocumentNode; variables: any }) => {
287333
const query = print(document);
@@ -413,7 +459,12 @@ export class UrlLoader implements DocumentLoader<LoadFromUrlOptions> {
413459
subscriber = this.buildSSESubscriber(pointer, options.eventSourceOptions);
414460
} else {
415461
const webSocketImpl = await this.getWebSocketImpl(options, asyncImport);
416-
subscriber = this.buildWSSubscriber(pointer, webSocketImpl);
462+
463+
if (options.useWebSocketLegacyProtocol) {
464+
subscriber = this.buildWSLegacySubscriber(pointer, webSocketImpl, { headers });
465+
} else {
466+
subscriber = this.buildWSSubscriber(pointer, webSocketImpl, { headers });
467+
}
417468
}
418469

419470
return {
@@ -448,7 +499,11 @@ export class UrlLoader implements DocumentLoader<LoadFromUrlOptions> {
448499
subscriber = this.buildSSESubscriber(pointer, options.eventSourceOptions);
449500
} else {
450501
const webSocketImpl = this.getWebSocketImpl(options, syncImport);
451-
subscriber = this.buildWSSubscriber(pointer, webSocketImpl);
502+
if (options.useWebSocketLegacyProtocol) {
503+
subscriber = this.buildWSLegacySubscriber(pointer, webSocketImpl, { headers });
504+
} else {
505+
subscriber = this.buildWSSubscriber(pointer, webSocketImpl, { headers });
506+
}
452507
}
453508

454509
return {

packages/loaders/url/tests/url-loader.spec.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { join } from 'path';
1212
import { useServer } from 'graphql-ws/lib/use/ws';
1313
import { Server as WSServer } from 'ws';
1414
import http from 'http';
15+
import { SubscriptionServer } from 'subscriptions-transport-ws';
1516

1617
const SHOULD_NOT_GET_HERE_ERROR = 'SHOULD_NOT_GET_HERE';
1718

@@ -333,7 +334,7 @@ input TestInput {
333334
expect(result.document).toBeDefined();
334335
expect(print(result.document)).toBeSimilarGqlDoc(testTypeDefs);
335336
})
336-
it('should handle subscriptions', async (done) => {
337+
it('should handle subscriptions - new protocol', async (done) => {
337338
const testUrl = 'http://localhost:8081/graphql';
338339
const { schema } = await loader.load(testUrl, {
339340
customFetch: async () => ({
@@ -398,6 +399,68 @@ input TestInput {
398399
});
399400

400401
});
402+
it('should handle subscriptions - legacy protocol', async (done) => {
403+
const testUrl = 'http://localhost:8081/graphql';
404+
const { schema } = await loader.load(testUrl, {
405+
customFetch: async () => ({
406+
json: async () => ({
407+
data: introspectionFromSchema(testSchema),
408+
})
409+
}) as any,
410+
useWebSocketLegacyProtocol: true,
411+
});
412+
413+
const httpServer = http.createServer(function weServeSocketsOnly(_, res) {
414+
res.writeHead(404);
415+
res.end();
416+
});
417+
418+
419+
httpServer.listen(8081);
420+
421+
const subscriptionServer = SubscriptionServer.create(
422+
{
423+
schema: testSchema,
424+
execute,
425+
subscribe,
426+
},
427+
{
428+
server: httpServer,
429+
path: '/graphql',
430+
},
431+
);
432+
433+
const asyncIterator = await subscribe({
434+
schema,
435+
document: parse(/* GraphQL */`
436+
subscription TestMessage {
437+
testMessage {
438+
number
439+
}
440+
}
441+
`),
442+
contextValue: {},
443+
}) as AsyncIterableIterator<ExecutionResult>;
444+
445+
expect(asyncIterator['errors']).toBeFalsy();
446+
expect(asyncIterator['errors']?.length).toBeFalsy();
447+
448+
449+
// eslint-disable-next-line no-inner-declarations
450+
async function getNextResult() {
451+
const result = await asyncIterator.next();
452+
expect(result?.done).toBeFalsy();
453+
return result?.value?.data?.testMessage?.number;
454+
}
455+
456+
expect(await getNextResult()).toBe(0);
457+
expect(await getNextResult()).toBe(1);
458+
expect(await getNextResult()).toBe(2);
459+
460+
await asyncIterator.return();
461+
subscriptionServer.close();
462+
httpServer.close(done);
463+
});
401464
it('should handle multipart requests', async () => {
402465
let server = mockGraphQLServer({ schema: testSchema, host: testHost, path: testPathChecker, method: 'POST' });
403466

0 commit comments

Comments
 (0)