Skip to content

Commit 8dc2960

Browse files
committed
Handle processing chunked event streams
1 parent f79a33f commit 8dc2960

File tree

8 files changed

+580
-119
lines changed

8 files changed

+580
-119
lines changed

README.md

+8
Original file line numberDiff line numberDiff line change
@@ -811,3 +811,11 @@ You can call this method directly to make other requests to the API.
811811
## TypeScript
812812

813813
The `Replicate` constructor and all `replicate.*` methods are fully typed.
814+
815+
## Vendored Dependencies
816+
817+
We have a few dependencies that have been bundled into the vendor directory rather than adding external npm dependencies.
818+
819+
These have been generated using bundlejs.com and copied into the appropriate directory along with the license and repository information.
820+
821+
* [eventsource-parser/stream](https://bundlejs.com/?bundle&q=eventsource-parser%40latest%2Fstream&config=%7B%22esbuild%22%3A%7B%22format%22%3A%22cjs%22%2C%22minify%22%3Afalse%2C%22platform%22%3A%22neutral%22%7D%7D)

biome.json

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
{
22
"$schema": "https://biomejs.dev/schemas/1.0.0/schema.json",
3+
"files": {
4+
"ignore": [".wrangler", "vendor/*"]
5+
},
36
"formatter": {
47
"indentStyle": "space",
58
"indentWidth": 2

index.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const ApiError = require("./lib/error");
22
const ModelVersionIdentifier = require("./lib/identifier");
3-
const { Stream } = require("./lib/stream");
3+
const { createReadableStream } = require("./lib/stream");
44
const {
55
withAutomaticRetries,
66
validateWebhook,
@@ -289,7 +289,7 @@ class Replicate {
289289

290290
if (prediction.urls && prediction.urls.stream) {
291291
const { signal } = options;
292-
const stream = new Stream({
292+
const stream = createReadableStream({
293293
url: prediction.urls.stream,
294294
fetch: this.fetch,
295295
options: { signal },

index.test.ts

+314-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import Replicate, {
77
parseProgressFromLogs,
88
} from "replicate";
99
import nock from "nock";
10-
import fetch from "cross-fetch";
10+
import { createReadableStream } from "./lib/stream";
11+
import { PassThrough } from "node:stream";
1112

1213
let client: Replicate;
1314
const BASE_URL = "https://api.replicate.com/v1";
@@ -21,7 +22,6 @@ describe("Replicate client", () => {
2122

2223
beforeEach(() => {
2324
client = new Replicate({ auth: "test-token" });
24-
client.fetch = fetch;
2525

2626
unmatched = [];
2727
nock.emitter.on("no match", handleNoMatch);
@@ -251,7 +251,7 @@ describe("Replicate client", () => {
251251
let actual: Record<string, any> | undefined;
252252
nock(BASE_URL)
253253
.post("/predictions")
254-
.reply(201, (uri: string, body: Record<string, any>) => {
254+
.reply(201, (_uri: string, body: Record<string, any>) => {
255255
actual = body;
256256
return body;
257257
});
@@ -1010,8 +1010,6 @@ describe("Replicate client", () => {
10101010
});
10111011

10121012
test("Calls the correct API routes for a model", async () => {
1013-
const firstPollingRequest = true;
1014-
10151013
nock(BASE_URL)
10161014
.post("/models/replicate/hello-world/predictions")
10171015
.reply(201, {
@@ -1179,12 +1177,322 @@ describe("Replicate client", () => {
11791177
// This is a test secret and should not be used in production
11801178
const secret = "whsec_MfKQ9r8GKYqrTwjUPD8ILPZIo2LaLaSw";
11811179

1182-
const isValid = await validateWebhook(request, secret);
1180+
const isValid = validateWebhook(request, secret);
11831181
expect(isValid).toBe(true);
11841182
});
11851183

11861184
// Add more tests for error handling, edge cases, etc.
11871185
});
11881186

11891187
// Continue with tests for other methods
1188+
1189+
describe("createReadableStream", () => {
1190+
function createStream(body: string | NodeJS.ReadableStream, status = 200) {
1191+
const streamEndpoint = "https://stream.replicate.com";
1192+
nock(streamEndpoint)
1193+
.get("/fake_stream")
1194+
.matchHeader("Accept", "text/event-stream")
1195+
.reply(status, body);
1196+
1197+
return createReadableStream({
1198+
url: `${streamEndpoint}/fake_stream`,
1199+
fetch: fetch,
1200+
});
1201+
}
1202+
1203+
test("consumes a server sent event stream", async () => {
1204+
const stream = createStream(
1205+
`
1206+
event: output
1207+
id: EVENT_1
1208+
data: hello world
1209+
1210+
event: done
1211+
id: EVENT_2
1212+
data: {}
1213+
1214+
`.replace(/^[ ]+/gm, "")
1215+
);
1216+
1217+
const iterator = stream[Symbol.asyncIterator]();
1218+
expect(await iterator.next()).toEqual({
1219+
done: false,
1220+
value: { event: "output", id: "EVENT_1", data: "hello world" },
1221+
});
1222+
expect(await iterator.next()).toEqual({
1223+
done: false,
1224+
value: { event: "done", id: "EVENT_2", data: "{}" },
1225+
});
1226+
expect(await iterator.next()).toEqual({ done: true });
1227+
expect(await iterator.next()).toEqual({ done: true });
1228+
});
1229+
1230+
test("consumes multiple events", async () => {
1231+
const stream = createStream(
1232+
`
1233+
event: output
1234+
id: EVENT_1
1235+
data: hello world
1236+
1237+
event: output
1238+
id: EVENT_2
1239+
data: hello dave
1240+
1241+
event: done
1242+
id: EVENT_3
1243+
data: {}
1244+
1245+
`.replace(/^[ ]+/gm, "")
1246+
);
1247+
1248+
const iterator = stream[Symbol.asyncIterator]();
1249+
1250+
expect(await iterator.next()).toEqual({
1251+
done: false,
1252+
value: { event: "output", id: "EVENT_1", data: "hello world" },
1253+
});
1254+
expect(await iterator.next()).toEqual({
1255+
done: false,
1256+
value: { event: "output", id: "EVENT_2", data: "hello dave" },
1257+
});
1258+
expect(await iterator.next()).toEqual({
1259+
done: false,
1260+
value: { event: "done", id: "EVENT_3", data: "{}" },
1261+
});
1262+
expect(await iterator.next()).toEqual({ done: true });
1263+
expect(await iterator.next()).toEqual({ done: true });
1264+
});
1265+
1266+
test("ignores unexpected characters", async () => {
1267+
const stream = createStream(
1268+
`
1269+
: hi
1270+
1271+
event: output
1272+
id: EVENT_1
1273+
data: hello world
1274+
1275+
event: done
1276+
id: EVENT_2
1277+
data: {}
1278+
1279+
`.replace(/^[ ]+/gm, "")
1280+
);
1281+
1282+
const iterator = stream[Symbol.asyncIterator]();
1283+
1284+
expect(await iterator.next()).toEqual({
1285+
done: false,
1286+
value: { event: "output", id: "EVENT_1", data: "hello world" },
1287+
});
1288+
expect(await iterator.next()).toEqual({
1289+
done: false,
1290+
value: { event: "done", id: "EVENT_2", data: "{}" },
1291+
});
1292+
expect(await iterator.next()).toEqual({ done: true });
1293+
expect(await iterator.next()).toEqual({ done: true });
1294+
});
1295+
1296+
test("supports multiple lines of output in a single event", async () => {
1297+
const stream = createStream(
1298+
`
1299+
: hi
1300+
1301+
event: output
1302+
id: EVENT_1
1303+
data: hello,
1304+
data: this is a new line,
1305+
data: and this is a new line too
1306+
1307+
event: done
1308+
id: EVENT_2
1309+
data: {}
1310+
1311+
`.replace(/^[ ]+/gm, "")
1312+
);
1313+
1314+
const iterator = stream[Symbol.asyncIterator]();
1315+
1316+
expect(await iterator.next()).toEqual({
1317+
done: false,
1318+
value: {
1319+
event: "output",
1320+
id: "EVENT_1",
1321+
data: "hello,\nthis is a new line,\nand this is a new line too",
1322+
},
1323+
});
1324+
expect(await iterator.next()).toEqual({
1325+
done: false,
1326+
value: { event: "done", id: "EVENT_2", data: "{}" },
1327+
});
1328+
expect(await iterator.next()).toEqual({ done: true });
1329+
expect(await iterator.next()).toEqual({ done: true });
1330+
});
1331+
1332+
test("supports the server writing data lines in multiple chunks", async () => {
1333+
const body = new PassThrough();
1334+
const stream = createStream(body);
1335+
1336+
// Create a stream of data chunks split on the pipe character for readability.
1337+
const data = `
1338+
event: output
1339+
id: EVENT_1
1340+
data: hello,|
1341+
data: this is a new line,|
1342+
data: and this is a new line too
1343+
1344+
event: done
1345+
id: EVENT_2
1346+
data: {}
1347+
1348+
`.replace(/^[ ]+/gm, "");
1349+
1350+
const chunks = data.split("|");
1351+
1352+
// Consume the iterator in parallel to writing it.
1353+
const reading = new Promise((resolve, reject) => {
1354+
(async () => {
1355+
const iterator = stream[Symbol.asyncIterator]();
1356+
expect(await iterator.next()).toEqual({
1357+
done: false,
1358+
value: {
1359+
event: "output",
1360+
id: "EVENT_1",
1361+
data: "hello,\nthis is a new line,\nand this is a new line too",
1362+
},
1363+
});
1364+
expect(await iterator.next()).toEqual({
1365+
done: false,
1366+
value: { event: "done", id: "EVENT_2", data: "{}" },
1367+
});
1368+
expect(await iterator.next()).toEqual({ done: true });
1369+
})().then(resolve, reject);
1370+
});
1371+
1372+
// Write the chunks to the stream at an interval.
1373+
const writing = new Promise((resolve, reject) => {
1374+
(async () => {
1375+
for await (const chunk of chunks) {
1376+
body.write(chunk);
1377+
await new Promise((resolve) => setTimeout(resolve, 1));
1378+
}
1379+
body.end();
1380+
resolve(null);
1381+
})().then(resolve, reject);
1382+
});
1383+
1384+
// Wait for both promises to resolve.
1385+
await Promise.all([reading, writing]);
1386+
});
1387+
1388+
test("supports the server writing data in a complete mess", async () => {
1389+
const body = new PassThrough();
1390+
const stream = createStream(body);
1391+
1392+
// Create a stream of data chunks split on the pipe character for readability.
1393+
const data = `
1394+
: hi
1395+
1396+
ev|ent: output
1397+
id: EVENT_1
1398+
data: hello,
1399+
data: this |is a new line,|
1400+
data: and this is |a new line too
1401+
1402+
event: d|one
1403+
id: EVENT|_2
1404+
data: {}
1405+
1406+
`.replace(/^[ ]+/gm, "");
1407+
1408+
const chunks = data.split("|");
1409+
1410+
// Consume the iterator in parallel to writing it.
1411+
const reading = new Promise((resolve, reject) => {
1412+
(async () => {
1413+
const iterator = stream[Symbol.asyncIterator]();
1414+
expect(await iterator.next()).toEqual({
1415+
done: false,
1416+
value: {
1417+
event: "output",
1418+
id: "EVENT_1",
1419+
data: "hello,\nthis is a new line,\nand this is a new line too",
1420+
},
1421+
});
1422+
expect(await iterator.next()).toEqual({
1423+
done: false,
1424+
value: { event: "done", id: "EVENT_2", data: "{}" },
1425+
});
1426+
expect(await iterator.next()).toEqual({ done: true });
1427+
})().then(resolve, reject);
1428+
});
1429+
1430+
// Write the chunks to the stream at an interval.
1431+
const writing = new Promise((resolve, reject) => {
1432+
(async () => {
1433+
for await (const chunk of chunks) {
1434+
body.write(chunk);
1435+
await new Promise((resolve) => setTimeout(resolve, 1));
1436+
}
1437+
body.end();
1438+
resolve(null);
1439+
})().then(resolve, reject);
1440+
});
1441+
1442+
// Wait for both promises to resolve.
1443+
await Promise.all([reading, writing]);
1444+
});
1445+
1446+
test("supports ending without a done", async () => {
1447+
const stream = createStream(
1448+
`
1449+
event: output
1450+
id: EVENT_1
1451+
data: hello world
1452+
1453+
`.replace(/^[ ]+/gm, "")
1454+
);
1455+
1456+
const iterator = stream[Symbol.asyncIterator]();
1457+
expect(await iterator.next()).toEqual({
1458+
done: false,
1459+
value: { event: "output", id: "EVENT_1", data: "hello world" },
1460+
});
1461+
expect(await iterator.next()).toEqual({ done: true });
1462+
});
1463+
1464+
test("an error event in the stream raises an exception", async () => {
1465+
const stream = createStream(
1466+
`
1467+
event: output
1468+
id: EVENT_1
1469+
data: hello world
1470+
1471+
event: error
1472+
id: EVENT_2
1473+
data: An unexpected error occurred
1474+
1475+
`.replace(/^[ ]+/gm, "")
1476+
);
1477+
1478+
const iterator = stream[Symbol.asyncIterator]();
1479+
expect(await iterator.next()).toEqual({
1480+
done: false,
1481+
value: { event: "output", id: "EVENT_1", data: "hello world" },
1482+
});
1483+
await expect(iterator.next()).rejects.toThrowError(
1484+
"An unexpected error occurred"
1485+
);
1486+
expect(await iterator.next()).toEqual({ done: true });
1487+
});
1488+
1489+
test("an error when fetching the stream raises an exception", async () => {
1490+
const stream = createStream("{}", 500);
1491+
const iterator = stream[Symbol.asyncIterator]();
1492+
await expect(iterator.next()).rejects.toThrowError(
1493+
"Request to https://stream.replicate.com/fake_stream failed with status 500"
1494+
);
1495+
expect(await iterator.next()).toEqual({ done: true });
1496+
});
1497+
});
11901498
});

0 commit comments

Comments
 (0)