Skip to content

Commit af5345e

Browse files
authored
feat(pipes): add LogDestination implementation (#31672)
Closes #31671.
1 parent 4db9565 commit af5345e

28 files changed

+54077
-23400
lines changed

packages/@aws-cdk/aws-pipes-alpha/README.md

+33-106
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@
1818

1919

2020
EventBridge Pipes let you create source to target connections between several
21-
aws services. While transporting messages from a source to a target the messages
21+
AWS services. While transporting messages from a source to a target the messages
2222
can be filtered, transformed and enriched.
2323

2424
![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png)
2525

26-
For more details see the service documentation:
27-
28-
[Documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)
26+
For more details see the [service documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html).
2927

3028
## Pipe
3129

@@ -34,12 +32,12 @@ is a fully managed service that enables point-to-point integrations between
3432
event producers and consumers. Pipes can be used to connect several AWS services
3533
to each other, or to connect AWS services to external services.
3634

37-
A Pipe has a source and a target. The source events can be filtered and enriched
35+
A pipe has a source and a target. The source events can be filtered and enriched
3836
before reaching the target.
3937

4038
## Example - pipe usage
4139

42-
> The following code examples use an example implementation of a [source](#source) and [target](#target). In the future there will be separate packages for the sources and targets.
40+
> The following code examples use an example implementation of a [source](#source) and [target](#target).
4341
4442
To define a pipe you need to create a new `Pipe` construct. The `Pipe` construct needs a source and a target.
4543

@@ -58,49 +56,30 @@ Messages from the source are put into the body of the target message.
5856

5957
## Source
6058

61-
A source is a AWS Service that is polled. The following Sources are
62-
possible:
59+
A source is a AWS Service that is polled. The following sources are possible:
6360

6461
- [Amazon DynamoDB stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-dynamodb.html)
6562
- [Amazon Kinesis stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kinesis.html)
6663
- [Amazon MQ broker](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-mq.html)
6764
- [Amazon MSK stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-msk.html)
68-
- [Self managed Apache Kafka stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kafka.html)
6965
- [Amazon SQS queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html)
66+
- [Apache Kafka stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kafka.html)
7067

71-
> Currently no implementation exist for any of the supported sources. The following example shows how an implementation can look like. The actual implementation is not part of this package and will be in a separate one.
68+
Currently, DynamoDB, Kinesis, and SQS are supported. If you are interested in support for additional sources,
69+
kindly let us know by opening a GitHub issue or raising a PR.
7270

73-
### Example source implementation
71+
### Example source
7472

75-
```ts fixture=pipes-imports
76-
class SqsSource implements pipes.ISource {
77-
sourceArn: string;
78-
sourceParameters = undefined;
79-
80-
constructor(private readonly queue: sqs.Queue) {
81-
this.queue = queue;
82-
this.sourceArn = queue.queueArn;
83-
}
84-
85-
bind(_pipe: pipes.IPipe): pipes.SourceConfig {
86-
return {
87-
sourceParameters: this.sourceParameters,
88-
};
89-
}
90-
91-
grantRead(pipeRole: cdk.aws_iam.IRole): void {
92-
this.queue.grantConsumeMessages(pipeRole);
93-
}
94-
}
73+
```ts
74+
declare const sourceQueue: sqs.Queue;
75+
const pipeSource = new SqsSource(sourceQueue);
9576
```
9677

97-
A source implementation needs to provide the `sourceArn`, `sourceParameters` and grant the pipe role read access to the source.
98-
9978
## Filter
10079

101-
A Filter can be used to filter the events from the source before they are
80+
A filter can be used to filter the events from the source before they are
10281
forwarded to the enrichment or, if no enrichment is present, target step. Multiple filter expressions are possible.
103-
If one of the filter expressions matches the event is forwarded to the enrichment or target step.
82+
If one of the filter expressions matches, the event is forwarded to the enrichment or target step.
10483

10584
### Example - filter usage
10685

@@ -130,7 +109,7 @@ This example shows a filter that only forwards events with the `customerType` B2
130109

131110
You can define multiple filter pattern which are combined with a logical `OR`.
132111

133-
Additional filter pattern and details can be found in the EventBridge pipes [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html)
112+
Additional filter pattern and details can be found in the EventBridge pipes [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html).
134113

135114
## Input transformation
136115

@@ -175,7 +154,7 @@ So when the following batch of input events is processed by the pipe
175154
]
176155
```
177156

178-
it is converted into the following payload.
157+
it is converted into the following payload:
179158

180159
```json
181160
[
@@ -189,7 +168,7 @@ it is converted into the following payload.
189168
]
190169
```
191170

192-
If the transformation is applied to a target it might be converted to a string representation. E.g. the resulting SQS message body looks like this.
171+
If the transformation is applied to a target it might be converted to a string representation. For example, the resulting SQS message body looks like this:
193172

194173
```json
195174
[
@@ -237,7 +216,7 @@ So when the following batch of input events is processed by the pipe
237216
]
238217
```
239218

240-
it is converted into the following target payload.
219+
it is converted into the following target payload:
241220

242221
```json
243222
[
@@ -283,8 +262,7 @@ This transformation forwards the static text to the target.
283262

284263
## Enrichment
285264

286-
In the enrichment step the (un)filtered payloads from the source can be used to
287-
invoke one of the following services
265+
In the enrichment step the (un)filtered payloads from the source can be used to invoke one of the following services:
288266

289267
- API destination
290268
- Amazon API Gateway
@@ -420,95 +398,44 @@ targets are supported:
420398
The target event can be transformed before it is forwarded to the target using
421399
the same input transformation as in the enrichment step.
422400

423-
### Example target implementation
424-
425-
> Currently no implementation exist for any of the supported targets. The following example shows how an implementation can look like. The actual implementation is not part of this package and will be in a separate one.
426-
427-
```ts fixture=pipes-imports
428-
class SqsTarget implements pipes.ITarget {
429-
targetArn: string;
430-
private inputTransformation: pipes.InputTransformation | undefined;
431-
432-
constructor(private readonly queue: sqs.Queue, props: {inputTransformation?: pipes.InputTransformation} = {}) {
433-
this.queue = queue;
434-
this.targetArn = queue.queueArn;
435-
this.inputTransformation = props?.inputTransformation
436-
}
437-
438-
bind(_pipe: pipes.Pipe): pipes.TargetConfig {
439-
return {
440-
targetParameters: {
441-
inputTemplate: this.inputTransformation?.bind(_pipe).inputTemplate,
442-
},
443-
};
444-
}
401+
### Example target
445402

446-
grantPush(pipeRole: cdk.aws_iam.IRole): void {
447-
this.queue.grantSendMessages(pipeRole);
448-
}
449-
}
403+
```ts
404+
declare const targetQueue: sqs.Queue;
405+
const pipeTarget = new SqsTarget(targetQueue);
450406
```
451407

452-
A target implementation needs to provide the `targetArn`, `enrichmentParameters` and grant the pipe role invoke access to the enrichment.
453-
454408
## Log destination
455409

456410
A pipe can produce log events that are forwarded to different log destinations.
457411
You can configure multiple destinations, but all the destination share the same log level and log data.
458412
For details check the official [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-logs.html).
459413

460414
The log level and data that is included in the log events is configured on the pipe class itself.
461-
Whereas the actual destination is defined independent.
462-
463-
### Example log destination implementation
415+
The actual destination is defined independently, and there are three options:
464416

465-
> Currently no implementation exist for any of the supported enrichments. The following example shows how an implementation can look like. The actual implementation is not part of this package and will be in a separate one.
466-
467-
468-
```ts fixture=pipes-imports
469-
class CloudwatchDestination implements pipes.ILogDestination {
470-
parameters: pipes.LogDestinationParameters;
471-
472-
constructor(private readonly logGroup: cdk.aws_logs.LogGroup) {
473-
this.logGroup = logGroup;
474-
this.parameters = {
475-
cloudwatchLogsLogDestination: {
476-
logGroupArn: logGroup.logGroupArn,
477-
},
478-
};
479-
}
480-
481-
bind(_pipe: pipes.IPipe): pipes.LogDestinationConfig {
482-
return {
483-
parameters: this.parameters,
484-
};
485-
}
486-
487-
grantPush(pipeRole: cdk.aws_iam.IRole): void {
488-
this.logGroup.grantWrite(pipeRole);
489-
}
490-
}
491-
```
417+
1. `CloudwatchLogsLogDestination`
418+
2. `FirehoseLogDestination`
419+
3. `S3LogDestination`
492420

493421
### Example log destination usage
494422

495423
```ts
496424
declare const sourceQueue: sqs.Queue;
497425
declare const targetQueue: sqs.Queue;
498-
declare const loggroup: logs.LogGroup;
426+
declare const logGroup: logs.LogGroup;
427+
428+
const cwlLogDestination = new pipes.CloudwatchLogsLogDestination(logGroup);
499429

500430
const pipe = new pipes.Pipe(this, 'Pipe', {
501431
source: new SqsSource(sourceQueue),
502432
target: new SqsTarget(targetQueue),
503-
504433
logLevel: pipes.LogLevel.TRACE,
505434
logIncludeExecutionData: [pipes.IncludeExecutionData.ALL],
506-
507-
logDestinations: [
508-
new CloudwatchDestination(loggroup),
509-
],
435+
logDestinations: [cwlLogDestination],
510436
});
511437
```
512438

513-
This example uses a cloudwatch loggroup to store the log emitted during a pipe execution. The log level is set to `TRACE` so all steps of the pipe are logged.
439+
This example uses a CloudWatch Logs log group to store the log emitted during a pipe execution.
440+
The log level is set to `TRACE` so all steps of the pipe are logged.
514441
Additionally all execution data is logged as well.

0 commit comments

Comments
 (0)