Skip to content

Commit 842f49a

Browse files
authored
feat(pipes-targets): add EventBridge (#30654)
Add EventBridge event bus as a Pipes target.
1 parent 4ada3ea commit 842f49a

16 files changed

+34187
-5
lines changed

packages/@aws-cdk/aws-pipes-targets-alpha/README.md

+36-4
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ Pipe targets are the end point of a EventBridge Pipe.
2929

3030
The following targets are supported:
3131

32-
1. `targets.SqsTarget`: [Send event source to a Queue](#amazon-sqs)
32+
1. `targets.SqsTarget`: [Send event source to an SQS queue](#amazon-sqs)
3333
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions-state-machine)
34-
3. `targets.LambdaFunction`: [Send event source to a Lambda Function](#aws-lambda-function)
35-
4. `targets.ApiDestinationTarget`: [Send event source to an EventBridge API Destination](#amazon-eventbridge-api-destination)
34+
3. `targets.LambdaFunction`: [Send event source to a Lambda function](#aws-lambda-function)
35+
4. `targets.ApiDestinationTarget`: [Send event source to an EventBridge API destination](#amazon-eventbridge-api-destination)
3636
5. `targets.KinesisTarget`: [Send event source to a Kinesis data stream](#amazon-kinesis-data-stream)
37+
6. `targets.EventBridgeTarget`: [Send event source to an EventBridge event bus](#amazon-eventbridge-event-bus)
3738

3839
### Amazon SQS
3940

@@ -100,7 +101,6 @@ const pipeTarget = new targets.SfnStateMachine(targetStateMachine,
100101
}
101102
);
102103

103-
104104
const pipe = new pipes.Pipe(this, 'Pipe', {
105105
source: new SomeSource(sourceQueue),
106106
target: pipeTarget
@@ -241,3 +241,35 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
241241
target: streamTarget,
242242
});
243243
```
244+
245+
### Amazon EventBridge Event Bus
246+
247+
An event bus can be used as a target for a pipe. The event bus will receive the (enriched/filtered) source payload.
248+
249+
```ts
250+
declare const sourceQueue: sqs.Queue;
251+
declare const targetEventBus: events.EventBus;
252+
253+
const eventBusTarget = new targets.EventBridgeTarget(targetEventBus);
254+
255+
const pipe = new pipes.Pipe(this, 'Pipe', {
256+
source: new SqsSource(sourceQueue),
257+
target: eventBusTarget,
258+
});
259+
```
260+
261+
The input to the target event bus can be transformed:
262+
263+
```ts
264+
declare const sourceQueue: sqs.Queue;
265+
declare const targetEventBus: events.EventBus;
266+
267+
const eventBusTarget = new targets.EventBridgeTarget(targetEventBus, {
268+
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
269+
});
270+
271+
const pipe = new pipes.Pipe(this, 'Pipe', {
272+
source: new SqsSource(sourceQueue),
273+
target: eventBusTarget,
274+
});
275+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
2+
import { Token } from 'aws-cdk-lib';
3+
import { IEventBus } from 'aws-cdk-lib/aws-events';
4+
import { IRole } from 'aws-cdk-lib/aws-iam';
5+
6+
/**
7+
* EventBridge target properties.
8+
*/
9+
export interface EventBridgeTargetParameters {
10+
/**
11+
* The input transformation to apply to the message before sending it to the target.
12+
*
13+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
14+
* @default - none
15+
*/
16+
readonly inputTransformation?: IInputTransformation;
17+
18+
/**
19+
* A free-form string used to decide what fields to expect in the event detail.
20+
*
21+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargeteventbridgeeventbusparameters.html#cfn-pipes-pipe-pipetargeteventbridgeeventbusparameters-detailtype
22+
* @default - none
23+
*/
24+
readonly detailType?: string;
25+
26+
/**
27+
* The URL subdomain of the endpoint.
28+
*
29+
* @example
30+
* // if the URL for the endpoint is https://abcde.veo.endpoints.event.amazonaws.com
31+
* 'abcde.veo'
32+
*
33+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargeteventbridgeeventbusparameters.html#cfn-pipes-pipe-pipetargeteventbridgeeventbusparameters-endpointid
34+
* @default - none
35+
*/
36+
readonly endpointId?: string;
37+
38+
/**
39+
* AWS resources, identified by Amazon Resource Name (ARN), which the event primarily concerns.
40+
*
41+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargeteventbridgeeventbusparameters.html#cfn-pipes-pipe-pipetargeteventbridgeeventbusparameters-resources
42+
* @default - none
43+
*/
44+
readonly resources?: string[];
45+
46+
/**
47+
* The source of the event.
48+
*
49+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargeteventbridgeeventbusparameters.html#cfn-pipes-pipe-pipetargeteventbridgeeventbusparameters-source
50+
* @default - none
51+
*/
52+
readonly source?: string;
53+
54+
/**
55+
* The time stamp of the event, per RFC3339.
56+
*
57+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargeteventbridgeeventbusparameters.html#cfn-pipes-pipe-pipetargeteventbridgeeventbusparameters-time
58+
* @default - the time stamp of the PutEvents call
59+
*/
60+
readonly time?: string;
61+
}
62+
63+
/**
64+
* An EventBridge Pipes target that sends messages to an EventBridge event bus.
65+
*/
66+
export class EventBridgeTarget implements ITarget {
67+
private eventBus: IEventBus;
68+
private eventBridgeParameters?: EventBridgeTargetParameters;
69+
public readonly targetArn: string;
70+
71+
constructor(eventBus: IEventBus, parameters?: EventBridgeTargetParameters) {
72+
this.eventBus = eventBus;
73+
this.targetArn = eventBus.eventBusArn;
74+
if (parameters) {
75+
this.eventBridgeParameters = parameters;
76+
for (const validate of [validateDetailType, validateEndpointId, validateSource, validateTime]) {
77+
validate(parameters);
78+
}
79+
}
80+
}
81+
82+
grantPush(grantee: IRole): void {
83+
this.eventBus.grantPutEventsTo(grantee);
84+
}
85+
86+
bind(pipe: IPipe): TargetConfig {
87+
if (!this.eventBridgeParameters) {
88+
return {
89+
targetParameters: {},
90+
};
91+
}
92+
93+
return {
94+
targetParameters: {
95+
inputTemplate: this.eventBridgeParameters.inputTransformation?.bind(pipe).inputTemplate,
96+
eventBridgeEventBusParameters: this.eventBridgeParameters,
97+
},
98+
};
99+
}
100+
}
101+
102+
function validateDetailType({ detailType }: EventBridgeTargetParameters) {
103+
if (detailType !== undefined && !Token.isUnresolved(detailType)) {
104+
if (detailType.length < 1 || detailType.length > 128) {
105+
throw new Error(`Detail type must be between 1 and 128 characters, received ${detailType.length}`);
106+
}
107+
}
108+
}
109+
110+
function validateEndpointId({ endpointId }: EventBridgeTargetParameters) {
111+
if (endpointId !== undefined && !Token.isUnresolved(endpointId)) {
112+
if (endpointId.length < 1 || endpointId.length > 50) {
113+
throw new Error(`Endpoint id must be between 1 and 50 characters, received ${endpointId.length}`);
114+
}
115+
}
116+
}
117+
118+
function validateSource({ source }: EventBridgeTargetParameters) {
119+
if (source !== undefined && !Token.isUnresolved(source)) {
120+
if (source.length < 1 || source.length > 256) {
121+
throw new Error(`Source must be between 1 and 256 characters, received ${source.length}`);
122+
}
123+
}
124+
}
125+
126+
function validateTime({ time }: EventBridgeTargetParameters) {
127+
if (time !== undefined && !Token.isUnresolved(time)) {
128+
if (time.length < 1 || time.length > 256) {
129+
throw new Error(`Time must be between 1 and 256 characters, received ${time.length}`);
130+
}
131+
}
132+
}

packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export * from './api-destination';
2+
export * from './event-bridge';
23
export * from './kinesis';
34
export * from './lambda';
45
export * from './sqs';

packages/@aws-cdk/aws-pipes-targets-alpha/package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@
8989
"aws-cdk-lib": "0.0.0",
9090
"constructs": "^10.0.0",
9191
"@aws-cdk/aws-pipes-alpha": "0.0.0",
92-
"@aws-cdk/integ-tests-alpha": "0.0.0"
92+
"@aws-cdk/integ-tests-alpha": "0.0.0",
93+
"@aws-cdk/aws-pipes-sources-alpha": "0.0.0"
9394
},
9495
"dependencies": {},
9596
"peerDependencies": {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Jest Snapshot v1, https://goo.gl/fbAQLP
2+
3+
exports[`EventBridge should grant pipe role push access 1`] = `
4+
{
5+
"MyPipeRoleCBC8E9AB": {
6+
"Properties": {
7+
"AssumeRolePolicyDocument": {
8+
"Statement": [
9+
{
10+
"Action": "sts:AssumeRole",
11+
"Effect": "Allow",
12+
"Principal": {
13+
"Service": "pipes.amazonaws.com",
14+
},
15+
},
16+
],
17+
"Version": "2012-10-17",
18+
},
19+
},
20+
"Type": "AWS::IAM::Role",
21+
},
22+
}
23+
`;
24+
25+
exports[`EventBridge should grant pipe role push access 2`] = `
26+
{
27+
"MyPipeRoleDefaultPolicy31387C20": {
28+
"Properties": {
29+
"PolicyDocument": {
30+
"Statement": [
31+
{
32+
"Action": "events:PutEvents",
33+
"Effect": "Allow",
34+
"Resource": {
35+
"Fn::GetAtt": [
36+
"MyEventBus251E60F8",
37+
"Arn",
38+
],
39+
},
40+
},
41+
],
42+
"Version": "2012-10-17",
43+
},
44+
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
45+
"Roles": [
46+
{
47+
"Ref": "MyPipeRoleCBC8E9AB",
48+
},
49+
],
50+
},
51+
"Type": "AWS::IAM::Policy",
52+
},
53+
}
54+
`;

0 commit comments

Comments
 (0)