diff --git a/lib/stream.js b/lib/stream.js index 012d6d0..6312682 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -1,14 +1,3 @@ -// Attempt to use readable-stream if available, attempt to use the built-in stream module. -let Readable; -try { - Readable = require("readable-stream").Readable; -} catch (e) { - try { - Readable = require("stream").Readable; - } catch (e) { - Readable = null; - } -} /** * A server-sent event. @@ -41,95 +30,100 @@ class ServerSentEvent { } } -/** - * A stream of server-sent events. - */ -class Stream extends Readable { - /** - * Create a new stream of server-sent events. - * - * @param {string} url The URL to connect to. - * @param {object} options The fetch options. - */ +class Stream { constructor(url, options) { - if (!Readable) { - throw new Error( - "Readable streams are not supported. Please use Node.js 18 or later, or install the readable-stream package." - ); - } - - super(); this.url = url; this.options = options; + this.readableStream = new ReadableStream({ + start: async (controller) => { + const response = await fetch(this.url, { + ...this.options, + headers: { + Accept: 'text/event-stream', + }, + }); + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let eventBuffer = ''; + + const processChunk = (chunk) => { + eventBuffer += decoder.decode(chunk, {stream: true}); + let eolIndex = eventBuffer.indexOf('\n'); + while (eolIndex >= 0) { + const line = eventBuffer.slice(0, eolIndex).trim(); + eventBuffer = eventBuffer.slice(eolIndex + 1); + if (line === '') { + // End of an event + const event = this.parseEvent(eventBuffer); + controller.enqueue(event); + eventBuffer = ''; + } else { + // Accumulate data + eventBuffer += `${line}\n` + } + + eolIndex = eventBuffer.indexOf('\n'); + } + }; - this.event = null; - this.data = []; - this.lastEventId = null; - this.retry = null; - } + const push = async () => { + const {done, value} = await reader.read(); + if (done) { + controller.close(); + return; + } + processChunk(value); + push(); + }; - decode(line) { - if (!line) { - if (!this.event && !this.data.length && !this.lastEventId) { - return null; + push(); } - - const sse = new ServerSentEvent( - this.event, - this.data.join("\n"), - this.lastEventId - ); - - this.event = null; - this.data = []; - this.retry = null; - - return sse; - } - - if (line.startsWith(":")) { - return null; - } - - const [field, value] = line.split(": "); - if (field === "event") { - this.event = value; - } else if (field === "data") { - this.data.push(value); - } else if (field === "id") { - this.lastEventId = value; - } - - return null; - } - - async *[Symbol.asyncIterator]() { - const response = await fetch(this.url, { - ...this.options, - headers: { - Accept: "text/event-stream", - }, }); + } - for await (const chunk of response.body) { - const decoder = new TextDecoder("utf-8"); - const text = decoder.decode(chunk); - const lines = text.split("\n"); - for (const line of lines) { - const sse = this.decode(line); - if (sse) { - if (sse.event === "error") { - throw new Error(sse.data); - } - - yield sse; - - if (sse.event === "done") { - return; - } - } + parseEvent(rawData) { + const lines = rawData.trim().split('\n'); + let event = 'message'; + let data = ''; + let id = null; + let retry = null; + + for (const line of lines) { + const [fieldName, value] = line.split(/:(.*)/, 2); + switch (fieldName) { + case 'event': + event = value.trim(); + break; + case 'data': + data += `${value.trim()}\n`; + break; + case 'id': + id = value.trim(); + break; + case 'retry': + retry = parseInt(value.trim(), 10); + break; } } + return new ServerSentEvent(event, data.trim(), id, retry); + } + + [Symbol.asyncIterator]() { + const reader = this.readableStream.getReader(); + return { + next: async () => { + const { done, value } = await reader.read(); + return { done, value }; + }, + return: () => { + reader.releaseLock(); + return { done: true }; + }, + throw: (error) => { + reader.releaseLock(); + throw error; + }, + }; } }