Skip to content

Commit b36bc11

Browse files
authored
feat(lambda-event-sources): add kafka consumerGroupId support (#21791)
This PR adds the capability to specify the consumerGroupId in the event-source-mapping when connection to Kafka. Adds the feature described in issue #21734 This features allows you to specify the consumerGroupId while connecting to a Kafka cluster. This feature was recently annouced https://aws.amazon.com/blogs/compute/using-custom-consumer-group-id-support-for-the-aws-lambda-event-sources-for-msk-and-self-managed-kafka/ and wasn't part of the cdk Kafka construct before. Things done: * Added missing attributes to class EventSourceMapping * amazonManagedKafkaEventSourceConfig * selfManagedKafkaEventSourceConfig * Added validation for the consumerGroupId value based in [CfnSpec](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig.html) * Added a common api `consumerGroupId` for adding the consumerGroupId independant if selfManaged or awsManaged * Updated existing integration test for SelfManagesKafkaConfig * The ManagedKafka Config is not integration tested because it requires a ManagedKafkaCluster which is not possible to deploy right now via cdk * Added Tests for consumerGroupId validation * Added Tests for template synth for SelfManagedKafka and AwsManagedKafka ---- ### All Submissions: * [x] Have you followed the guidelines in our [Contributing guide?](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) ### Adding new Unconventional Dependencies: * [ ] This PR adds new unconventional dependencies following the process described [here](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md/#adding-new-unconventional-dependencies) ### New Features * [x] Have you added the new feature to an [integration test](https://github.com/aws/aws-cdk/blob/main/INTEGRATION_TESTS.md)? * [x] Did you use `yarn integ` to deploy the infrastructure and generate the snapshot (i.e. `yarn integ` without `--dry-run`)? *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent bcdd2a8 commit b36bc11

File tree

9 files changed

+150
-1
lines changed

9 files changed

+150
-1
lines changed

packages/@aws-cdk/aws-lambda-event-sources/README.md

+4
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,14 @@ declare const secret: Secret;
264264
// (Optional) The secret containing the root CA certificate that your Kafka brokers use for TLS encryption
265265
declare const encryption: Secret;
266266

267+
// (Optional) The consumer group id to use when connecting to the Kafka broker. If omitted the UUID of the event source mapping will be used.
268+
const consumerGroupId: "my-consumer-group-id";
269+
267270
declare const myFunction: lambda.Function;
268271
myFunction.addEventSource(new SelfManagedKafkaEventSource({
269272
bootstrapServers: bootstrapServers,
270273
topic: topic,
274+
consumerGroupId: consumerGroupId,
271275
secret: secret,
272276
batchSize: 100, // default
273277
startingPosition: lambda.StartingPosition.TRIM_HORIZON,

packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts

+9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
2222
* @default none
2323
*/
2424
readonly secret?: secretsmanager.ISecret
25+
/**
26+
* The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a lenght between 1 and 200 and full the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'.
27+
* @see https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id
28+
*
29+
* @default - none
30+
*/
31+
readonly consumerGroupId?: string;
2532
}
2633

2734
/**
@@ -125,6 +132,7 @@ export class ManagedKafkaEventSource extends StreamEventSource {
125132
startingPosition: this.innerProps.startingPosition,
126133
sourceAccessConfigurations: this.sourceAccessConfigurations(),
127134
kafkaTopic: this.innerProps.topic,
135+
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
128136
}),
129137
);
130138

@@ -199,6 +207,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
199207
this.enrichMappingOptions({
200208
kafkaBootstrapServers: this.innerProps.bootstrapServers,
201209
kafkaTopic: this.innerProps.topic,
210+
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
202211
startingPosition: this.innerProps.startingPosition,
203212
sourceAccessConfigurations: this.sourceAccessConfigurations(),
204213
}),

packages/@aws-cdk/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
4848
new SelfManagedKafkaEventSource({
4949
bootstrapServers,
5050
topic: 'my-test-topic',
51+
consumerGroupId: 'myTestConsumerGroup',
5152
secret: clientCertificatesSecret,
5253
authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH,
5354
rootCACertificate: rootCASecret,

packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/lambda-event-source-kafka-self-managed.template.json

+3
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@
9898
]
9999
}
100100
},
101+
"SelfManagedKafkaEventSourceConfig": {
102+
"ConsumerGroupId": "myTestConsumerGroup"
103+
},
101104
"SourceAccessConfigurations": [
102105
{
103106
"Type": "CLIENT_CERTIFICATE_TLS_AUTH",

packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/manifest.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@
6060
"/lambda-event-source-kafka-self-managed/F/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic/Resource": [
6161
{
6262
"type": "aws:cdk:logicalId",
63-
"data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798"
63+
"data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798",
64+
"trace": [
65+
"!!DESTRUCTIVE_CHANGES: WILL_REPLACE"
66+
]
6467
}
6568
],
6669
"/lambda-event-source-kafka-self-managed/S/Resource": [

packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/tree.json

+3
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@
165165
]
166166
}
167167
},
168+
"selfManagedKafkaEventSourceConfig": {
169+
"consumerGroupId": "myTestConsumerGroup"
170+
},
168171
"sourceAccessConfigurations": [
169172
{
170173
"type": "CLIENT_CERTIFICATE_TLS_AUTH",

packages/@aws-cdk/aws-lambda-event-sources/test/kafka.test.ts

+56
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,62 @@ describe('KafkaEventSource', () => {
634634
});
635635
});
636636

637+
test('consumerGroupId can be set for SelfManagedKafkaEventSource', () => {
638+
// GIVEN
639+
const stack = new cdk.Stack();
640+
const fn = new TestFunction(stack, 'Fn');
641+
const kafkaTopic = 'some-topic';
642+
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
643+
const bootstrapServers = ['kafka-broker:9092'];
644+
const consumerGroupId = 'my-consumer-group-id';
645+
const eventSourceMapping = new sources.SelfManagedKafkaEventSource(
646+
{
647+
bootstrapServers: bootstrapServers,
648+
topic: kafkaTopic,
649+
secret: secret,
650+
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
651+
consumerGroupId: consumerGroupId,
652+
});
653+
// WHEN
654+
fn.addEventSource(eventSourceMapping);
655+
656+
// THEN
657+
const template = Template.fromStack(stack);
658+
template.hasResourceProperties('AWS::Lambda::EventSourceMapping', {
659+
SelfManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId },
660+
});
661+
662+
});
663+
664+
test('consumerGroupId can be set for ManagedKafkaEventSource', () => {
665+
666+
// GIVEN
667+
const stack = new cdk.Stack();
668+
const fn = new TestFunction(stack, 'Fn');
669+
const clusterArn = 'some-arn';
670+
const kafkaTopic = 'some-topic';
671+
const consumerGroupId = 'my-consumer-group-id';
672+
673+
674+
const mskEventMapping = new sources.ManagedKafkaEventSource(
675+
{
676+
clusterArn,
677+
topic: kafkaTopic,
678+
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
679+
consumerGroupId,
680+
});
681+
682+
// WHEN
683+
fn.addEventSource(mskEventMapping);
684+
expect(mskEventMapping.eventSourceMappingId).toBeDefined();
685+
686+
const template = Template.fromStack(stack);
687+
template.hasResourceProperties('AWS::Lambda::EventSourceMapping', {
688+
AmazonManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId },
689+
});
690+
691+
});
692+
637693
test('ManagedKafkaEventSource name conforms to construct id rules', () => {
638694
// GIVEN
639695
const stack = new cdk.Stack();

packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts

+30
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,16 @@ export interface EventSourceMappingOptions {
215215
*/
216216
readonly kafkaBootstrapServers?: string[]
217217

218+
/**
219+
* The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a lenght between 1 and 200 and full the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'. For more information, see [Customizable consumer group ID](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id).
220+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-amazonmanagedkafkaeventsourceconfig.html
221+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig.html
222+
*
223+
* @default - none
224+
*/
225+
readonly kafkaConsumerGroupId?: string
226+
227+
218228
/**
219229
* Specific settings like the authentication protocol or the VPC components to secure access to your event source.
220230
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html
@@ -319,6 +329,10 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
319329
throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP');
320330
}
321331

332+
if (props.kafkaConsumerGroupId) {
333+
this.validateKafkaConsumerGroupIdOrThrow(props.kafkaConsumerGroupId);
334+
}
335+
322336
let destinationConfig;
323337

324338
if (props.onFailure) {
@@ -332,6 +346,8 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
332346
selfManagedEventSource = { endpoints: { kafkaBootstrapServers: props.kafkaBootstrapServers } };
333347
}
334348

349+
let consumerGroupConfig = props.kafkaConsumerGroupId ? { consumerGroupId: props.kafkaConsumerGroupId } : undefined;
350+
335351
const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', {
336352
batchSize: props.batchSize,
337353
bisectBatchOnFunctionError: props.bisectBatchOnError,
@@ -350,9 +366,23 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
350366
tumblingWindowInSeconds: props.tumblingWindow?.toSeconds(),
351367
sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => {return { type: o.type.type, uri: o.uri };}),
352368
selfManagedEventSource,
369+
selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined,
370+
amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined,
353371
});
354372
this.eventSourceMappingId = cfnEventSourceMapping.ref;
355373
}
374+
375+
private validateKafkaConsumerGroupIdOrThrow(kafkaConsumerGroupId: string) {
376+
if (kafkaConsumerGroupId.length > 200 ||kafkaConsumerGroupId.length < 1) {
377+
throw new Error('kafkaConsumerGroupId must be a valid string between 1 and 200 characters');
378+
}
379+
380+
const regex = new RegExp(/[a-zA-Z0-9-\/*:_+=.@-]*/);
381+
const patternMatch = regex.exec(kafkaConsumerGroupId);
382+
if (patternMatch === null || patternMatch[0] !== kafkaConsumerGroupId) {
383+
throw new Error('kafkaConsumerGroupId contain ivalid characters. Allowed values are "[a-zA-Z0-9-\/*:_+=.@-]"');
384+
}
385+
}
356386
}
357387

358388
/**

packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts

+40
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,46 @@ describe('event source mapping', () => {
154154
})).toThrow(/kafkaBootStrapServers must not be empty if set/);
155155
});
156156

157+
test('throws if kafkaConsumerGroupId is invalid', () => {
158+
expect(() => new EventSourceMapping(stack, 'test', {
159+
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
160+
kafkaConsumerGroupId: 'some invalid',
161+
target: fn,
162+
})).toThrow('kafkaConsumerGroupId contain ivalid characters. Allowed values are "[a-zA-Z0-9-\/*:_+=.@-]"');
163+
});
164+
165+
test('throws if kafkaConsumerGroupId is too long', () => {
166+
expect(() => new EventSourceMapping(stack, 'test', {
167+
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
168+
kafkaConsumerGroupId: 'x'.repeat(201),
169+
target: fn,
170+
})).toThrow('kafkaConsumerGroupId must be a valid string between 1 and 200 characters');
171+
});
172+
173+
test('not throws if kafkaConsumerGroupId is empty', () => {
174+
expect(() => new EventSourceMapping(stack, 'test', {
175+
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
176+
kafkaConsumerGroupId: '',
177+
target: fn,
178+
})).not.toThrow();
179+
});
180+
181+
test('not throws if kafkaConsumerGroupId is valid for amazon managed kafka', () => {
182+
expect(() => new EventSourceMapping(stack, 'test', {
183+
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
184+
kafkaConsumerGroupId: 'someValidConsumerGroupId',
185+
target: fn,
186+
})).not.toThrow();
187+
});
188+
189+
test('not throws if kafkaConsumerGroupId is valid for self managed kafka', () => {
190+
expect(() => new EventSourceMapping(stack, 'test', {
191+
kafkaBootstrapServers: ['kafka-broker-1:9092', 'kafka-broker-2:9092'],
192+
kafkaConsumerGroupId: 'someValidConsumerGroupId',
193+
target: fn,
194+
})).not.toThrow();
195+
});
196+
157197
test('eventSourceArn appears in stack', () => {
158198
const topicNameParam = new cdk.CfnParameter(stack, 'TopicNameParam', {
159199
type: 'String',

0 commit comments

Comments
 (0)