From d18bbaeae42f2e6576eba4be7e89d45db5a5cf9d Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Wed, 7 Feb 2024 14:01:25 -0800 Subject: [PATCH] Refactor SSE construction to static method on ServerSentEvent --- lib/stream.js | 83 +++++++++++++++++++++++++++------------------------ 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/lib/stream.js b/lib/stream.js index e2e564f..9f244bf 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -1,4 +1,3 @@ - /** * A server-sent event. */ @@ -18,6 +17,39 @@ class ServerSentEvent { this.retry = retry; } + /** + * Parse raw event data into a ServerSentEvent instance. + * + * @param {string} rawData The raw event data. + * @returns {ServerSentEvent} The parsed server-sent event. + */ + static parse(rawData) { + const lines = rawData.trim().split("\n"); + let event = "message"; + let data = ""; + let id = null; + let retry = null; + + for (const line of lines) { + const [fieldName, value] = line.split(/:(.*)/, 2); + switch (fieldName) { + case "event": + event = value.trim(); + break; + case "data": + data += `${value.trim()}\n`; + break; + case "id": + id = value.trim(); + break; + case "retry": + retry = parseInt(value.trim(), 10); + break; + } + } + return new ServerSentEvent(event, data.trim(), id, retry); + } + /** * Convert the event to a string. */ @@ -39,35 +71,35 @@ class Stream { const response = await fetch(this.url, { ...this.options, headers: { - Accept: 'text/event-stream', + Accept: "text/event-stream", }, }); const reader = response.body.getReader(); const decoder = new TextDecoder(); - let eventBuffer = ''; + let eventBuffer = ""; const processChunk = (chunk) => { - eventBuffer += decoder.decode(chunk, {stream: true}); - let eolIndex = eventBuffer.indexOf('\n'); + eventBuffer += decoder.decode(chunk, { stream: true }); + let eolIndex = eventBuffer.indexOf("\n"); while (eolIndex >= 0) { const line = eventBuffer.slice(0, eolIndex).trim(); eventBuffer = eventBuffer.slice(eolIndex + 1); - if (line === '') { + if (line === "") { // End of an event - const event = this.parseEvent(eventBuffer); + const event = ServerSentEvent.parse(eventBuffer); controller.enqueue(event); - eventBuffer = ''; + eventBuffer = ""; } else { // Accumulate data - eventBuffer += `${line}\n` + eventBuffer += `${line}\n`; } - eolIndex = eventBuffer.indexOf('\n'); + eolIndex = eventBuffer.indexOf("\n"); } }; const push = async () => { - const {done, value} = await reader.read(); + const { done, value } = await reader.read(); if (done) { controller.close(); return; @@ -77,36 +109,9 @@ class Stream { }; push(); - } + }, }); } - - parseEvent(rawData) { - const lines = rawData.trim().split('\n'); - let event = 'message'; - let data = ''; - let id = null; - let retry = null; - - for (const line of lines) { - const [fieldName, value] = line.split(/:(.*)/, 2); - switch (fieldName) { - case 'event': - event = value.trim(); - break; - case 'data': - data += `${value.trim()}\n`; - break; - case 'id': - id = value.trim(); - break; - case 'retry': - retry = parseInt(value.trim(), 10); - break; - } - } - return new ServerSentEvent(event, data.trim(), id, retry); - } } module.exports = { -- 2.43.0