Skip to content

Commit 09c1ffe

Browse files
committed
Use eventsource-parser to clean up the implemenation
1 parent 38403a7 commit 09c1ffe

File tree

3 files changed

+43
-80
lines changed

3 files changed

+43
-80
lines changed

index.js

+2-3
Original file line numberDiff line numberDiff line change
@@ -293,10 +293,9 @@ class Replicate {
293293

294294
if (prediction.urls && prediction.urls.stream) {
295295
const { signal } = options;
296-
const stream = await createReadableStream({
296+
const stream = createReadableStream({
297297
url: prediction.urls.stream,
298-
EventSource: this.EventSource,
299-
ReadableStream: this.ReadableStream,
298+
fetch: this.fetch,
300299
options: { signal },
301300
});
302301
yield* stream;

index.test.ts

+5-4
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import Replicate, {
77
parseProgressFromLogs,
88
} from "replicate";
99
import nock from "nock";
10-
import fetch from "cross-fetch";
1110
import { createReadableStream } from "./lib/stream";
1211
import { PassThrough } from "node:stream";
1312

@@ -23,7 +22,6 @@ describe("Replicate client", () => {
2322

2423
beforeEach(() => {
2524
client = new Replicate({ auth: "test-token" });
26-
client.fetch = fetch;
2725

2826
unmatched = [];
2927
nock.emitter.on("no match", handleNoMatch);
@@ -1199,8 +1197,9 @@ describe("Replicate client", () => {
11991197
.matchHeader("Accept", "text/event-stream")
12001198
.reply(status, body);
12011199

1202-
return await createReadableStream({
1200+
return createReadableStream({
12031201
url: `${streamEndpoint}/fake_stream`,
1202+
fetch: fetch,
12041203
});
12051204
}
12061205

@@ -1484,7 +1483,9 @@ describe("Replicate client", () => {
14841483
done: false,
14851484
value: { event: "output", id: "EVENT_1", data: "hello world" },
14861485
});
1487-
await expect(iterator.next()).rejects.toThrowError("Unexpected Error");
1486+
await expect(iterator.next()).rejects.toThrowError(
1487+
"An unexpected error occurred"
1488+
);
14881489
expect(await iterator.next()).toEqual({ done: true });
14891490
});
14901491

lib/stream.js

+36-73
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Attempt to use readable-stream if available, attempt to use the built-in stream module.
22

33
const ApiError = require("./error");
4+
const { EventSourceParserStream } = require("eventsource-parser/stream");
45

56
/**
67
* A server-sent event.
@@ -38,87 +39,49 @@ class ServerSentEvent {
3839
*
3940
* @param {object} config
4041
* @param {string} config.url The URL to connect to.
41-
* @param {any} [config.EventSource] A standards compliant EventSource implementation.
42-
* @param {any} [config.ReadableStream] A standards compliant ReadableStream implementation.
42+
* @param {typeof fetch} [config.fetch] The URL to connect to.
4343
* @param {object} [config.options] The EventSource options.
4444
* @returns {Promise<ReadableStream & AsyncIterable<ServerSentEvent>>}
4545
*/
46-
async function createReadableStream({
47-
url,
48-
EventSource = globalThis.EventSource,
49-
ReadableStream = globalThis.ReadableStream,
50-
options = {},
51-
}) {
52-
const EventSourceClass = EventSource
53-
? EventSource
54-
: (await import("eventsource")).default;
55-
const source = new EventSourceClass(url, options);
56-
57-
const stream = new ReadableStream({
58-
cancel() {
59-
source.close();
60-
},
61-
46+
function createReadableStream({ url, fetch, options = {} }) {
47+
return new ReadableStream({
6248
async start(controller) {
63-
return new Promise((resolve) => {
64-
source.addEventListener("output", (evt) => {
65-
const entry = new ServerSentEvent(
66-
evt.type,
67-
evt.data,
68-
evt.lastEventId
69-
);
70-
controller.enqueue(entry);
71-
});
72-
73-
source.addEventListener("done", (evt) => {
74-
const entry = new ServerSentEvent(
75-
evt.type,
76-
evt.data,
77-
evt.lastEventId
78-
);
79-
controller.enqueue(entry);
80-
source.close();
81-
controller.close();
82-
});
49+
const init = {
50+
...options,
51+
headers: {
52+
...options.headers,
53+
Accept: "text/event-stream",
54+
},
55+
};
56+
const response = await fetch(url, init);
8357

84-
source.addEventListener("open", (_evt) => {
85-
resolve();
86-
});
58+
if (!response.ok) {
59+
const text = await response.text();
60+
const request = new Request(url, init);
61+
controller.error(
62+
new ApiError(
63+
`Request to ${url} failed with status ${response.status}`,
64+
request,
65+
response
66+
)
67+
);
68+
}
8769

88-
source.addEventListener("error", (evt) => {
89-
// HTTP Error
90-
if (typeof evt.status === "number") {
91-
source.close();
92-
controller.error(
93-
new ApiError(
94-
`Request to ${url} failed with status ${evt.status} ${
95-
evt.message ?? ""
96-
}`.trim()
97-
)
98-
);
99-
return;
100-
}
101-
102-
// Connection closed
103-
if (!evt.message && source.readyState === 0) {
104-
controller.close();
105-
source.close();
106-
return;
107-
}
108-
109-
// Other
110-
source.close();
111-
controller.error(new Error(evt.message ?? "Unexpected Error"));
112-
});
113-
114-
options.signal?.addEventListener("abort", () => {
115-
source.close();
116-
});
117-
});
70+
const stream = response.body
71+
.pipeThrough(new TextDecoderStream())
72+
.pipeThrough(new EventSourceParserStream());
73+
for await (const event of stream) {
74+
if (event.event === "error") {
75+
controller.error(new Error(event.data));
76+
} else {
77+
controller.enqueue(
78+
new ServerSentEvent(event.event, event.data, event.id)
79+
);
80+
}
81+
}
82+
controller.close();
11883
},
11984
});
120-
121-
return stream;
12285
}
12386

12487
module.exports = {

0 commit comments

Comments
 (0)