Skip to content

Commit 0260046

Browse files
authored
feat(kinesisfirehose-alpha): refactor sourceStream property to support multiple types of sources (#31723)
### Reason for this change The previous API for `source` was designed under the assumption that a Source would either be a `Stream` or `Direct Put` if not. Since the alpha module was written, support on the service side for MSK as a Source has been added so we should update the `source` property to accept an `ISource` which can then be implemented by different types of Sources. ### Description of changes Replaced the `sourceStream` property with `source`. Changed the `source` property from `IStream` to `ISource`. Added an `ISource` interface which is implemented by classes which represent the different Source types. Currently implemented by the `KinesisStreamSource` class. The `MSKSource` class can be added in a separate PR. Added a `SourceConfig` which contains the property configs for each respective source (as the fields within these property configs are different across each source). In `delivery-stream.ts` we call the `_bind` method which will populate and return the correct property config for the Source and that gets directly injected where the L1 `CFNDeliveryStream` is created. This pattern is also used for Destinations: ```ts const destinationConfig = props.destination.bind(this, {}); const sourceConfig = props.source?._bind(this, this._role?.roleArn); const resource = new CfnDeliveryStream(this, 'Resource', { deliveryStreamEncryptionConfigurationInput: encryptionConfig, deliveryStreamName: props.deliveryStreamName, deliveryStreamType: props.source ? 'KinesisStreamAsSource' : 'DirectPut', ...sourceConfig, ...destinationConfig, }); ``` ### Description of how you validated changes no behavioural changes. the updated integ tests and unit tests still pass existing tests. exempting integ tests because we don't want the generated CFN to change. ### Checklist - [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md) ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license* --- BREAKING CHANGE: Replaced the `sourceStream` property with `source`. Changed the `source` property type from `IStream` to `ISource`. Instead of passing in the source Stream directly, it will be passed in by calling the appropriate class like so: `source: new source.KinesisStreamSource(sourceStream)`.
1 parent 252cca9 commit 0260046

File tree

6 files changed

+103
-25
lines changed

6 files changed

+103
-25
lines changed

packages/@aws-cdk/aws-kinesisfirehose-alpha/README.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,23 @@ The above example defines the following resources:
5454

5555
## Sources
5656

57-
There are two main methods of sourcing input data: Kinesis Data Streams and via a "direct
58-
put".
57+
A Kinesis Data Firehose delivery stream can accept data from three main sources: Kinesis Data Streams, Managed Streaming for Apache Kafka (MSK), or via a "direct put" (API calls).
5958

6059
See: [Sending Data to a Delivery Stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html)
6160
in the *Kinesis Data Firehose Developer Guide*.
6261

6362
### Kinesis Data Stream
6463

6564
A delivery stream can read directly from a Kinesis data stream as a consumer of the data
66-
stream. Configure this behaviour by providing a data stream in the `sourceStream`
67-
property when constructing a delivery stream:
65+
stream. Configure this behaviour by passing in a data stream in the `source`
66+
property via the `KinesisStreamSource` class when constructing a delivery stream:
6867

6968
```ts
7069
declare const destination: firehose.IDestination;
7170
const sourceStream = new kinesis.Stream(this, 'Source Stream');
71+
7272
new firehose.DeliveryStream(this, 'Delivery Stream', {
73-
sourceStream: sourceStream,
73+
source: new firehose.KinesisStreamSource(sourceStream),
7474
destination: destination,
7575
});
7676
```
@@ -444,7 +444,7 @@ necessary permissions for Kinesis Data Firehose to access the resources referenc
444444
delivery stream. One service role is created for the delivery stream that allows Kinesis
445445
Data Firehose to read from a Kinesis data stream (if one is configured as the delivery
446446
stream source) and for server-side encryption. Note that if the DeliveryStream is created
447-
without specifying `sourceStream` or `encryptionKey`, this role is not created as it is not needed.
447+
without specifying a `source` or `encryptionKey`, this role is not created as it is not needed.
448448

449449
Another service role is created for each destination, which gives Kinesis Data Firehose write
450450
access to the destination resource, as well as the ability to invoke data transformers and

packages/@aws-cdk/aws-kinesisfirehose-alpha/lib/delivery-stream.ts

+10-13
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
22
import * as ec2 from 'aws-cdk-lib/aws-ec2';
33
import * as iam from 'aws-cdk-lib/aws-iam';
4-
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
54
import * as kms from 'aws-cdk-lib/aws-kms';
65
import * as cdk from 'aws-cdk-lib/core';
76
import { RegionInfo } from 'aws-cdk-lib/region-info';
@@ -10,6 +9,7 @@ import { IDestination } from './destination';
109
import { FirehoseMetrics } from 'aws-cdk-lib/aws-kinesisfirehose/lib/kinesisfirehose-canned-metrics.generated';
1110
import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';
1211
import { StreamEncryption } from './encryption';
12+
import { ISource } from './source';
1313

1414
const PUT_RECORD_ACTIONS = [
1515
'firehose:PutRecord',
@@ -201,7 +201,7 @@ export interface DeliveryStreamProps {
201201
*
202202
* @default - data must be written to the delivery stream via a direct put.
203203
*/
204-
readonly sourceStream?: kinesis.IStream;
204+
readonly source?: ISource;
205205

206206
/**
207207
* The IAM role associated with this delivery stream.
@@ -322,14 +322,14 @@ export class DeliveryStream extends DeliveryStreamBase {
322322

323323
this._role = props.role;
324324

325-
if (props.encryption?.encryptionKey || props.sourceStream) {
325+
if (props.encryption?.encryptionKey || props.source) {
326326
this._role = this._role ?? new iam.Role(this, 'Service Role', {
327327
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
328328
});
329329
}
330330

331331
if (
332-
props.sourceStream &&
332+
props.source &&
333333
(props.encryption?.type === StreamEncryptionType.AWS_OWNED || props.encryption?.type === StreamEncryptionType.CUSTOMER_MANAGED)
334334
) {
335335
throw new Error('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
@@ -353,27 +353,24 @@ export class DeliveryStream extends DeliveryStreamBase {
353353
encryptionKey?.grantEncryptDecrypt(this._role);
354354
}
355355

356-
let sourceStreamConfig = undefined;
357356
let readStreamGrant = undefined;
358-
if (this._role && props.sourceStream) {
359-
sourceStreamConfig = {
360-
kinesisStreamArn: props.sourceStream.streamArn,
361-
roleArn: this._role.roleArn,
362-
};
363-
readStreamGrant = props.sourceStream.grantRead(this._role);
357+
if (this._role && props.source) {
358+
readStreamGrant = props.source.grantRead(this._role);
364359
}
365360

366361
const destinationConfig = props.destination.bind(this, {});
362+
const sourceConfig = props.source?._bind(this, this._role?.roleArn);
367363

368364
const resource = new CfnDeliveryStream(this, 'Resource', {
369365
deliveryStreamEncryptionConfigurationInput: encryptionConfig,
370366
deliveryStreamName: props.deliveryStreamName,
371-
deliveryStreamType: props.sourceStream ? 'KinesisStreamAsSource' : 'DirectPut',
372-
kinesisStreamSourceConfiguration: sourceStreamConfig,
367+
deliveryStreamType: props.source ? 'KinesisStreamAsSource' : 'DirectPut',
368+
...sourceConfig,
373369
...destinationConfig,
374370
});
375371

376372
destinationConfig.dependables?.forEach(dependable => resource.node.addDependency(dependable));
373+
377374
if (readStreamGrant) {
378375
resource.node.addDependency(readStreamGrant);
379376
}

packages/@aws-cdk/aws-kinesisfirehose-alpha/lib/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export * from './delivery-stream';
2+
export * from './source';
23
export * from './destination';
34
export * from './encryption';
45
export * from './lambda-function-processor';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { Construct } from 'constructs';
2+
import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';
3+
import * as iam from 'aws-cdk-lib/aws-iam';
4+
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
5+
6+
/**
7+
* A Kinesis Data Firehose delivery stream source configuration.
8+
*/
9+
interface SourceConfig {
10+
/**
11+
* Configuration for using a Kinesis Data Stream as a source for the delivery stream.
12+
*
13+
* This will be returned by the _bind method depending on what type of Source class is specified.
14+
*
15+
* @default - Kinesis Data Stream Source configuration property is not provided.
16+
*/
17+
readonly kinesisStreamSourceConfiguration?: CfnDeliveryStream.KinesisStreamSourceConfigurationProperty;
18+
19+
/**
20+
* Configuration for using an MSK (Managed Streaming for Kafka) cluster as a source for the delivery stream.
21+
*
22+
* This will be returned by the _bind method depending on what type of Source class is specified.
23+
*
24+
* @default - MSK Source configuration property is not provided.
25+
*/
26+
readonly mskSourceConfiguration?: CfnDeliveryStream.MSKSourceConfigurationProperty;
27+
}
28+
29+
/**
30+
* An interface for defining a source that can be used in a Kinesis Data Firehose delivery stream.
31+
*/
32+
export interface ISource {
33+
/**
34+
* Binds this source to the Kinesis Data Firehose delivery stream.
35+
*
36+
* @internal
37+
*/
38+
_bind(scope: Construct, roleArn?: string): SourceConfig;
39+
40+
/**
41+
* Grant read permissions for this source resource and its contents to an IAM
42+
* principal (the delivery stream).
43+
*
44+
* If an encryption key is used, permission to use the key to decrypt the
45+
* contents of the stream will also be granted.
46+
*/
47+
grantRead(grantee: iam.IGrantable): iam.Grant;
48+
}
49+
50+
/**
51+
* A Kinesis Data Firehose delivery stream source.
52+
*/
53+
export class KinesisStreamSource implements ISource {
54+
55+
/**
56+
* Creates a new KinesisStreamSource.
57+
*/
58+
constructor(private readonly stream: kinesis.IStream) {}
59+
60+
grantRead(grantee: iam.IGrantable): iam.Grant {
61+
return this.stream.grantRead(grantee);
62+
}
63+
64+
/**
65+
* Binds the Kinesis stream as a source for the Kinesis Data Firehose delivery stream.
66+
*
67+
* @returns The configuration needed to use this Kinesis stream as the delivery stream source.
68+
* @internal
69+
*/
70+
_bind(_scope: Construct, roleArn: string): SourceConfig {
71+
return {
72+
kinesisStreamSourceConfiguration: {
73+
kinesisStreamArn: this.stream.streamArn,
74+
roleArn: roleArn,
75+
},
76+
};
77+
}
78+
}

packages/@aws-cdk/aws-kinesisfirehose-alpha/test/delivery-stream.test.ts

+6-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import * as cdk from 'aws-cdk-lib';
1010
import { Construct, Node } from 'constructs';
1111
import * as firehose from '../lib';
1212
import { StreamEncryption } from '../lib';
13+
import * as source from '../lib/source';
1314

1415
describe('delivery stream', () => {
1516
let stack: cdk.Stack;
@@ -134,7 +135,7 @@ describe('delivery stream', () => {
134135

135136
new firehose.DeliveryStream(stack, 'Delivery Stream', {
136137
destination: mockS3Destination,
137-
sourceStream: sourceStream,
138+
source: new source.KinesisStreamSource(sourceStream),
138139
});
139140

140141
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', {
@@ -180,7 +181,7 @@ describe('delivery stream', () => {
180181

181182
new firehose.DeliveryStream(stack, 'Delivery Stream', {
182183
destination: mockS3Destination,
183-
sourceStream: sourceStream,
184+
source: new source.KinesisStreamSource(sourceStream),
184185
role: deliveryStreamRole,
185186
});
186187

@@ -318,17 +319,17 @@ describe('delivery stream', () => {
318319
expect(() => new firehose.DeliveryStream(stack, 'Delivery Stream 1', {
319320
destination: mockS3Destination,
320321
encryption: firehose.StreamEncryption.awsOwnedKey(),
321-
sourceStream,
322+
source: new source.KinesisStreamSource(sourceStream),
322323
})).toThrowError('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
323324
expect(() => new firehose.DeliveryStream(stack, 'Delivery Stream 2', {
324325
destination: mockS3Destination,
325326
encryption: firehose.StreamEncryption.customerManagedKey(),
326-
sourceStream,
327+
source: new source.KinesisStreamSource(sourceStream),
327328
})).toThrowError('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
328329
expect(() => new firehose.DeliveryStream(stack, 'Delivery Stream 3', {
329330
destination: mockS3Destination,
330331
encryption: StreamEncryption.customerManagedKey(new kms.Key(stack, 'Key')),
331-
sourceStream,
332+
source: new source.KinesisStreamSource(sourceStream),
332333
})).toThrowError('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
333334
});
334335

packages/@aws-cdk/aws-kinesisfirehose-alpha/test/integ.delivery-stream.source-stream.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import * as s3 from 'aws-cdk-lib/aws-s3';
55
import * as cdk from 'aws-cdk-lib';
66
import * as constructs from 'constructs';
77
import * as firehose from '../lib';
8+
import * as source from '../lib/source';
89

910
const app = new cdk.App();
1011

@@ -35,7 +36,7 @@ const sourceStream = new kinesis.Stream(stack, 'Source Stream');
3536

3637
new firehose.DeliveryStream(stack, 'Delivery Stream', {
3738
destination: mockS3Destination,
38-
sourceStream,
39+
source: new source.KinesisStreamSource(sourceStream),
3940
});
4041

4142
new firehose.DeliveryStream(stack, 'Delivery Stream No Source Or Encryption Key', {

0 commit comments

Comments
 (0)