Skip to content

Commit 7ac1215

Browse files
authored
feat(iot): add Action to republish MQTT messages to another MQTT topic (#18661)
resolve #17701 ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent 203cd9a commit 7ac1215

File tree

6 files changed

+287
-0
lines changed

6 files changed

+287
-0
lines changed

Diff for: packages/@aws-cdk/aws-iot-actions/README.md

+17
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ supported AWS Services. Instances of these classes should be passed to
2121

2222
Currently supported are:
2323

24+
- Republish a message to another MQTT topic
2425
- Invoke a Lambda function
2526
- Put objects to a S3 bucket
2627
- Put logs to CloudWatch Logs
@@ -30,6 +31,22 @@ Currently supported are:
3031
- Put records to Kinesis Data Firehose stream
3132
- Send messages to SQS queues
3233

34+
## Republish a message to another MQTT topic
35+
36+
The code snippet below creates an AWS IoT Rule that republish a message to
37+
another MQTT topic when it is triggered.
38+
39+
```ts
40+
new iot.TopicRule(this, 'TopicRule', {
41+
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id, timestamp() as timestamp, temperature FROM 'device/+/data'"),
42+
actions: [
43+
new actions.IotRepublishMqttAction('${topic()}/republish', {
44+
qualityOfService: actions.MqttQualityOfService.AT_LEAST_ONCE, // optional property, default is MqttQualityOfService.ZERO_OR_MORE_TIMES
45+
}),
46+
],
47+
});
48+
```
49+
3350
## Invoke a Lambda function
3451

3552
The code snippet below creates an AWS IoT Rule that invoke a Lambda function

Diff for: packages/@aws-cdk/aws-iot-actions/lib/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export * from './cloudwatch-put-metric-action';
33
export * from './cloudwatch-set-alarm-state-action';
44
export * from './common-action-props';
55
export * from './firehose-put-record-action';
6+
export * from './iot-republish-action';
67
export * from './kinesis-put-record-action';
78
export * from './lambda-function-action';
89
export * from './s3-put-object-action';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import * as iam from '@aws-cdk/aws-iam';
2+
import * as iot from '@aws-cdk/aws-iot';
3+
import { CommonActionProps } from './common-action-props';
4+
import { singletonActionRole } from './private/role';
5+
6+
/**
7+
* MQTT Quality of Service (QoS) indicates the level of assurance for delivery of an MQTT Message.
8+
*
9+
* @see https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt-qos
10+
*/
11+
export enum MqttQualityOfService {
12+
/**
13+
* QoS level 0. Sent zero or more times.
14+
* This level should be used for messages that are sent over reliable communication links or that can be missed without a problem.
15+
*/
16+
ZERO_OR_MORE_TIMES,
17+
18+
/**
19+
* QoS level 1. Sent at least one time, and then repeatedly until a PUBACK response is received.
20+
* The message is not considered complete until the sender receives a PUBACK response to indicate successful delivery.
21+
*/
22+
AT_LEAST_ONCE,
23+
}
24+
25+
/**
26+
* Configuration properties of an action to republish MQTT messages.
27+
*/
28+
export interface IotRepublishMqttActionProps extends CommonActionProps {
29+
/**
30+
* The Quality of Service (QoS) level to use when republishing messages.
31+
*
32+
* @see https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt-qos
33+
*
34+
* @default MqttQualityOfService.ZERO_OR_MORE_TIMES
35+
*/
36+
readonly qualityOfService?: MqttQualityOfService;
37+
}
38+
39+
/**
40+
* The action to put the record from an MQTT message to republish another MQTT topic.
41+
*/
42+
export class IotRepublishMqttAction implements iot.IAction {
43+
private readonly qualityOfService?: MqttQualityOfService;
44+
private readonly role?: iam.IRole;
45+
46+
/**
47+
* @param topic The MQTT topic to which to republish the message.
48+
* @param props Optional properties to not use default.
49+
*/
50+
constructor(private readonly topic: string, props: IotRepublishMqttActionProps = {}) {
51+
this.qualityOfService = props.qualityOfService;
52+
this.role = props.role;
53+
}
54+
55+
bind(rule: iot.ITopicRule): iot.ActionConfig {
56+
const role = this.role ?? singletonActionRole(rule);
57+
role.addToPrincipalPolicy(new iam.PolicyStatement({
58+
actions: ['iot:Publish'],
59+
resources: ['*'],
60+
}));
61+
62+
return {
63+
configuration: {
64+
republish: {
65+
topic: this.topic,
66+
qos: this.qualityOfService,
67+
roleArn: role.roleArn,
68+
},
69+
},
70+
};
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
{
2+
"Resources": {
3+
"TopicRule40A4EA44": {
4+
"Type": "AWS::IoT::TopicRule",
5+
"Properties": {
6+
"TopicRulePayload": {
7+
"Actions": [
8+
{
9+
"Republish": {
10+
"Qos": 1,
11+
"RoleArn": {
12+
"Fn::GetAtt": [
13+
"TopicRuleTopicRuleActionRole246C4F77",
14+
"Arn"
15+
]
16+
},
17+
"Topic": "${topic()}/republish"
18+
}
19+
}
20+
],
21+
"AwsIotSqlVersion": "2016-03-23",
22+
"Sql": "SELECT * FROM 'device/+/data'"
23+
}
24+
}
25+
},
26+
"TopicRuleTopicRuleActionRole246C4F77": {
27+
"Type": "AWS::IAM::Role",
28+
"Properties": {
29+
"AssumeRolePolicyDocument": {
30+
"Statement": [
31+
{
32+
"Action": "sts:AssumeRole",
33+
"Effect": "Allow",
34+
"Principal": {
35+
"Service": "iot.amazonaws.com"
36+
}
37+
}
38+
],
39+
"Version": "2012-10-17"
40+
}
41+
}
42+
},
43+
"TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": {
44+
"Type": "AWS::IAM::Policy",
45+
"Properties": {
46+
"PolicyDocument": {
47+
"Statement": [
48+
{
49+
"Action": "iot:Publish",
50+
"Effect": "Allow",
51+
"Resource": "*"
52+
}
53+
],
54+
"Version": "2012-10-17"
55+
},
56+
"PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687",
57+
"Roles": [
58+
{
59+
"Ref": "TopicRuleTopicRuleActionRole246C4F77"
60+
}
61+
]
62+
}
63+
}
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import * as iot from '@aws-cdk/aws-iot';
2+
import * as cdk from '@aws-cdk/core';
3+
import * as actions from '../../lib';
4+
5+
class TestStack extends cdk.Stack {
6+
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
7+
super(scope, id, props);
8+
9+
const topicRule = new iot.TopicRule(this, 'TopicRule', {
10+
sql: iot.IotSql.fromStringAsVer20160323(
11+
"SELECT * FROM 'device/+/data'",
12+
),
13+
});
14+
15+
topicRule.addAction(
16+
new actions.IotRepublishMqttAction('${topic()}/republish', {
17+
qualityOfService: actions.MqttQualityOfService.AT_LEAST_ONCE,
18+
}),
19+
);
20+
}
21+
}
22+
23+
const app = new cdk.App();
24+
new TestStack(app, 'iot-republish-action-test-stack');
25+
app.synth();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import { Template, Match } from '@aws-cdk/assertions';
2+
import * as iam from '@aws-cdk/aws-iam';
3+
import * as iot from '@aws-cdk/aws-iot';
4+
import * as cdk from '@aws-cdk/core';
5+
import * as actions from '../../lib';
6+
7+
let stack: cdk.Stack;
8+
let topicRule:iot.TopicRule;
9+
beforeEach(() => {
10+
stack = new cdk.Stack();
11+
topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
12+
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
13+
});
14+
});
15+
16+
test('Default IoT republish action', () => {
17+
// WHEN
18+
topicRule.addAction(
19+
new actions.IotRepublishMqttAction('test-topic'),
20+
);
21+
22+
// THEN
23+
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
24+
TopicRulePayload: {
25+
Actions: [
26+
{
27+
Republish: {
28+
Topic: 'test-topic',
29+
RoleArn: {
30+
'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'],
31+
},
32+
},
33+
},
34+
],
35+
},
36+
});
37+
38+
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', {
39+
AssumeRolePolicyDocument: {
40+
Statement: [
41+
{
42+
Action: 'sts:AssumeRole',
43+
Effect: 'Allow',
44+
Principal: {
45+
Service: 'iot.amazonaws.com',
46+
},
47+
},
48+
],
49+
Version: '2012-10-17',
50+
},
51+
});
52+
53+
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
54+
PolicyDocument: {
55+
Statement: [
56+
{
57+
Action: 'iot:Publish',
58+
Effect: 'Allow',
59+
Resource: '*',
60+
},
61+
],
62+
Version: '2012-10-17',
63+
},
64+
PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7',
65+
Roles: [
66+
{ Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' },
67+
],
68+
});
69+
});
70+
71+
test('can set qualityOfService', () => {
72+
// WHEN
73+
topicRule.addAction(
74+
new actions.IotRepublishMqttAction('test-topic', { qualityOfService: actions.MqttQualityOfService.AT_LEAST_ONCE }),
75+
);
76+
77+
// THEN
78+
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
79+
TopicRulePayload: {
80+
Actions: [
81+
Match.objectLike({ Republish: { Qos: 1 } }),
82+
],
83+
},
84+
});
85+
});
86+
87+
test('can set role', () => {
88+
// WHEN
89+
const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest');
90+
topicRule.addAction(
91+
new actions.IotRepublishMqttAction('test-topic', { role }),
92+
);
93+
94+
// THEN
95+
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
96+
TopicRulePayload: {
97+
Actions: [
98+
Match.objectLike({ Republish: { RoleArn: 'arn:aws:iam::123456789012:role/ForTest' } }),
99+
],
100+
},
101+
});
102+
103+
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
104+
PolicyName: 'MyRolePolicy64AB00A5',
105+
Roles: ['ForTest'],
106+
});
107+
});

0 commit comments

Comments
 (0)