Skip to content

Commit 37537fe

Browse files
authored
feat(iot): Action to send messages to SQS queues (#18087)
Fixes #17699 ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent 82fa742 commit 37537fe

File tree

7 files changed

+315
-0
lines changed

7 files changed

+315
-0
lines changed

packages/@aws-cdk/aws-iot-actions/README.md

+25
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Currently supported are:
2727
- Capture CloudWatch metrics
2828
- Change state for a CloudWatch alarm
2929
- Put records to Kinesis Data Firehose stream
30+
- Send messages to SQS queues
3031

3132
## Invoke a Lambda function
3233

@@ -209,3 +210,27 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {
209210
],
210211
});
211212
```
213+
214+
## Send messages to an SQS queue
215+
216+
The code snippet below creates an AWS IoT Rule that send messages
217+
to an SQS queue when it is triggered:
218+
219+
```ts
220+
import * as iot from '@aws-cdk/aws-iot';
221+
import * as actions from '@aws-cdk/aws-iot-actions';
222+
import * as sqs from '@aws-cdk/aws-sqs';
223+
224+
const queue = new sqs.Queue(this, 'MyQueue');
225+
226+
const topicRule = new iot.TopicRule(this, 'TopicRule', {
227+
sql: iot.IotSql.fromStringAsVer20160323(
228+
"SELECT topic(2) as device_id, year, month, day FROM 'device/+/data'",
229+
),
230+
actions: [
231+
new actions.SqsQueueAction(queue, {
232+
useBase64: true, // optional property, default is 'false'
233+
}),
234+
]
235+
});
236+
```

packages/@aws-cdk/aws-iot-actions/lib/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,5 @@ export * from './common-action-props';
55
export * from './firehose-stream-action';
66
export * from './lambda-function-action';
77
export * from './s3-put-object-action';
8+
export * from './sqs-queue-action';
9+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import * as iam from '@aws-cdk/aws-iam';
2+
import * as iot from '@aws-cdk/aws-iot';
3+
import * as sqs from '@aws-cdk/aws-sqs';
4+
import { CommonActionProps } from './common-action-props';
5+
import { singletonActionRole } from './private/role';
6+
7+
/**
8+
* Configuration properties of an action for SQS.
9+
*/
10+
export interface SqsQueueActionProps extends CommonActionProps {
11+
/**
12+
* Specifies whether to use Base64 encoding.
13+
*
14+
* @default false
15+
*/
16+
readonly useBase64?: boolean
17+
}
18+
19+
/**
20+
* The action to write the data from an MQTT message to an Amazon SQS queue.
21+
*/
22+
export class SqsQueueAction implements iot.IAction {
23+
private readonly role?: iam.IRole;
24+
private readonly queue: sqs.IQueue;
25+
private readonly useBase64?: boolean;
26+
27+
/**
28+
* @param queue The Amazon SQS queue to which to write data.
29+
* @param props Optional properties to not use default
30+
*/
31+
constructor(queue: sqs.IQueue, props: SqsQueueActionProps = {}) {
32+
this.queue = queue;
33+
this.role = props.role;
34+
this.useBase64 = props.useBase64;
35+
}
36+
37+
bind(rule: iot.ITopicRule): iot.ActionConfig {
38+
const role = this.role ?? singletonActionRole(rule);
39+
role.addToPrincipalPolicy(new iam.PolicyStatement({
40+
actions: ['sqs:SendMessage'],
41+
resources: [this.queue.queueArn],
42+
}));
43+
44+
return {
45+
configuration: {
46+
sqs: {
47+
queueUrl: this.queue.queueUrl,
48+
useBase64: this.useBase64,
49+
roleArn: role.roleArn,
50+
},
51+
},
52+
};
53+
}
54+
}

packages/@aws-cdk/aws-iot-actions/package.json

+2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
"@aws-cdk/aws-lambda": "0.0.0",
8888
"@aws-cdk/aws-logs": "0.0.0",
8989
"@aws-cdk/aws-s3": "0.0.0",
90+
"@aws-cdk/aws-sqs": "0.0.0",
9091
"@aws-cdk/core": "0.0.0",
9192
"case": "1.6.3",
9293
"constructs": "^3.3.69"
@@ -100,6 +101,7 @@
100101
"@aws-cdk/aws-lambda": "0.0.0",
101102
"@aws-cdk/aws-logs": "0.0.0",
102103
"@aws-cdk/aws-s3": "0.0.0",
104+
"@aws-cdk/aws-sqs": "0.0.0",
103105
"@aws-cdk/core": "0.0.0",
104106
"constructs": "^3.3.69"
105107
},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
{
2+
"Resources": {
3+
"TopicRule40A4EA44": {
4+
"Type": "AWS::IoT::TopicRule",
5+
"Properties": {
6+
"TopicRulePayload": {
7+
"Actions": [
8+
{
9+
"Sqs": {
10+
"QueueUrl": {
11+
"Ref": "MyQueueE6CA6235"
12+
},
13+
"RoleArn": {
14+
"Fn::GetAtt": [
15+
"TopicRuleTopicRuleActionRole246C4F77",
16+
"Arn"
17+
]
18+
}
19+
}
20+
}
21+
],
22+
"AwsIotSqlVersion": "2016-03-23",
23+
"Sql": "SELECT topic(2) as device_id, year, month, day FROM 'device/+/data'"
24+
}
25+
}
26+
},
27+
"TopicRuleTopicRuleActionRole246C4F77": {
28+
"Type": "AWS::IAM::Role",
29+
"Properties": {
30+
"AssumeRolePolicyDocument": {
31+
"Statement": [
32+
{
33+
"Action": "sts:AssumeRole",
34+
"Effect": "Allow",
35+
"Principal": {
36+
"Service": "iot.amazonaws.com"
37+
}
38+
}
39+
],
40+
"Version": "2012-10-17"
41+
}
42+
}
43+
},
44+
"TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": {
45+
"Type": "AWS::IAM::Policy",
46+
"Properties": {
47+
"PolicyDocument": {
48+
"Statement": [
49+
{
50+
"Action": "sqs:SendMessage",
51+
"Effect": "Allow",
52+
"Resource": {
53+
"Fn::GetAtt": [
54+
"MyQueueE6CA6235",
55+
"Arn"
56+
]
57+
}
58+
}
59+
],
60+
"Version": "2012-10-17"
61+
},
62+
"PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687",
63+
"Roles": [
64+
{
65+
"Ref": "TopicRuleTopicRuleActionRole246C4F77"
66+
}
67+
]
68+
}
69+
},
70+
"MyQueueE6CA6235": {
71+
"Type": "AWS::SQS::Queue",
72+
"UpdateReplacePolicy": "Delete",
73+
"DeletionPolicy": "Delete"
74+
}
75+
}
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/// !cdk-integ pragma:ignore-assets
2+
import * as iot from '@aws-cdk/aws-iot';
3+
import * as sqs from '@aws-cdk/aws-sqs';
4+
import * as cdk from '@aws-cdk/core';
5+
import * as actions from '../../lib';
6+
7+
const app = new cdk.App();
8+
9+
class TestStack extends cdk.Stack {
10+
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
11+
super(scope, id, props);
12+
13+
const topicRule = new iot.TopicRule(this, 'TopicRule', {
14+
sql: iot.IotSql.fromStringAsVer20160323(
15+
"SELECT topic(2) as device_id, year, month, day FROM 'device/+/data'",
16+
),
17+
});
18+
19+
const queue = new sqs.Queue(this, 'MyQueue', {
20+
removalPolicy: cdk.RemovalPolicy.DESTROY,
21+
});
22+
topicRule.addAction(new actions.SqsQueueAction(queue));
23+
}
24+
}
25+
26+
new TestStack(app, 'test-stack');
27+
app.synth();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import { Template, Match } from '@aws-cdk/assertions';
2+
import * as iam from '@aws-cdk/aws-iam';
3+
import * as iot from '@aws-cdk/aws-iot';
4+
import * as sqs from '@aws-cdk/aws-sqs';
5+
import * as cdk from '@aws-cdk/core';
6+
import * as actions from '../../lib';
7+
8+
test('Default SQS queue action', () => {
9+
// GIVEN
10+
const stack = new cdk.Stack();
11+
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
12+
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
13+
});
14+
const queue = sqs.Queue.fromQueueArn(stack, 'MyQueue', 'arn:aws:sqs::123456789012:test-queue');
15+
16+
// WHEN
17+
topicRule.addAction(new actions.SqsQueueAction(queue));
18+
19+
// THEN
20+
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
21+
TopicRulePayload: {
22+
Actions: [
23+
{
24+
Sqs: {
25+
QueueUrl: {
26+
'Fn::Join': ['', [
27+
'https://sqs..',
28+
{ Ref: 'AWS::URLSuffix' },
29+
'/123456789012/test-queue',
30+
]],
31+
},
32+
RoleArn: {
33+
'Fn::GetAtt': [
34+
'MyTopicRuleTopicRuleActionRoleCE2D05DA',
35+
'Arn',
36+
],
37+
},
38+
},
39+
},
40+
],
41+
},
42+
});
43+
44+
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', {
45+
AssumeRolePolicyDocument: {
46+
Statement: [
47+
{
48+
Action: 'sts:AssumeRole',
49+
Effect: 'Allow',
50+
Principal: {
51+
Service: 'iot.amazonaws.com',
52+
},
53+
},
54+
],
55+
Version: '2012-10-17',
56+
},
57+
});
58+
59+
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
60+
PolicyDocument: {
61+
Statement: [
62+
{
63+
Action: 'sqs:SendMessage',
64+
Effect: 'Allow',
65+
Resource: 'arn:aws:sqs::123456789012:test-queue',
66+
},
67+
],
68+
Version: '2012-10-17',
69+
},
70+
PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7',
71+
Roles: [
72+
{ Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' },
73+
],
74+
});
75+
});
76+
77+
test('Can set useBase64', () => {
78+
// GIVEN
79+
const stack = new cdk.Stack();
80+
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
81+
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
82+
});
83+
const queue = sqs.Queue.fromQueueArn(stack, 'MyQueue', 'arn:aws:sqs::123456789012:test-queue');
84+
85+
// WHEN
86+
topicRule.addAction(new actions.SqsQueueAction(queue, {
87+
useBase64: true,
88+
}));
89+
90+
// THEN
91+
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
92+
TopicRulePayload: {
93+
Actions: [
94+
Match.objectLike({ Sqs: { UseBase64: true } }),
95+
],
96+
},
97+
});
98+
});
99+
100+
test('Can set role', () => {
101+
// GIVEN
102+
const stack = new cdk.Stack();
103+
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
104+
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
105+
});
106+
const queue = sqs.Queue.fromQueueArn(stack, 'MyQueue', 'arn:aws:sqs::123456789012:test-queue');
107+
const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest');
108+
109+
// WHEN
110+
topicRule.addAction(new actions.SqsQueueAction(queue, { role }));
111+
112+
// THEN
113+
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
114+
TopicRulePayload: {
115+
Actions: [
116+
Match.objectLike({
117+
Sqs: {
118+
RoleArn: 'arn:aws:iam::123456789012:role/ForTest',
119+
},
120+
}),
121+
],
122+
},
123+
});
124+
125+
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
126+
PolicyName: 'MyRolePolicy64AB00A5',
127+
Roles: ['ForTest'],
128+
});
129+
});

0 commit comments

Comments
 (0)