Skip to content

Commit 47a09b5

Browse files
authored
feat(scheduler-targets-alpha): KinesisStreamPutRecord Target (#27845)
This PR adds KinesisStreamPutRecord Target for EventBridge Scheduler. Closes #27451. ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent 14231f1 commit 47a09b5

16 files changed

+36157
-1
lines changed

packages/@aws-cdk/aws-scheduler-targets-alpha/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ The following targets are supported:
3131
5. `targets.SnsPublish`: [Publish messages to an Amazon SNS topic](#publish-messages-to-an-amazon-sns-topic)
3232
6. `targets.EventBridgePutEvents`: [Put Events on EventBridge](#send-events-to-an-eventbridge-event-bus)
3333
7. `targets.InspectorStartAssessmentRun`: [Start an Amazon Inspector assessment run](#start-an-amazon-inspector-assessment-run)
34+
8. `targets.KinesisStreamPutRecord`: [Put a record to an Amazon Kinesis Data Streams](#put-a-record-to-an-amazon-kinesis-data-streams)
3435

3536
## Invoke a Lambda function
3637

@@ -225,3 +226,23 @@ new Schedule(this, 'Schedule', {
225226
target: new targets.InspectorStartAssessmentRun(assessmentTemplate),
226227
});
227228
```
229+
230+
## Put a record to an Amazon Kinesis Data Streams
231+
232+
Use the `KinesisStreamPutRecord` target to put a record to an Amazon Kinesis Data Streams.
233+
234+
The code snippet below creates an event rule with a stream as target which is
235+
called every hour by Event Bridge Scheduler.
236+
237+
```ts
238+
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
239+
240+
const stream = new kinesis.Stream(this, 'MyStream');
241+
242+
new Schedule(this, 'Schedule', {
243+
schedule: ScheduleExpression.rate(Duration.minutes(60)),
244+
target: new targets.KinesisStreamPutRecord(stream, {
245+
partitionKey: 'key',
246+
}),
247+
});
248+
```

packages/@aws-cdk/aws-scheduler-targets-alpha/lib/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export * from './codebuild-start-build';
22
export * from './event-bridge-put-events';
33
export * from './inspector-start-assessment-run';
4+
export * from './kinesis-stream-put-record';
45
export * from './lambda-invoke';
56
export * from './sns-publish';
67
export * from './sqs-send-message';
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { ISchedule, IScheduleTarget, ScheduleTargetConfig } from '@aws-cdk/aws-scheduler-alpha';
2+
import { Names, Token } from 'aws-cdk-lib';
3+
import { IRole } from 'aws-cdk-lib/aws-iam';
4+
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
5+
import { ScheduleTargetBase, ScheduleTargetBaseProps } from './target';
6+
import { sameEnvDimension } from './util';
7+
8+
/**
9+
* Properties for a Kinesis Data Streams Target
10+
*/
11+
export interface KinesisStreamPutRecordProps extends ScheduleTargetBaseProps {
12+
/**
13+
* The shard to which EventBridge Scheduler sends the event.
14+
*
15+
* The length must be between 1 and 256.
16+
*
17+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-scheduler-schedule-kinesisparameters.html
18+
*/
19+
readonly partitionKey: string;
20+
}
21+
22+
/**
23+
* Use an Amazon Kinesis Data Streams as a target for AWS EventBridge Scheduler.
24+
*/
25+
export class KinesisStreamPutRecord extends ScheduleTargetBase implements IScheduleTarget {
26+
constructor(
27+
private readonly stream: kinesis.IStream,
28+
private readonly props: KinesisStreamPutRecordProps,
29+
) {
30+
super(props, stream.streamArn);
31+
32+
if (!Token.isUnresolved(props.partitionKey) && (props.partitionKey.length < 1 || props.partitionKey.length > 256)) {
33+
throw new Error(`partitionKey length must be between 1 and 256, got ${props.partitionKey.length}`);
34+
}
35+
}
36+
37+
protected addTargetActionToRole(schedule: ISchedule, role: IRole): void {
38+
if (!sameEnvDimension(this.stream.env.region, schedule.env.region)) {
39+
throw new Error(`Cannot assign stream in region ${this.stream.env.region} to the schedule ${Names.nodeUniqueId(schedule.node)} in region ${schedule.env.region}. Both the schedule and the stream must be in the same region.`);
40+
}
41+
42+
if (!sameEnvDimension(this.stream.env.account, schedule.env.account)) {
43+
throw new Error(`Cannot assign stream in account ${this.stream.env.account} to the schedule ${Names.nodeUniqueId(schedule.node)} in account ${schedule.env.region}. Both the schedule and the stream must be in the same account.`);
44+
}
45+
46+
if (this.props.role && !sameEnvDimension(this.props.role.env.account, this.stream.env.account)) {
47+
throw new Error(`Cannot grant permission to execution role in account ${this.props.role.env.account} to invoke target ${Names.nodeUniqueId(this.stream.node)} in account ${this.stream.env.account}. Both the target and the execution role must be in the same account.`);
48+
}
49+
50+
this.stream.grantWrite(role);
51+
}
52+
53+
protected bindBaseTargetConfig(_schedule: ISchedule): ScheduleTargetConfig {
54+
return {
55+
...super.bindBaseTargetConfig(_schedule),
56+
kinesisParameters: {
57+
partitionKey: this.props.partitionKey,
58+
},
59+
};
60+
}
61+
}

0 commit comments

Comments
 (0)