Skip to content

Commit b0975e4

Browse files
authored
feat(pipes-targets): add step function target (#29987)
### Issue #29665 Closes #29665 ### Reason for this change Step Function target is not supported yet by pipes-targets. ### Description of changes - Added step function as a pipes target. - I've decided to make the `invocationType` a required parameter, since this made the code clearer and make users aware of how they want the step function to be invoked. The [AWS::Pipes::Pipe PipeTargetStateMachineParameters](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetstatemachineparameters.html) has this as an optional parameter (defaulting to Request-Response), which can lead the user unknowingly in a broken pipe, because cdk's StateMachines [default](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_stepfunctions.StateMachine.html#statemachinetype) to Standard Workflow, which is not compatible with Request-Response Invocation Type. - Currently there seems no way to prevent users from creating a pipe with an imported Standard StateMachine and InvocationType Request-Response as the stateMachineType cant be read (or I dont know how :D) ### Description of how you validated changes - Added unit tests - Added integration test ### Checklist - [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md) ---- I've talked with @RaphaelManke and he was fine for me opening up a PR (put him as a co-author nevertheless) :) *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent 205163f commit b0975e4

File tree

16 files changed

+34071
-1
lines changed

16 files changed

+34071
-1
lines changed

packages/@aws-cdk/aws-pipes-targets-alpha/README.md

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ For more details see the service documentation:
2727

2828
Pipe targets are the end point of a EventBridge Pipe.
2929

30+
The following targets are supported:
31+
32+
1. `targets.SqsTarget`: [Send event source to a Queue](#amazon-sqs)
33+
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions)
34+
3035
### Amazon SQS
3136

3237
A SQS message queue can be used as a target for a pipe. Messages will be pushed to the queue.
@@ -43,7 +48,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
4348
});
4449
```
4550

46-
The target configuration can be transformed:
51+
The target input can be transformed:
4752

4853
```ts
4954
declare const sourceQueue: sqs.Queue;
@@ -63,3 +68,57 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
6368
target: pipeTarget
6469
});
6570
```
71+
72+
### AWS Step Functions State Machine
73+
74+
A State Machine can be used as a target for a pipe. The State Machine will be invoked with the (enriched/filtered) source payload.
75+
76+
```ts
77+
declare const sourceQueue: sqs.Queue;
78+
declare const targetStateMachine: sfn.IStateMachine;
79+
80+
const pipeTarget = new targets.SfnStateMachine(targetStateMachine,{});
81+
82+
const pipe = new pipes.Pipe(this, 'Pipe', {
83+
source: new SomeSource(sourceQueue),
84+
target: pipeTarget
85+
});
86+
```
87+
88+
Specifying the Invocation Type when the target State Machine is invoked:
89+
90+
```ts
91+
declare const sourceQueue: sqs.Queue;
92+
declare const targetStateMachine: sfn.IStateMachine;
93+
94+
const pipeTarget = new targets.SfnStateMachine(targetStateMachine,
95+
{
96+
invocationType: targets.StateMachineInvocationType.FIRE_AND_FORGET,
97+
}
98+
);
99+
100+
101+
const pipe = new pipes.Pipe(this, 'Pipe', {
102+
source: new SomeSource(sourceQueue),
103+
target: pipeTarget
104+
});
105+
```
106+
107+
The input to the target State Machine can be transformed:
108+
109+
```ts
110+
declare const sourceQueue: sqs.Queue;
111+
declare const targetStateMachine: sfn.IStateMachine;
112+
113+
const pipeTarget = new targets.SfnStateMachine(targetStateMachine,
114+
{
115+
inputTransformation: pipes.InputTransformation.fromObject({ body: '<$.body>' }),
116+
invocationType: targets.StateMachineInvocationType.FIRE_AND_FORGET,
117+
}
118+
);
119+
120+
const pipe = new pipes.Pipe(this, 'Pipe', {
121+
source: new SomeSource(sourceQueue),
122+
target: pipeTarget
123+
});
124+
```
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
export * from './sqs';
2+
export * from './stepfunctions';
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
2+
import { IRole } from 'aws-cdk-lib/aws-iam';
3+
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
4+
import { StateMachine, StateMachineType } from 'aws-cdk-lib/aws-stepfunctions';
5+
6+
/**
7+
* Parameters for the SfnStateMachine target
8+
*/
9+
export interface SfnStateMachineParameters {
10+
/**
11+
* The input transformation to apply to the message before sending it to the target.
12+
*
13+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
14+
* @default none
15+
*/
16+
readonly inputTransformation?: IInputTransformation;
17+
18+
/**
19+
* Specify whether to invoke the State Machine synchronously (`REQUEST_RESPONSE`) or asynchronously (`FIRE_AND_FORGET`).
20+
*
21+
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsqsqueueparameters.html#cfn-pipes-pipe-pipetargetsqsqueueparameters-messagededuplicationid
22+
* @default StateMachineInvocationType.FIRE_AND_FORGET
23+
*/
24+
readonly invocationType?: StateMachineInvocationType;
25+
}
26+
27+
/**
28+
* InvocationType for invoking the State Machine.
29+
* @see https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetStateMachineParameters.html
30+
*/
31+
export enum StateMachineInvocationType {
32+
/**
33+
* Invoke StepFunction asynchronously (`StartExecution`). See https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html for more details.
34+
*/
35+
FIRE_AND_FORGET = 'FIRE_AND_FORGET',
36+
37+
/**
38+
* Invoke StepFunction synchronously (`StartSyncExecution`) and wait for the execution to complete. See https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartSyncExecution.html for more details.
39+
*/
40+
REQUEST_RESPONSE = 'REQUEST_RESPONSE',
41+
}
42+
43+
/**
44+
* An EventBridge Pipes target that sends messages to an AWS Step Functions State Machine.
45+
*/
46+
export class SfnStateMachine implements ITarget {
47+
public readonly targetArn: string;
48+
49+
private readonly stateMachine: sfn.IStateMachine;
50+
private readonly invocationType: StateMachineInvocationType;
51+
private readonly inputTemplate?: IInputTransformation;
52+
53+
constructor(stateMachine: sfn.IStateMachine, parameters: SfnStateMachineParameters) {
54+
this.stateMachine = stateMachine;
55+
this.targetArn = stateMachine.stateMachineArn;
56+
this.invocationType = parameters.invocationType?? StateMachineInvocationType.FIRE_AND_FORGET;
57+
this.inputTemplate = parameters.inputTransformation;
58+
59+
if (this.stateMachine instanceof StateMachine
60+
&& this.stateMachine.stateMachineType === StateMachineType.STANDARD
61+
&& this.invocationType === StateMachineInvocationType.REQUEST_RESPONSE) {
62+
throw new Error('STANDARD state machine workflows do not support the REQUEST_RESPONSE invocation type. Use FIRE_AND_FORGET instead.');
63+
}
64+
}
65+
66+
grantPush(grantee: IRole): void {
67+
if (this.invocationType === StateMachineInvocationType.FIRE_AND_FORGET) {
68+
this.stateMachine.grantStartExecution(grantee);
69+
} else {
70+
this.stateMachine.grantStartSyncExecution(grantee);
71+
}
72+
}
73+
74+
bind(pipe: IPipe): TargetConfig {
75+
return {
76+
targetParameters: {
77+
inputTemplate: this.inputTemplate?.bind(pipe).inputTemplate,
78+
stepFunctionStateMachineParameters: {
79+
invocationType: this.invocationType,
80+
},
81+
},
82+
};
83+
}
84+
}

packages/@aws-cdk/aws-pipes-targets-alpha/rosetta/default.ts-fixture

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Fixture with packages imported, but nothing else
22
import * as cdk from 'aws-cdk-lib';
33
import * as sqs from 'aws-cdk-lib/aws-sqs';
4+
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
45
import { Construct } from 'constructs';
56
import * as pipes from '@aws-cdk/aws-pipes-alpha';
67
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Jest Snapshot v1, https://goo.gl/fbAQLP
2+
3+
exports[`step-function should grant pipe role push access (StartAsyncExecution) with default invocation type (FIRE_AND_FORGET) 1`] = `
4+
{
5+
"MySfnPipeRoleF1D0F697": {
6+
"Properties": {
7+
"AssumeRolePolicyDocument": {
8+
"Statement": [
9+
{
10+
"Action": "sts:AssumeRole",
11+
"Effect": "Allow",
12+
"Principal": {
13+
"Service": "pipes.amazonaws.com",
14+
},
15+
},
16+
],
17+
"Version": "2012-10-17",
18+
},
19+
},
20+
"Type": "AWS::IAM::Role",
21+
},
22+
"MyStateMachineRoleD59FFEBC": {
23+
"Properties": {
24+
"AssumeRolePolicyDocument": {
25+
"Statement": [
26+
{
27+
"Action": "sts:AssumeRole",
28+
"Effect": "Allow",
29+
"Principal": {
30+
"Service": {
31+
"Fn::FindInMap": [
32+
"ServiceprincipalMap",
33+
{
34+
"Ref": "AWS::Region",
35+
},
36+
"states",
37+
],
38+
},
39+
},
40+
},
41+
],
42+
"Version": "2012-10-17",
43+
},
44+
},
45+
"Type": "AWS::IAM::Role",
46+
},
47+
}
48+
`;
49+
50+
exports[`step-function should grant pipe role push access (StartAsyncExecution) with invocation type FIRE_AND_FORGET 1`] = `
51+
{
52+
"MySfnPipeRoleF1D0F697": {
53+
"Properties": {
54+
"AssumeRolePolicyDocument": {
55+
"Statement": [
56+
{
57+
"Action": "sts:AssumeRole",
58+
"Effect": "Allow",
59+
"Principal": {
60+
"Service": "pipes.amazonaws.com",
61+
},
62+
},
63+
],
64+
"Version": "2012-10-17",
65+
},
66+
},
67+
"Type": "AWS::IAM::Role",
68+
},
69+
"MyStateMachineRoleD59FFEBC": {
70+
"Properties": {
71+
"AssumeRolePolicyDocument": {
72+
"Statement": [
73+
{
74+
"Action": "sts:AssumeRole",
75+
"Effect": "Allow",
76+
"Principal": {
77+
"Service": {
78+
"Fn::FindInMap": [
79+
"ServiceprincipalMap",
80+
{
81+
"Ref": "AWS::Region",
82+
},
83+
"states",
84+
],
85+
},
86+
},
87+
},
88+
],
89+
"Version": "2012-10-17",
90+
},
91+
},
92+
"Type": "AWS::IAM::Role",
93+
},
94+
}
95+
`;
96+
97+
exports[`step-function should grant pipe role push access (StartSyncExecution) with invocation type REQUEST-RESPONSE 1`] = `
98+
{
99+
"MySfnPipeRoleF1D0F697": {
100+
"Properties": {
101+
"AssumeRolePolicyDocument": {
102+
"Statement": [
103+
{
104+
"Action": "sts:AssumeRole",
105+
"Effect": "Allow",
106+
"Principal": {
107+
"Service": "pipes.amazonaws.com",
108+
},
109+
},
110+
],
111+
"Version": "2012-10-17",
112+
},
113+
},
114+
"Type": "AWS::IAM::Role",
115+
},
116+
"MyStateMachineRoleD59FFEBC": {
117+
"Properties": {
118+
"AssumeRolePolicyDocument": {
119+
"Statement": [
120+
{
121+
"Action": "sts:AssumeRole",
122+
"Effect": "Allow",
123+
"Principal": {
124+
"Service": {
125+
"Fn::FindInMap": [
126+
"ServiceprincipalMap",
127+
{
128+
"Ref": "AWS::Region",
129+
},
130+
"states",
131+
],
132+
},
133+
},
134+
},
135+
],
136+
"Version": "2012-10-17",
137+
},
138+
},
139+
"Type": "AWS::IAM::Role",
140+
},
141+
}
142+
`;

0 commit comments

Comments
 (0)