diff --git a/packages/eventstream-serde-universal/src/getUnmarshalledStream.spec.ts b/packages/eventstream-serde-universal/src/getUnmarshalledStream.spec.ts index 34d6fe3da71c9..9ecb3251a42c3 100644 --- a/packages/eventstream-serde-universal/src/getUnmarshalledStream.spec.ts +++ b/packages/eventstream-serde-universal/src/getUnmarshalledStream.spec.ts @@ -10,7 +10,7 @@ import { import { Message } from "@aws-sdk/types"; describe("getUnmarshalledStream", () => { - it("emits parsed message on data", async () => { + it("emits parsed payload on data", async () => { const expectedMessages: Array = [ { headers: { @@ -124,4 +124,23 @@ describe("getUnmarshalledStream", () => { "This is a modeled exception event that would be thrown in deserializer." ); }); + + it("omit the unknown event type", async () => { + const source = async function* () { + yield recordEventMessage; + }; + const unmarshallerStream = getUnmarshalledStream(source(), { + eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8), + deserializer: message => + Promise.resolve({ + $unknown: message + } as any), //deserializer that parse anything into unknown event + toUtf8 + }); + const messages: Array = []; + for await (const message of unmarshallerStream) { + messages.push(message[Object.keys(message)[0]]); + } + expect(messages.length).toEqual(0); + }); }); diff --git a/packages/eventstream-serde-universal/src/getUnmarshalledStream.ts b/packages/eventstream-serde-universal/src/getUnmarshalledStream.ts index 291a7731fefa2..d09dff380801c 100644 --- a/packages/eventstream-serde-universal/src/getUnmarshalledStream.ts +++ b/packages/eventstream-serde-universal/src/getUnmarshalledStream.ts @@ -41,7 +41,9 @@ export function getUnmarshalledStream( const event = { [message.headers[":event-type"].value as string]: message }; - yield await options.deserializer(event); + const deserialized = await options.deserializer(event); + if (deserialized.$unknown) continue; + yield deserialized; } else { throw Error( `Unrecognizable event type: ${message.headers[":event-type"].value}`