Skip to content

Commit a5fdf57

Browse files
authored
feat(pipes-targets): add SageMaker (#30696)
Add SageMaker pipeline as a Pipes target.
1 parent 048e753 commit a5fdf57

File tree

20 files changed

+34692
-5
lines changed

20 files changed

+34692
-5
lines changed

packages/@aws-cdk/aws-pipes-targets-alpha/README.md

+34
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ Pipe targets are the end point of an EventBridge Pipe. The following targets are
3030
* `targets.EventBridgeTarget`: [Send event source to an EventBridge event bus](#amazon-eventbridge-event-bus)
3131
* `targets.KinesisTarget`: [Send event source to a Kinesis data stream](#amazon-kinesis-data-stream)
3232
* `targets.LambdaFunction`: [Send event source to a Lambda function](#aws-lambda-function)
33+
* `targets.SageMakerTarget`: [Send event source to a SageMaker pipeline](#amazon-sagemaker-pipeline)
3334
* `targets.SfnStateMachine`: [Invoke a Step Functions state machine from an event source](#aws-step-functions-state-machine)
3435
* `targets.SqsTarget`: [Send event source to an SQS queue](#amazon-sqs)
3536

@@ -217,6 +218,39 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
217218
});
218219
```
219220

221+
### Amazon SageMaker Pipeline
222+
223+
A SageMaker pipeline can be used as a target for a pipe.
224+
The pipeline will receive the (enriched/filtered) source payload.
225+
226+
```ts
227+
declare const sourceQueue: sqs.Queue;
228+
declare const targetPipeline: sagemaker.IPipeline;
229+
230+
const pipelineTarget = new targets.SageMakerTarget(targetPipeline);
231+
232+
const pipe = new pipes.Pipe(this, 'Pipe', {
233+
source: new SqsSource(sourceQueue),
234+
target: pipelineTarget,
235+
});
236+
```
237+
238+
The input to the target pipeline can be transformed:
239+
240+
```ts
241+
declare const sourceQueue: sqs.Queue;
242+
declare const targetPipeline: sagemaker.IPipeline;
243+
244+
const pipelineTarget = new targets.SageMakerTarget(targetPipeline, {
245+
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
246+
});
247+
248+
const pipe = new pipes.Pipe(this, 'Pipe', {
249+
source: new SqsSource(sourceQueue),
250+
target: pipelineTarget,
251+
});
252+
```
253+
220254
### AWS Step Functions State Machine
221255

222256
A Step Functions state machine can be used as a target for a pipe.

packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ export * from './cloudwatch-logs';
33
export * from './event-bridge';
44
export * from './kinesis';
55
export * from './lambda';
6+
export * from './sagemaker';
67
export * from './sqs';
78
export * from './stepfunctions';

packages/@aws-cdk/aws-pipes-targets-alpha/lib/lambda.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export interface LambdaFunctionParameters {
1111
* The input transformation to apply to the message before sending it to the target.
1212
*
1313
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
14-
* @default none
14+
* @default - none
1515
*/
1616
readonly inputTransformation?: IInputTransformation;
1717

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
2+
import { IRole } from 'aws-cdk-lib/aws-iam';
3+
import { IPipeline } from 'aws-cdk-lib/aws-sagemaker';
4+
5+
/**
6+
* SageMaker target properties.
7+
*/
8+
export interface SageMakerTargetParameters {
9+
/**
10+
* The input transformation to apply to the message before sending it to the target.
11+
*
12+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
13+
* @default - none
14+
*/
15+
readonly inputTransformation?: IInputTransformation;
16+
17+
/**
18+
* List of parameter names and values for SageMaker Model Building Pipeline execution.
19+
*
20+
* The Name/Value pairs are passed to start execution of the pipeline.
21+
*
22+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsagemakerpipelineparameters.html#cfn-pipes-pipe-pipetargetsagemakerpipelineparameters-pipelineparameterlist
23+
* @default - none
24+
*/
25+
readonly pipelineParameters?: Record<string, string>;
26+
}
27+
28+
/**
29+
* An EventBridge Pipes target that sends messages to a SageMaker pipeline.
30+
*/
31+
export class SageMakerTarget implements ITarget {
32+
private pipeline: IPipeline;
33+
private sageMakerParameters?: SageMakerTargetParameters;
34+
private pipelineParameters?: Record<string, string>;
35+
public readonly targetArn: string;
36+
37+
constructor(pipeline: IPipeline, parameters?: SageMakerTargetParameters) {
38+
this.pipeline = pipeline;
39+
this.targetArn = pipeline.pipelineArn;
40+
this.sageMakerParameters = parameters;
41+
this.pipelineParameters = this.sageMakerParameters?.pipelineParameters;
42+
}
43+
44+
grantPush(grantee: IRole): void {
45+
this.pipeline.grantStartPipelineExecution(grantee);
46+
}
47+
48+
bind(pipe: IPipe): TargetConfig {
49+
if (!this.sageMakerParameters) {
50+
return { targetParameters: {} };
51+
}
52+
53+
return {
54+
targetParameters: {
55+
inputTemplate: this.sageMakerParameters.inputTransformation?.bind(pipe).inputTemplate,
56+
sageMakerPipelineParameters: {
57+
pipelineParameterList: this.pipelineParameters ?
58+
Object.entries(this.pipelineParameters).map(([key, value]) => ({
59+
name: key,
60+
value: value,
61+
})) : undefined,
62+
},
63+
},
64+
};
65+
}
66+
}

packages/@aws-cdk/aws-pipes-targets-alpha/lib/sqs.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export interface SqsTargetParameters {
1010
* The input transformation to apply to the message before sending it to the target.
1111
*
1212
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
13-
* @default none
13+
* @default - none
1414
*/
1515
readonly inputTransformation?: IInputTransformation;
1616

@@ -20,15 +20,15 @@ export interface SqsTargetParameters {
2020
* The token used for deduplication of sent messages.
2121
*
2222
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsqsqueueparameters.html#cfn-pipes-pipe-pipetargetsqsqueueparameters-messagededuplicationid
23-
* @default none
23+
* @default - none
2424
*/
2525
readonly messageDeduplicationId?: string;
2626

2727
/**
2828
* The FIFO message group ID to use as the target.
2929
*
3030
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsqsqueueparameters.html#cfn-pipes-pipe-pipetargetsqsqueueparameters-messagegroupid
31-
* @default none
31+
* @default - none
3232
*/
3333
readonly messageGroupId?: string;
3434
}

packages/@aws-cdk/aws-pipes-targets-alpha/lib/stepfunctions.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export interface SfnStateMachineParameters {
1111
* The input transformation to apply to the message before sending it to the target.
1212
*
1313
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
14-
* @default none
14+
* @default - none
1515
*/
1616
readonly inputTransformation?: IInputTransformation;
1717

packages/@aws-cdk/aws-pipes-targets-alpha/rosetta/default.ts-fixture

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as cdk from 'aws-cdk-lib';
33
import * as events from 'aws-cdk-lib/aws-events';
44
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
55
import * as logs from 'aws-cdk-lib/aws-logs';
6+
import * as sagemaker from 'aws-cdk-lib/aws-sagemaker';
67
import * as sqs from 'aws-cdk-lib/aws-sqs';
78
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
89
import * as lambda from 'aws-cdk-lib/aws-lambda';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Jest Snapshot v1, https://goo.gl/fbAQLP
2+
3+
exports[`SageMaker should grant pipe role push access 1`] = `
4+
{
5+
"MyPipeRoleCBC8E9AB": {
6+
"Properties": {
7+
"AssumeRolePolicyDocument": {
8+
"Statement": [
9+
{
10+
"Action": "sts:AssumeRole",
11+
"Effect": "Allow",
12+
"Principal": {
13+
"Service": "pipes.amazonaws.com",
14+
},
15+
},
16+
],
17+
"Version": "2012-10-17",
18+
},
19+
},
20+
"Type": "AWS::IAM::Role",
21+
},
22+
}
23+
`;
24+
25+
exports[`SageMaker should grant pipe role push access 2`] = `
26+
{
27+
"MyPipeRoleDefaultPolicy31387C20": {
28+
"Properties": {
29+
"PolicyDocument": {
30+
"Statement": [
31+
{
32+
"Action": "sagemaker:StartPipelineExecution",
33+
"Effect": "Allow",
34+
"Resource": "MyPipeline",
35+
},
36+
],
37+
"Version": "2012-10-17",
38+
},
39+
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
40+
"Roles": [
41+
{
42+
"Ref": "MyPipeRoleCBC8E9AB",
43+
},
44+
],
45+
},
46+
"Type": "AWS::IAM::Policy",
47+
},
48+
}
49+
`;

packages/@aws-cdk/aws-pipes-targets-alpha/test/integ.sagemaker.js.snapshot/asset.44e9c4d7a5d3fd2d677e1a7e416b2b56f6b0104bd5eff9cac5557b4c65a9dc61/index.js

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)