Skip to content

Commit 1480213

Browse files
authored
feat(iot): add Action to put record to Kinesis Data stream (#18321)
Fixes #17703 ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent 6937564 commit 1480213

7 files changed

+347
-0
lines changed

packages/@aws-cdk/aws-iot-actions/README.md

+21
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Currently supported are:
2626
- Put logs to CloudWatch Logs
2727
- Capture CloudWatch metrics
2828
- Change state for a CloudWatch alarm
29+
- Put records to Kinesis Data stream
2930
- Put records to Kinesis Data Firehose stream
3031
- Send messages to SQS queues
3132

@@ -172,6 +173,26 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {
172173
});
173174
```
174175

176+
## Put records to Kinesis Data stream
177+
178+
The code snippet below creates an AWS IoT Rule that put records to Kinesis Data
179+
stream when it is triggered.
180+
181+
```ts
182+
import * as kinesis from '@aws-cdk/aws-kinesis';
183+
184+
const stream = new kinesis.Stream(this, 'MyStream');
185+
186+
const topicRule = new iot.TopicRule(this, 'TopicRule', {
187+
sql: iot.IotSql.fromStringAsVer20160323("SELECT * FROM 'device/+/data'"),
188+
actions: [
189+
new actions.KinesisPutRecordAction(stream, {
190+
partitionKey: '${newuuid()}',
191+
}),
192+
],
193+
});
194+
```
195+
175196
## Put records to Kinesis Data Firehose stream
176197

177198
The code snippet below creates an AWS IoT Rule that put records to Put records

packages/@aws-cdk/aws-iot-actions/lib/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export * from './cloudwatch-put-metric-action';
33
export * from './cloudwatch-set-alarm-state-action';
44
export * from './common-action-props';
55
export * from './firehose-put-record-action';
6+
export * from './kinesis-put-record-action';
67
export * from './lambda-function-action';
78
export * from './s3-put-object-action';
89
export * from './sqs-queue-action';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import * as iam from '@aws-cdk/aws-iam';
2+
import * as iot from '@aws-cdk/aws-iot';
3+
import * as kinesis from '@aws-cdk/aws-kinesis';
4+
import { CommonActionProps } from './common-action-props';
5+
import { singletonActionRole } from './private/role';
6+
7+
/**
8+
* Configuration properties of an action for the Kinesis Data stream.
9+
*/
10+
export interface KinesisPutRecordActionProps extends CommonActionProps {
11+
/**
12+
* The partition key used to determine to which shard the data is written.
13+
* The partition key is usually composed of an expression (for example, ${topic()} or ${timestamp()}).
14+
*
15+
* @see https://docs.aws.amazon.com/iot/latest/developerguide/iot-substitution-templates.html
16+
*
17+
* You can use the expression '${newuuid()}' if your payload does not have a high cardinarity property.
18+
* If you use empty string, this action use no partition key and all records will put same one shard.
19+
*
20+
* @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestParameters
21+
*/
22+
readonly partitionKey: string;
23+
}
24+
25+
/**
26+
* The action to put the record from an MQTT message to the Kinesis Data stream.
27+
*/
28+
export class KinesisPutRecordAction implements iot.IAction {
29+
private readonly partitionKey?: string;
30+
private readonly role?: iam.IRole;
31+
32+
/**
33+
* @param stream The Kinesis Data stream to which to put records.
34+
* @param props Optional properties to not use default
35+
*/
36+
constructor(private readonly stream: kinesis.IStream, props: KinesisPutRecordActionProps) {
37+
this.partitionKey = props.partitionKey;
38+
this.role = props.role;
39+
}
40+
41+
bind(rule: iot.ITopicRule): iot.ActionConfig {
42+
const role = this.role ?? singletonActionRole(rule);
43+
role.addToPrincipalPolicy(new iam.PolicyStatement({
44+
actions: ['kinesis:PutRecord'],
45+
resources: [this.stream.streamArn],
46+
}));
47+
48+
return {
49+
configuration: {
50+
kinesis: {
51+
streamName: this.stream.streamName,
52+
partitionKey: this.partitionKey || undefined,
53+
roleArn: role.roleArn,
54+
},
55+
},
56+
};
57+
}
58+
}

packages/@aws-cdk/aws-iot-actions/package.json

+2
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
"@aws-cdk/aws-cloudwatch": "0.0.0",
9191
"@aws-cdk/aws-iam": "0.0.0",
9292
"@aws-cdk/aws-iot": "0.0.0",
93+
"@aws-cdk/aws-kinesis": "0.0.0",
9394
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
9495
"@aws-cdk/aws-lambda": "0.0.0",
9596
"@aws-cdk/aws-logs": "0.0.0",
@@ -104,6 +105,7 @@
104105
"@aws-cdk/aws-cloudwatch": "0.0.0",
105106
"@aws-cdk/aws-iam": "0.0.0",
106107
"@aws-cdk/aws-iot": "0.0.0",
108+
"@aws-cdk/aws-kinesis": "0.0.0",
107109
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
108110
"@aws-cdk/aws-lambda": "0.0.0",
109111
"@aws-cdk/aws-logs": "0.0.0",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
{
2+
"Resources": {
3+
"TopicRule40A4EA44": {
4+
"Type": "AWS::IoT::TopicRule",
5+
"Properties": {
6+
"TopicRulePayload": {
7+
"Actions": [
8+
{
9+
"Kinesis": {
10+
"PartitionKey": "${timestamp()}",
11+
"RoleArn": {
12+
"Fn::GetAtt": [
13+
"TopicRuleTopicRuleActionRole246C4F77",
14+
"Arn"
15+
]
16+
},
17+
"StreamName": {
18+
"Ref": "MyStream5C050E93"
19+
}
20+
}
21+
}
22+
],
23+
"AwsIotSqlVersion": "2016-03-23",
24+
"Sql": "SELECT * FROM 'device/+/data'"
25+
}
26+
}
27+
},
28+
"TopicRuleTopicRuleActionRole246C4F77": {
29+
"Type": "AWS::IAM::Role",
30+
"Properties": {
31+
"AssumeRolePolicyDocument": {
32+
"Statement": [
33+
{
34+
"Action": "sts:AssumeRole",
35+
"Effect": "Allow",
36+
"Principal": {
37+
"Service": "iot.amazonaws.com"
38+
}
39+
}
40+
],
41+
"Version": "2012-10-17"
42+
}
43+
}
44+
},
45+
"TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": {
46+
"Type": "AWS::IAM::Policy",
47+
"Properties": {
48+
"PolicyDocument": {
49+
"Statement": [
50+
{
51+
"Action": "kinesis:PutRecord",
52+
"Effect": "Allow",
53+
"Resource": {
54+
"Fn::GetAtt": [
55+
"MyStream5C050E93",
56+
"Arn"
57+
]
58+
}
59+
}
60+
],
61+
"Version": "2012-10-17"
62+
},
63+
"PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687",
64+
"Roles": [
65+
{
66+
"Ref": "TopicRuleTopicRuleActionRole246C4F77"
67+
}
68+
]
69+
}
70+
},
71+
"MyStream5C050E93": {
72+
"Type": "AWS::Kinesis::Stream",
73+
"Properties": {
74+
"RetentionPeriodHours": 24,
75+
"ShardCount": 3,
76+
"StreamEncryption": {
77+
"Fn::If": [
78+
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions",
79+
{
80+
"Ref": "AWS::NoValue"
81+
},
82+
{
83+
"EncryptionType": "KMS",
84+
"KeyId": "alias/aws/kinesis"
85+
}
86+
]
87+
},
88+
"StreamModeDetails": {
89+
"StreamMode": "PROVISIONED"
90+
}
91+
}
92+
}
93+
},
94+
"Conditions": {
95+
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions": {
96+
"Fn::Or": [
97+
{
98+
"Fn::Equals": [
99+
{
100+
"Ref": "AWS::Region"
101+
},
102+
"cn-north-1"
103+
]
104+
},
105+
{
106+
"Fn::Equals": [
107+
{
108+
"Ref": "AWS::Region"
109+
},
110+
"cn-northwest-1"
111+
]
112+
}
113+
]
114+
}
115+
}
116+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import * as iot from '@aws-cdk/aws-iot';
2+
import * as kinesis from '@aws-cdk/aws-kinesis';
3+
import * as cdk from '@aws-cdk/core';
4+
import * as actions from '../../lib';
5+
6+
class TestStack extends cdk.Stack {
7+
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
8+
super(scope, id, props);
9+
10+
const topicRule = new iot.TopicRule(this, 'TopicRule', {
11+
sql: iot.IotSql.fromStringAsVer20160323(
12+
"SELECT * FROM 'device/+/data'",
13+
),
14+
});
15+
16+
const stream = new kinesis.Stream(this, 'MyStream', {
17+
shardCount: 3,
18+
});
19+
topicRule.addAction(new actions.KinesisPutRecordAction(stream, {
20+
partitionKey: '${timestamp()}',
21+
}));
22+
}
23+
}
24+
25+
const app = new cdk.App();
26+
new TestStack(app, 'test-kinesis-stream-action-stack');
27+
app.synth();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { Template, Match } from '@aws-cdk/assertions';
2+
import * as iam from '@aws-cdk/aws-iam';
3+
import * as iot from '@aws-cdk/aws-iot';
4+
import * as kinesis from '@aws-cdk/aws-kinesis';
5+
import * as cdk from '@aws-cdk/core';
6+
import * as actions from '../../lib';
7+
8+
test('Default kinesis stream action', () => {
9+
// GIVEN
10+
const stack = new cdk.Stack();
11+
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
12+
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
13+
});
14+
const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream');
15+
16+
// WHEN
17+
topicRule.addAction(new actions.KinesisPutRecordAction(stream, {
18+
partitionKey: '${newuuid()}',
19+
}));
20+
21+
// THEN
22+
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
23+
TopicRulePayload: {
24+
Actions: [
25+
{
26+
Kinesis: {
27+
StreamName: 'my-stream',
28+
PartitionKey: '${newuuid()}',
29+
RoleArn: {
30+
'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'],
31+
},
32+
},
33+
},
34+
],
35+
},
36+
});
37+
38+
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', {
39+
AssumeRolePolicyDocument: {
40+
Statement: [
41+
{
42+
Action: 'sts:AssumeRole',
43+
Effect: 'Allow',
44+
Principal: {
45+
Service: 'iot.amazonaws.com',
46+
},
47+
},
48+
],
49+
Version: '2012-10-17',
50+
},
51+
});
52+
53+
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
54+
PolicyDocument: {
55+
Statement: [
56+
{
57+
Action: 'kinesis:PutRecord',
58+
Effect: 'Allow',
59+
Resource: 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream',
60+
},
61+
],
62+
Version: '2012-10-17',
63+
},
64+
PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7',
65+
Roles: [
66+
{ Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' },
67+
],
68+
});
69+
});
70+
71+
test('passes undefined to partitionKey if empty string is given', () => {
72+
// GIVEN
73+
const stack = new cdk.Stack();
74+
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
75+
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
76+
});
77+
const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream');
78+
79+
// WHEN
80+
topicRule.addAction(new actions.KinesisPutRecordAction(stream, {
81+
partitionKey: '',
82+
}));
83+
84+
// THEN
85+
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
86+
TopicRulePayload: {
87+
Actions: [
88+
Match.objectLike({ Kinesis: { PartitionKey: Match.absent() } }),
89+
],
90+
},
91+
});
92+
});
93+
94+
test('can set role', () => {
95+
// GIVEN
96+
const stack = new cdk.Stack();
97+
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
98+
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
99+
});
100+
const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream');
101+
const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest');
102+
103+
// WHEN
104+
topicRule.addAction(new actions.KinesisPutRecordAction(stream, {
105+
partitionKey: '${newuuid()}',
106+
role,
107+
}));
108+
109+
// THEN
110+
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
111+
TopicRulePayload: {
112+
Actions: [
113+
Match.objectLike({ Kinesis: { RoleArn: 'arn:aws:iam::123456789012:role/ForTest' } }),
114+
],
115+
},
116+
});
117+
118+
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
119+
PolicyName: 'MyRolePolicy64AB00A5',
120+
Roles: ['ForTest'],
121+
});
122+
});

0 commit comments

Comments
 (0)