Skip to content

Commit 2bc3759

Browse files
authored
chore(eventstream-serde-node): use Readable.from for serializing (#4359)
1 parent 4035e36 commit 2bc3759

File tree

1 file changed

+1
-33
lines changed

1 file changed

+1
-33
lines changed

packages/eventstream-serde-node/src/EventStreamMarshaller.ts

+1-33
Original file line numberDiff line numberDiff line change
@@ -29,38 +29,6 @@ export class EventStreamMarshaller {
2929
}
3030

3131
serialize<T>(input: AsyncIterable<T>, serializer: (event: T) => Message): Readable {
32-
const serializedIterable = this.universalMarshaller.serialize(input, serializer);
33-
if (typeof Readable.from === "function") {
34-
//reference: https://nodejs.org/dist/latest-v13.x/docs/api/stream.html#stream_new_stream_readable_options
35-
return Readable.from(serializedIterable);
36-
} else {
37-
const iterator = serializedIterable[Symbol.asyncIterator]();
38-
const serializedStream = new Readable({
39-
autoDestroy: true,
40-
objectMode: true,
41-
async read() {
42-
iterator
43-
.next()
44-
.then(({ done, value }) => {
45-
if (done) {
46-
this.push(null);
47-
} else {
48-
this.push(value);
49-
}
50-
})
51-
.catch((err) => {
52-
this.destroy(err);
53-
});
54-
},
55-
});
56-
//TODO: use 'autoDestroy' when targeting Node 11
57-
serializedStream.on("error", () => {
58-
serializedStream.destroy();
59-
});
60-
serializedStream.on("end", () => {
61-
serializedStream.destroy();
62-
});
63-
return serializedStream;
64-
}
32+
return Readable.from(this.universalMarshaller.serialize(input, serializer));
6533
}
6634
}

0 commit comments

Comments
 (0)