Skip to content

Commit ebf1ea2

Browse files
authored
feat(pipes-targets-alpha): support Amazon Data Firehose target (#33860)
### Issue # (if applicable) N/A ### Reason for this change Amazon EventBridge Pipes supports an Amazon Data Firehose delivery stream as a target. https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-target.html ### Description of changes Added the `FirehoseTarget` integration class. ### Describe any new or updated permissions being added Grants following actions on the target delivery stream to the pipe - `firehose:PutRecord` - `firehose:PutRecordBatch` ### Description of how you validated changes Unit tests and integ test ### Checklist - [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md) ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent 7f378b6 commit ebf1ea2

File tree

16 files changed

+32745
-11
lines changed

16 files changed

+32745
-11
lines changed

packages/@aws-cdk/aws-pipes-targets-alpha/README.md

+45-11
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Pipe targets are the end point of an EventBridge Pipe. The following targets are
2929
* `targets.ApiGatewayTarget`: [Send event source to an API Gateway REST API](#amazon-api-gateway-rest-api)
3030
* `targets.CloudWatchLogsTarget`: [Send event source to a CloudWatch Logs log group](#amazon-cloudwatch-logs-log-group)
3131
* `targets.EventBridgeTarget`: [Send event source to an EventBridge event bus](#amazon-eventbridge-event-bus)
32+
* `targets.FirehoseTarget`: [Send event source to an Amazon Data Firehose delivery stream](#amazon-data-firehose-delivery-stream)
3233
* `targets.KinesisTarget`: [Send event source to a Kinesis data stream](#amazon-kinesis-data-stream)
3334
* `targets.LambdaFunction`: [Send event source to a Lambda function](#aws-lambda-function)
3435
* `targets.SageMakerTarget`: [Send event source to a SageMaker pipeline](#amazon-sagemaker-pipeline)
@@ -37,7 +38,7 @@ Pipe targets are the end point of an EventBridge Pipe. The following targets are
3738

3839
### Amazon EventBridge API Destination
3940

40-
An EventBridge API destination can be used as a target for a pipe.
41+
An EventBridge API destination can be used as a target for a pipe.
4142
The API destination will receive the (enriched/filtered) source payload.
4243

4344
```ts
@@ -70,7 +71,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
7071

7172
### Amazon API Gateway Rest API
7273

73-
A REST API can be used as a target for a pipe.
74+
A REST API can be used as a target for a pipe.
7475
The REST API will receive the (enriched/filtered) source payload.
7576

7677
```ts
@@ -115,7 +116,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
115116

116117
### Amazon CloudWatch Logs Log Group
117118

118-
A CloudWatch Logs log group can be used as a target for a pipe.
119+
A CloudWatch Logs log group can be used as a target for a pipe.
119120
The log group will receive the (enriched/filtered) source payload.
120121

121122
```ts
@@ -148,7 +149,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
148149

149150
### Amazon EventBridge Event Bus
150151

151-
An EventBridge event bus can be used as a target for a pipe.
152+
An EventBridge event bus can be used as a target for a pipe.
152153
The event bus will receive the (enriched/filtered) source payload.
153154

154155
```ts
@@ -179,9 +180,42 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
179180
});
180181
```
181182

183+
### Amazon Data Firehose Delivery Stream
184+
185+
An Amazon Data Firehose delivery stream can be used as a target for a pipe.
186+
The delivery stream will receive the (enriched/filtered) source payload.
187+
188+
```ts
189+
declare const sourceQueue: sqs.Queue;
190+
declare const targetDeliveryStream: firehose.DeliveryStream;
191+
192+
const deliveryStreamTarget = new targets.FirehoseTarget(targetDeliveryStream);
193+
194+
const pipe = new pipes.Pipe(this, 'Pipe', {
195+
source: new SqsSource(sourceQueue),
196+
target: deliveryStreamTarget,
197+
});
198+
```
199+
200+
The input to the target delivery stream can be transformed:
201+
202+
```ts
203+
declare const sourceQueue: sqs.Queue;
204+
declare const targetDeliveryStream: firehose.DeliveryStream;
205+
206+
const deliveryStreamTarget = new targets.FirehoseTarget(targetDeliveryStream, {
207+
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
208+
});
209+
210+
const pipe = new pipes.Pipe(this, 'Pipe', {
211+
source: new SqsSource(sourceQueue),
212+
target: deliveryStreamTarget,
213+
});
214+
```
215+
182216
### Amazon Kinesis Data Stream
183217

184-
A Kinesis data stream can be used as a target for a pipe.
218+
A Kinesis data stream can be used as a target for a pipe.
185219
The data stream will receive the (enriched/filtered) source payload.
186220

187221
```ts
@@ -217,7 +251,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
217251

218252
### AWS Lambda Function
219253

220-
A Lambda function can be used as a target for a pipe.
254+
A Lambda function can be used as a target for a pipe.
221255
The Lambda function will be invoked with the (enriched/filtered) source payload.
222256

223257
```ts
@@ -266,7 +300,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
266300

267301
### Amazon SageMaker Pipeline
268302

269-
A SageMaker pipeline can be used as a target for a pipe.
303+
A SageMaker pipeline can be used as a target for a pipe.
270304
The pipeline will receive the (enriched/filtered) source payload.
271305

272306
```ts
@@ -299,7 +333,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
299333

300334
### AWS Step Functions State Machine
301335

302-
A Step Functions state machine can be used as a target for a pipe.
336+
A Step Functions state machine can be used as a target for a pipe.
303337
The state machine will be invoked with the (enriched/filtered) source payload.
304338

305339
```ts
@@ -351,7 +385,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
351385

352386
### Amazon SQS Queue
353387

354-
An SQS queue can be used as a target for a pipe.
388+
An SQS queue can be used as a target for a pipe.
355389
The queue will receive the (enriched/filtered) source payload.
356390

357391
```ts
@@ -374,8 +408,8 @@ declare const targetQueue: sqs.Queue;
374408

375409
const pipeTarget = new targets.SqsTarget(targetQueue,
376410
{
377-
inputTransformation: pipes.InputTransformation.fromObject(
378-
{
411+
inputTransformation: pipes.InputTransformation.fromObject(
412+
{
379413
"SomeKey": pipes.DynamicInput.fromEventPath('$.body')
380414
})
381415
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
2+
import { IRole } from 'aws-cdk-lib/aws-iam';
3+
import { IDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';
4+
5+
/**
6+
* Amazon Data Firehose target properties.
7+
*/
8+
export interface FirehoseTargetParameters {
9+
/**
10+
* The input transformation to apply to the message before sending it to the target.
11+
*
12+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
13+
* @default - none
14+
*/
15+
readonly inputTransformation?: IInputTransformation;
16+
}
17+
18+
/**
19+
* An EventBridge Pipes target that sends messages to an Amazon Data Firehose delivery stream.
20+
*/
21+
export class FirehoseTarget implements ITarget {
22+
private deliveryStream: IDeliveryStream;
23+
private deliveryStreamParameters: FirehoseTargetParameters;
24+
public readonly targetArn: string;
25+
26+
constructor(deliveryStream: IDeliveryStream, parameters: FirehoseTargetParameters = {}) {
27+
this.deliveryStream = deliveryStream;
28+
this.targetArn = deliveryStream.deliveryStreamArn;
29+
this.deliveryStreamParameters = parameters;
30+
}
31+
32+
grantPush(grantee: IRole): void {
33+
this.deliveryStream.grantPutRecords(grantee);
34+
}
35+
36+
bind(pipe: IPipe): TargetConfig {
37+
return {
38+
targetParameters: {
39+
inputTemplate: this.deliveryStreamParameters.inputTransformation?.bind(pipe).inputTemplate,
40+
},
41+
};
42+
}
43+
}

packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export * from './api-destination';
22
export * from './api-gateway';
33
export * from './cloudwatch-logs';
44
export * from './event-bridge';
5+
export * from './firehose';
56
export * from './kinesis';
67
export * from './lambda';
78
export * from './sagemaker';

packages/@aws-cdk/aws-pipes-targets-alpha/rosetta/default.ts-fixture

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as cdk from 'aws-cdk-lib';
33
import * as api from 'aws-cdk-lib/aws-apigateway';
44
import * as events from 'aws-cdk-lib/aws-events';
55
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
6+
import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose';
67
import * as logs from 'aws-cdk-lib/aws-logs';
78
import * as sagemaker from 'aws-cdk-lib/aws-sagemaker';
89
import * as sqs from 'aws-cdk-lib/aws-sqs';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import { InputTransformation, Pipe } from '@aws-cdk/aws-pipes-alpha';
2+
import { App, Stack } from 'aws-cdk-lib';
3+
import { Template } from 'aws-cdk-lib/assertions';
4+
import { DeliveryStream, S3Bucket } from 'aws-cdk-lib/aws-kinesisfirehose';
5+
import { Bucket } from 'aws-cdk-lib/aws-s3';
6+
import { TestSource } from './test-classes';
7+
import { FirehoseTarget } from '../lib/firehose';
8+
9+
describe('Firehose', () => {
10+
it('should have target arn', () => {
11+
// ARRANGE
12+
const app = new App();
13+
const stack = new Stack(app, 'TestStack');
14+
const bucket = new Bucket(stack, 'Bucket');
15+
const deliveryStream = new DeliveryStream(stack, 'MyDeliveryStream', {
16+
destination: new S3Bucket(bucket),
17+
});
18+
19+
const target = new FirehoseTarget(deliveryStream);
20+
21+
new Pipe(stack, 'MyPipe', {
22+
source: new TestSource(),
23+
target,
24+
});
25+
26+
// ACT
27+
const template = Template.fromStack(stack);
28+
29+
// ASSERT
30+
template.hasResourceProperties('AWS::Pipes::Pipe', {
31+
Target: {
32+
'Fn::GetAtt': [
33+
'MyDeliveryStream79822137',
34+
'Arn',
35+
],
36+
},
37+
});
38+
});
39+
40+
it('should have input transformation', () => {
41+
// ARRANGE
42+
const app = new App();
43+
const stack = new Stack(app, 'TestStack');
44+
const bucket = new Bucket(stack, 'Bucket');
45+
const deliveryStream = new DeliveryStream(stack, 'MyDeliveryStream', {
46+
destination: new S3Bucket(bucket),
47+
});
48+
49+
const inputTransformation = InputTransformation.fromObject({
50+
key: 'value',
51+
});
52+
const target = new FirehoseTarget(deliveryStream, {
53+
inputTransformation,
54+
});
55+
56+
new Pipe(stack, 'MyPipe', {
57+
source: new TestSource(),
58+
target,
59+
});
60+
61+
// ACT
62+
const template = Template.fromStack(stack);
63+
64+
// ASSERT
65+
template.hasResourceProperties('AWS::Pipes::Pipe', {
66+
TargetParameters: {
67+
InputTemplate: '{"key":"value"}',
68+
},
69+
});
70+
});
71+
72+
it('should grant pipe role putRecord access', () => {
73+
// ARRANGE
74+
const app = new App();
75+
const stack = new Stack(app, 'TestStack');
76+
const bucket = new Bucket(stack, 'Bucket');
77+
const deliveryStream = new DeliveryStream(stack, 'MyDeliveryStream', {
78+
destination: new S3Bucket(bucket),
79+
});
80+
81+
const target = new FirehoseTarget(deliveryStream);
82+
83+
new Pipe(stack, 'MyPipe', {
84+
source: new TestSource(),
85+
target,
86+
});
87+
88+
// ACT
89+
const template = Template.fromStack(stack);
90+
91+
// ASSERT
92+
template.hasResourceProperties('AWS::IAM::Policy', {
93+
PolicyDocument: {
94+
Statement: [
95+
{
96+
Action: ['firehose:PutRecord', 'firehose:PutRecordBatch'],
97+
Resource: { 'Fn::GetAtt': ['MyDeliveryStream79822137', 'Arn'] },
98+
},
99+
],
100+
},
101+
Roles: [{ Ref: 'MyPipeRoleCBC8E9AB' }],
102+
});
103+
});
104+
});

packages/@aws-cdk/aws-pipes-targets-alpha/test/integ.firehose.js.snapshot/asset.44e9c4d7a5d3fd2d677e1a7e416b2b56f6b0104bd5eff9cac5557b4c65a9dc61/index.js

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)