Skip to content

Commit 8b495f9

Browse files
authored
feat(pipes-enrichments): add Step Functions enrichment eventbridge pipes (#30495)
### Issue # (if applicable) Closes #29385. ### Reason for this change To use Step Functions state machine enrichment for eventbrige pipes ### Description of changes Add `StepFunctionsEnrichment` class. ### Description of how you validated changes Add unit test and integ tests. ### Checklist - [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md) ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent f93c2ef commit 8b495f9

16 files changed

+33695
-0
lines changed

packages/@aws-cdk/aws-pipes-enrichments-alpha/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,26 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
4646
target: new SomeTarget(targetQueue),
4747
});
4848
```
49+
50+
### Step Functions state machine
51+
52+
Step Functions state machine can be used to enrich events of a pipe.
53+
54+
**Note:** EventBridge Pipes only supports Express workflows invoked synchronously.
55+
56+
> Visit [Amazon EventBridge Pipes event enrichment](https://docs.aws.amazon.com/eventbridge/latest/userguide/pipes-enrichment.html) for more details.
57+
58+
```ts
59+
declare const sourceQueue: sqs.Queue;
60+
declare const targetQueue: sqs.Queue;
61+
62+
declare const enrichmentStateMachine: stepfunctions.StateMachine;
63+
64+
const enrichment = new enrichments.StepFunctionsEnrichment(enrichmentStateMachine);
65+
66+
const pipe = new pipes.Pipe(this, 'Pipe', {
67+
source: new SomeSource(sourceQueue),
68+
enrichment,
69+
target: new SomeTarget(targetQueue),
70+
});
71+
```
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
export * from './lambda';
2+
export * from './stepfunctions';
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { EnrichmentParametersConfig, IEnrichment, IPipe, InputTransformation } from '@aws-cdk/aws-pipes-alpha';
2+
import { IRole } from 'aws-cdk-lib/aws-iam';
3+
import { IStateMachine, StateMachine, StateMachineType } from 'aws-cdk-lib/aws-stepfunctions';
4+
5+
/**
6+
* Properties for a StepFunctionsEnrichment
7+
*/
8+
export interface StepFunctionsEnrichmentProps {
9+
/**
10+
* The input transformation for the enrichment
11+
* @see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-input-transformation.html
12+
* @default - None
13+
*/
14+
readonly inputTransformation?: InputTransformation;
15+
}
16+
17+
/**
18+
* A StepFunctions enrichment for a pipe
19+
*/
20+
export class StepFunctionsEnrichment implements IEnrichment {
21+
public readonly enrichmentArn: string;
22+
23+
private readonly inputTransformation?: InputTransformation;
24+
constructor(private readonly stateMachine: IStateMachine, props?: StepFunctionsEnrichmentProps) {
25+
if (stateMachine instanceof StateMachine
26+
&& (stateMachine.stateMachineType !== StateMachineType.EXPRESS)
27+
) {
28+
throw new Error(`EventBridge pipes only support EXPRESS workflows as enrichment, got ${stateMachine.stateMachineType}`);
29+
}
30+
this.enrichmentArn = stateMachine.stateMachineArn;
31+
this.inputTransformation = props?.inputTransformation;
32+
}
33+
34+
bind(pipe: IPipe): EnrichmentParametersConfig {
35+
return {
36+
enrichmentParameters: {
37+
inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate,
38+
},
39+
};
40+
}
41+
42+
grantInvoke(pipeRole: IRole): void {
43+
this.stateMachine.grantStartSyncExecution(pipeRole);
44+
}
45+
}
46+

packages/@aws-cdk/aws-pipes-enrichments-alpha/rosetta/default.ts-fixture

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import * as cdk from 'aws-cdk-lib';
33
import * as sqs from 'aws-cdk-lib/aws-sqs';
44
import * as lambda from 'aws-cdk-lib/aws-lambda';
5+
import * as stepfunctions from 'aws-cdk-lib/aws-stepfunctions';
56
import { Construct } from 'constructs';
67
import * as pipes from '@aws-cdk/aws-pipes-alpha';
78
import * as enrichments from '@aws-cdk/aws-pipes-enrichments-alpha';
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Jest Snapshot v1, https://goo.gl/fbAQLP
2+
3+
exports[`stepfunctions should grant pipe role invoke access 1`] = `
4+
{
5+
"EnrichmentStateMachineRoleDE810FCA": {
6+
"Properties": {
7+
"AssumeRolePolicyDocument": {
8+
"Statement": [
9+
{
10+
"Action": "sts:AssumeRole",
11+
"Effect": "Allow",
12+
"Principal": {
13+
"Service": {
14+
"Fn::FindInMap": [
15+
"ServiceprincipalMap",
16+
{
17+
"Ref": "AWS::Region",
18+
},
19+
"states",
20+
],
21+
},
22+
},
23+
},
24+
],
25+
"Version": "2012-10-17",
26+
},
27+
},
28+
"Type": "AWS::IAM::Role",
29+
},
30+
"MyPipeRoleCBC8E9AB": {
31+
"Properties": {
32+
"AssumeRolePolicyDocument": {
33+
"Statement": [
34+
{
35+
"Action": "sts:AssumeRole",
36+
"Effect": "Allow",
37+
"Principal": {
38+
"Service": "pipes.amazonaws.com",
39+
},
40+
},
41+
],
42+
"Version": "2012-10-17",
43+
},
44+
},
45+
"Type": "AWS::IAM::Role",
46+
},
47+
}
48+
`;
49+
50+
exports[`stepfunctions should grant pipe role invoke access 2`] = `
51+
{
52+
"MyPipeRoleDefaultPolicy31387C20": {
53+
"Properties": {
54+
"PolicyDocument": {
55+
"Statement": [
56+
{
57+
"Action": "states:StartSyncExecution",
58+
"Effect": "Allow",
59+
"Resource": {
60+
"Ref": "EnrichmentStateMachine8BED6C4E",
61+
},
62+
},
63+
],
64+
"Version": "2012-10-17",
65+
},
66+
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
67+
"Roles": [
68+
{
69+
"Ref": "MyPipeRoleCBC8E9AB",
70+
},
71+
],
72+
},
73+
"Type": "AWS::IAM::Policy",
74+
},
75+
}
76+
`;

0 commit comments

Comments
 (0)