Skip to content

Commit 2675fa9

Browse files
nichmorganmorgan1371EduardoRSOlucabarcelosViniciusBrisotti
authored
DocumentDB support (awslabs#706)
* insert event draft * abstract change event * Added documentdb delete event * Added support to change event drop * Added support to dropDatabase Event * - MongoDB v6.0 Change Event Fields removed - ChangeEvent enum tagged - AnyDocument common type created * replace event support * added support to invalidate event in documentdb * Adding DocumentDB Rename event. * run cargo fmt * Excluding 'to' parameter * Add DocumentDB Update event * fixed 'to' parameter and run cargo fmt * Refactoring 'Rename' event declaration as a single type not a commum type * InsertNs renamed to DatabaseCollection for code reuse * unused field removed * cfg fix * fix lines * fmt and makefile fixed * makefile reord --------- Co-authored-by: nich.morgan <[email protected]> Co-authored-by: erso <[email protected]> Co-authored-by: Luca Barcelos <[email protected]> Co-authored-by: Vinicius Brisotti <[email protected]> Co-authored-by: Pedro Rabello Sato <[email protected]> Co-authored-by: darwish <[email protected]>
1 parent bcd3f97 commit 2675fa9

23 files changed

+533
-1
lines changed

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,10 @@ check-event-features:
8181
cargo test --package aws_lambda_events --no-default-features --features cognito
8282
cargo test --package aws_lambda_events --no-default-features --features config
8383
cargo test --package aws_lambda_events --no-default-features --features connect
84+
cargo test --package aws_lambda_events --no-default-features --features documentdb
8485
cargo test --package aws_lambda_events --no-default-features --features dynamodb
8586
cargo test --package aws_lambda_events --no-default-features --features ecr_scan
87+
cargo test --package aws_lambda_events --no-default-features --features eventbridge
8688
cargo test --package aws_lambda_events --no-default-features --features firehose
8789
cargo test --package aws_lambda_events --no-default-features --features iam
8890
cargo test --package aws_lambda_events --no-default-features --features iot
@@ -101,7 +103,6 @@ check-event-features:
101103
cargo test --package aws_lambda_events --no-default-features --features sns
102104
cargo test --package aws_lambda_events --no-default-features --features sqs
103105
cargo test --package aws_lambda_events --no-default-features --features streams
104-
cargo test --package aws_lambda_events --no-default-features --features eventbridge
105106

106107
fmt:
107108
cargo +nightly fmt --all

lambda-events/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ default = [
7676
"sns",
7777
"sqs",
7878
"streams",
79+
"documentdb",
7980
"eventbridge",
8081
]
8182

@@ -118,4 +119,5 @@ ses = ["chrono"]
118119
sns = ["chrono", "serde_with"]
119120
sqs = ["serde_with"]
120121
streams = []
122+
documentdb = []
121123
eventbridge = ["chrono", "serde_with"]
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use std::collections::HashMap;
2+
3+
use serde::{Deserialize, Serialize};
4+
use serde_json::Value;
5+
6+
pub type AnyDocument = HashMap<String, Value>;
7+
8+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
9+
#[serde(rename_all = "camelCase")]
10+
pub struct DatabaseCollection {
11+
db: String,
12+
#[serde(default)]
13+
coll: Option<String>,
14+
}
15+
16+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
17+
pub struct DocumentId {
18+
#[serde(rename = "_data")]
19+
pub data: String,
20+
}
21+
22+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
23+
pub struct DocumentKeyIdOid {
24+
#[serde(rename = "$oid")]
25+
pub oid: String,
26+
}
27+
28+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
29+
pub struct DocumentKeyId {
30+
#[serde(rename = "_id")]
31+
pub id: DocumentKeyIdOid,
32+
}
33+
34+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
35+
pub struct InnerTimestamp {
36+
t: usize,
37+
i: usize,
38+
}
39+
40+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
41+
pub struct Timestamp {
42+
#[serde(rename = "$timestamp")]
43+
pub timestamp: InnerTimestamp,
44+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};
4+
5+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
6+
#[serde(rename_all = "camelCase")]
7+
pub struct ChangeDeleteEvent {
8+
#[serde(rename = "_id")]
9+
id: DocumentId,
10+
#[serde(default)]
11+
cluster_time: Option<Timestamp>,
12+
document_key: DocumentKeyId,
13+
#[serde(default)]
14+
#[serde(rename = "lsid")]
15+
ls_id: Option<AnyDocument>,
16+
ns: DatabaseCollection,
17+
// operation_type: String,
18+
#[serde(default)]
19+
txn_number: Option<String>,
20+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, Timestamp};
4+
5+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
6+
#[serde(rename_all = "camelCase")]
7+
pub struct ChangeDropDatabaseEvent {
8+
#[serde(rename = "_id")]
9+
id: DocumentId,
10+
cluster_time: Timestamp,
11+
#[serde(rename = "lsid")]
12+
ls_id: Option<AnyDocument>,
13+
ns: DatabaseCollection,
14+
// operation_type: String,
15+
#[serde(default)]
16+
txn_number: Option<String>,
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, Timestamp};
2+
use serde::{Deserialize, Serialize};
3+
4+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
5+
#[serde(rename_all = "camelCase")]
6+
pub struct ChangeDropEvent {
7+
#[serde(rename = "_id")]
8+
id: DocumentId,
9+
cluster_time: Timestamp,
10+
#[serde(default)]
11+
#[serde(rename = "lsid")]
12+
ls_id: Option<AnyDocument>,
13+
ns: DatabaseCollection,
14+
// operation_type: String,
15+
#[serde(default)]
16+
txn_number: Option<String>,
17+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};
4+
5+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
6+
#[serde(rename_all = "camelCase")]
7+
8+
pub struct ChangeInsertEvent {
9+
#[serde(rename = "_id")]
10+
id: DocumentId,
11+
#[serde(default)]
12+
cluster_time: Option<Timestamp>,
13+
document_key: DocumentKeyId,
14+
#[serde(default)]
15+
#[serde(rename = "lsid")]
16+
ls_id: Option<String>,
17+
ns: DatabaseCollection,
18+
//operation_type: String,
19+
#[serde(default)]
20+
txn_number: Option<AnyDocument>,
21+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
use super::commom_types::{DocumentId, Timestamp};
4+
5+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
6+
#[serde(rename_all = "camelCase")]
7+
pub struct ChangeInvalidateEvent {
8+
#[serde(rename = "_id")]
9+
id: DocumentId,
10+
#[serde(default)]
11+
cluster_time: Option<Timestamp>,
12+
// operation_type: String,
13+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
pub mod commom_types;
2+
pub mod delete_event;
3+
pub mod drop_database_event;
4+
pub mod drop_event;
5+
pub mod insert_event;
6+
pub mod invalidate_event;
7+
pub mod rename_event;
8+
pub mod replace_event;
9+
pub mod update_event;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, Timestamp};
4+
5+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
6+
#[serde(rename_all = "camelCase")]
7+
pub struct ChangeRenameEvent {
8+
#[serde(rename = "_id")]
9+
id: DocumentId,
10+
#[serde(default)]
11+
cluster_time: Option<Timestamp>,
12+
13+
#[serde(default)]
14+
#[serde(rename = "lsid")]
15+
ls_id: Option<AnyDocument>,
16+
ns: DatabaseCollection,
17+
//operation_type: String,
18+
#[serde(default)]
19+
txn_number: Option<String>,
20+
to: DatabaseCollection,
21+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};
4+
5+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
6+
#[serde(rename_all = "camelCase")]
7+
pub struct ChangeReplaceEvent {
8+
#[serde(rename = "_id")]
9+
id: DocumentId,
10+
#[serde(default)]
11+
cluster_time: Option<Timestamp>,
12+
document_key: DocumentKeyId,
13+
#[serde(default)]
14+
#[serde(rename = "lsid")]
15+
ls_id: Option<String>,
16+
ns: DatabaseCollection,
17+
// operation_type: String,
18+
#[serde(default)]
19+
txn_number: Option<AnyDocument>,
20+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};
4+
5+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
6+
#[serde(rename_all = "camelCase")]
7+
pub struct ChangeUpdateEvent {
8+
#[serde(rename = "_id")]
9+
id: DocumentId,
10+
#[serde(default)]
11+
cluster_time: Option<Timestamp>,
12+
document_key: DocumentKeyId,
13+
#[serde(rename = "lsid")]
14+
ls_id: Option<String>,
15+
ns: DatabaseCollection,
16+
// operation_type: String,
17+
#[serde(default)]
18+
txn_number: Option<AnyDocument>,
19+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
pub mod events;
2+
3+
use self::events::{
4+
delete_event::ChangeDeleteEvent, drop_database_event::ChangeDropDatabaseEvent, drop_event::ChangeDropEvent,
5+
insert_event::ChangeInsertEvent, invalidate_event::ChangeInvalidateEvent, rename_event::ChangeRenameEvent,
6+
replace_event::ChangeReplaceEvent, update_event::ChangeUpdateEvent,
7+
};
8+
use serde::{Deserialize, Serialize};
9+
10+
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
11+
#[serde(tag = "operationType", rename_all = "camelCase")]
12+
pub enum ChangeEvent {
13+
Insert(ChangeInsertEvent),
14+
Delete(ChangeDeleteEvent),
15+
Drop(ChangeDropEvent),
16+
DropDatabase(ChangeDropDatabaseEvent),
17+
Invalidate(ChangeInvalidateEvent),
18+
Replace(ChangeReplaceEvent),
19+
Update(ChangeUpdateEvent),
20+
Rename(ChangeRenameEvent),
21+
}
22+
23+
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
24+
pub struct DocumentDbInnerEvent {
25+
pub event: ChangeEvent,
26+
}
27+
28+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
29+
#[serde(rename_all = "camelCase")]
30+
pub struct DocumentDbEvent {
31+
#[serde(default)]
32+
pub event_source_arn: Option<String>,
33+
pub events: Vec<DocumentDbInnerEvent>,
34+
#[serde(default)]
35+
pub event_source: Option<String>,
36+
}
37+
38+
#[cfg(test)]
39+
#[cfg(feature = "documentdb")]
40+
mod test {
41+
use super::*;
42+
43+
pub type Event = DocumentDbEvent;
44+
45+
fn test_example(data: &[u8]) {
46+
let parsed: Event = serde_json::from_slice(data).unwrap();
47+
let output: String = serde_json::to_string(&parsed).unwrap();
48+
let reparsed: Event = serde_json::from_slice(output.as_bytes()).unwrap();
49+
50+
assert_eq!(parsed, reparsed);
51+
}
52+
53+
#[test]
54+
fn example_documentdb_insert_event() {
55+
test_example(include_bytes!("../../fixtures/example-documentdb-insert-event.json"));
56+
}
57+
58+
#[test]
59+
fn example_documentdb_delete_event() {
60+
test_example(include_bytes!("../../fixtures/example-documentdb-delete-event.json"));
61+
}
62+
63+
#[test]
64+
fn example_documentdb_drop_event() {
65+
test_example(include_bytes!("../../fixtures/example-documentdb-drop-event.json"));
66+
}
67+
68+
#[test]
69+
fn example_documentdb_replace_event() {
70+
test_example(include_bytes!("../../fixtures/example-documentdb-replace-event.json"));
71+
}
72+
73+
#[test]
74+
fn example_documentdb_update_event() {
75+
test_example(include_bytes!("../../fixtures/example-documentdb-update-event.json"));
76+
}
77+
78+
#[test]
79+
fn example_documentdb_rename_event() {
80+
test_example(include_bytes!("../../fixtures/example-documentdb-rename-event.json"));
81+
}
82+
83+
#[test]
84+
fn example_documentdb_invalidate_event() {
85+
test_example(include_bytes!(
86+
"../../fixtures/example-documentdb-invalidate-event.json"
87+
));
88+
}
89+
90+
#[test]
91+
fn example_documentdb_drop_database_event() {
92+
test_example(include_bytes!(
93+
"../../fixtures/example-documentdb-drop-database-event.json"
94+
));
95+
}
96+
}

lambda-events/src/event/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ pub mod sqs;
141141
#[cfg(feature = "streams")]
142142
pub mod streams;
143143

144+
// AWS Lambda event definitions for DocumentDB
145+
#[cfg(feature = "documentdb")]
146+
pub mod documentdb;
147+
144148
/// AWS Lambda event definitions for EventBridge.
145149
#[cfg(feature = "eventbridge")]
146150
pub mod eventbridge;
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"eventSourceArn": "arn:aws:rds:us-east-1:123456789012:cluster:canaryclusterb2a659a2-qo5tcmqkcl03",
3+
"events": [
4+
{
5+
"event": {
6+
"_id": {
7+
"_data": "0163eeb6e7000000090100000009000041e1"
8+
},
9+
"clusterTime": {
10+
"$timestamp": {
11+
"t": 1676588775,
12+
"i": 9
13+
}
14+
},
15+
"documentKey": {
16+
"_id": {
17+
"$oid": "63eeb6e7d418cd98afb1c1d7"
18+
}
19+
},
20+
"ns": {
21+
"db": "test_database",
22+
"coll": "test_collection"
23+
},
24+
"operationType": "delete"
25+
}
26+
}
27+
],
28+
"eventSource": "aws:docdb"
29+
}
30+

0 commit comments

Comments
 (0)