Skip to content

Commit 027f677

Browse files
committed
DynamoDb mapper: non-blocking asynchronous support for all operations
1 parent 732749f commit 027f677

File tree

71 files changed

+3972
-540
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+3972
-540
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"type": "feature",
3+
"category": "Amazon DynamoDB Enhanced Client [Preview]",
4+
"description": "Support for non-blocking asynchronous calling of all mapper operations"
5+
}

services-custom/dynamodb-enhanced/README.md

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ values used are also completely arbitrary.
5959
3. Create a MappedDatabase object that you will use to repeatedly
6060
execute operations against all your tables :-
6161
```java
62-
MappedDatabase database = MappedDatabase.builder()
63-
.dynamoDbClient(dynamoDbClient)
64-
.build();
62+
MappedDatabase database = DynamoDbMappedDatabase.builder()
63+
.dynamoDbClient(dynamoDbClient)
64+
.build();
6565
```
6666
4. Create a MappedTable object that you will use to repeatedly execute
6767
operations against a specific table :-
@@ -123,6 +123,42 @@ index. Here's an example of how to do this:
123123
Iterable<Page<Customer>> customersWithName = customersByName.query(equalTo(Key.of(stringValue("Smith"))));
124124
```
125125
126+
### Non-blocking asynchronous operations
127+
If your application requires non-blocking asynchronous calls to
128+
DynamoDb, then you can use the asynchronous implementation of the
129+
mapper. It's very similar to the synchronous implementation with a few
130+
key differences:
131+
132+
1. When instantiating the mapped database, use the asynchronous version
133+
of the library instead of the synchronous one (you will need to use
134+
an asynchronous DynamoDb client from the SDK as well):
135+
```java
136+
AsyncMappedDatabase database = DynamoDbAsyncMappedDatabase.builder()
137+
.dynamoDbAsyncClient(dynamoDbAsyncClient)
138+
.build();
139+
```
140+
141+
2. Operations that return a single data item will return a
142+
CompletableFuture of the result instead of just the result. Your
143+
application can then do other work without having to block on the
144+
result:
145+
```java
146+
CompletableFuture<Customer> result = mappedTable.execute(GetItem.of(customerKey));
147+
// Perform other work here
148+
return result.join(); // now block and wait for the result
149+
```
150+
151+
3. Operations that return paginated lists of results will return an
152+
SdkPublisher of the results instead of an SdkIterable. Your
153+
application can then subscribe a handler to that publisher and deal
154+
with the results asynchronously without having to block:
155+
```java
156+
SdkPublisher<Customer> results = mappedTable.execute(myQueryCommand);
157+
results.subscribe(myCustomerResultsProcessor);
158+
// Perform other work and let the processor handle the results asynchronously
159+
```
160+
161+
126162
### Using extensions
127163
The mapper supports plugin extensions to provide enhanced functionality
128164
beyond the simple primitive mapped operations. Only one extension can be
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.extensions.dynamodb.mappingclient;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
20+
import software.amazon.awssdk.annotations.SdkPublicApi;
21+
import software.amazon.awssdk.extensions.dynamodb.mappingclient.core.DynamoDbAsyncMappedDatabase;
22+
23+
/**
24+
* Asynchronous interface for running commands against a DynamoDb database. See {@link DynamoDbAsyncMappedDatabase} for
25+
* an implementation of this interface that can statically created.
26+
*/
27+
@SdkPublicApi
28+
public interface AsyncMappedDatabase {
29+
/**
30+
* Executes a command against the database.
31+
*
32+
* @param operation The operation to be performed in the context of the database.
33+
* @param <T> The expected return type from the operation. This is typically inferred by the compiler.
34+
*
35+
* @return A {@link CompletableFuture} of the result of the operation being executed. The documentation on the
36+
* operation itself should have more information.
37+
*/
38+
<T> CompletableFuture<T> execute(DatabaseOperation<?, ?, T> operation);
39+
40+
/**
41+
* Returns a mapped table that can be used to execute commands that work with mapped items against that table.
42+
*
43+
* @param tableName The name of the physical table persisted by DynamoDb.
44+
* @param tableSchema A {@link TableSchema} that maps the table to a modelled object.
45+
* @return A {@link AsyncMappedTable} object that can be used to execute table operations against.
46+
* @param <T> THe modelled object type being mapped to this table.
47+
*/
48+
<T> AsyncMappedTable<T> table(String tableName, TableSchema<T> tableSchema);
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.extensions.dynamodb.mappingclient;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
20+
import software.amazon.awssdk.annotations.SdkPublicApi;
21+
import software.amazon.awssdk.core.async.SdkPublisher;
22+
23+
/**
24+
* Asynchronous interface for running commands against an object that is linked to a specific DynamoDb secondary index
25+
* and knows how to map records from the table that index is linked to into a modelled object.
26+
*
27+
* @param <T> The type of the modelled object.
28+
*/
29+
@SdkPublicApi
30+
public interface AsyncMappedIndex<T> {
31+
/**
32+
* Executes a command that is expected to return a single data item against the database with the context of the
33+
* specific table and secondary index this object is linked to.
34+
*
35+
* @param operationToPerform The operation to be performed in the context of the secondary index.
36+
* @param <R> The expected return type from the operation. This is typically inferred by the compiler.
37+
* @return A {@link CompletableFuture} of the result of the operation being executed. The documentation on the
38+
* operation itself should have more information.
39+
*/
40+
<R> CompletableFuture<R> execute(IndexOperation<T, ?, ?, R> operationToPerform);
41+
42+
/**
43+
* Executes a command that is expected to return a paginated list of data items against the database with the
44+
* context of the specific table and secondary index this object is linked to.
45+
*
46+
* @param operationToPerform The operation to be performed in the context of the secondary index.
47+
* @param <R> The expected return type from the operation. This is typically inferred by the compiler.
48+
* @return An {@link SdkPublisher} that will publish successive pages of result data items to any subscriber with
49+
* demand for them.
50+
*/
51+
<R> SdkPublisher<R> execute(PaginatedIndexOperation<T, ?, ?, R> operationToPerform);
52+
53+
/**
54+
* Gets the {@link MapperExtension} associated with this mapped resource.
55+
* @return The {@link MapperExtension} associated with this mapped resource.
56+
*/
57+
MapperExtension mapperExtension();
58+
59+
/**
60+
* Gets the {@link TableSchema} object that this mapped table was built with.
61+
* @return The {@link TableSchema} object for this mapped table.
62+
*/
63+
TableSchema<T> tableSchema();
64+
65+
/**
66+
* Gets the physical table name that operations performed by this object will be executed against.
67+
* @return The physical table name.
68+
*/
69+
String tableName();
70+
71+
/**
72+
* Gets the physical secondary index name that operations performed by this object will be executed against.
73+
* @return The physical secondary index name.
74+
*/
75+
String indexName();
76+
77+
/**
78+
* Creates a {@link Key} object from a modelled item. This key can be used in query conditionals and get
79+
* operations to locate a specific record.
80+
* @param item The item to extract the key fields from.
81+
* @return A key that has been initialized with the index values extracted from the modelled object.
82+
*/
83+
Key keyFrom(T item);
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.extensions.dynamodb.mappingclient;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
20+
import software.amazon.awssdk.annotations.SdkPublicApi;
21+
import software.amazon.awssdk.core.async.SdkPublisher;
22+
23+
/**
24+
* Asynchronous interface for running commands against an object that is linked to a specific DynamoDb table resource
25+
* and therefore knows how to map records from that table into a modelled object.
26+
*
27+
* @param <T> The type of the modelled object.
28+
*/
29+
@SdkPublicApi
30+
public interface AsyncMappedTable<T> extends MappedTableResource<T> {
31+
/**
32+
* Returns a mapped index that can be used to execute commands against a secondary index belonging to the table
33+
* being mapped by this object. Note that only a subset of the commands that work against a table will work
34+
* against a secondary index.
35+
*
36+
* @param indexName The name of the secondary index to build the command interface for.
37+
* @return An {@link AsyncMappedIndex} object that can be used to execute database commands against.
38+
*/
39+
AsyncMappedIndex<T> index(String indexName);
40+
41+
/**
42+
* Executes a command that is expected to return a single data item against the database with the context of the
43+
* primary index of the specific table this object is linked to.
44+
**
45+
* @param operationToPerform The operation to be performed in the context of the primary index of the table.
46+
* @param <R> The expected return type from the operation. This is typically inferred by the compiler.
47+
*
48+
* @return A {@link CompletableFuture} that will return the result of the operation being executed. The
49+
* documentation on the operation itself should have more information.
50+
*/
51+
<R> CompletableFuture<R> execute(TableOperation<T, ?, ?, R> operationToPerform);
52+
53+
/**
54+
* Executes a command that is expected to return a paginated list of data items against the database with the
55+
* context of the primary index of the specific table this object is linked to.
56+
**
57+
* @param operationToPerform The operation to be performed in the context of the primary index of the table.
58+
* @param <R> The expected return type from the operation. This is typically inferred by the compiler.
59+
*
60+
* @return An {@link SdkPublisher} that will publish successive pages of result data items to any subscriber with
61+
* demand for them.
62+
*/
63+
<R> SdkPublisher<R> execute(PaginatedTableOperation<T, ?, ?, R> operationToPerform);
64+
}

services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/extensions/dynamodb/mappingclient/CommonOperation.java

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,31 @@
1515

1616
package software.amazon.awssdk.extensions.dynamodb.mappingclient;
1717

18+
import java.util.concurrent.CompletableFuture;
1819
import java.util.function.Function;
1920

2021
import software.amazon.awssdk.annotations.SdkPublicApi;
22+
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
2123
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
2224

2325
/**
24-
* Common interface for a single operation that can be executed against a mapped database table. These operations can be
25-
* made against either the primary index of a table or a secondary index, although some implementations of this
26-
* interface do not support secondary indices and will throw an exception when executed against one. Conceptually an
27-
* operation maps 1:1 with an actual DynamoDb call.
28-
*
26+
* Common interface for a single operation that can be executed in a synchronous or non-blocking asynchronous fashion
27+
* against a mapped database table. These operations can be made against either the primary index of a table or a
28+
* secondary index, although some implementations of this interface do not support secondary indices and will throw
29+
* an exception when executed against one. Conceptually an operation maps 1:1 with an actual DynamoDb call.
30+
* <p>
2931
* This interface is extended by {@link TableOperation} and {@link IndexOperation} which contain implementations of
3032
* the behavior to actually execute the operation in the context of a table or secondary index and are used by
31-
* {@link MappedTable} and {@link MappedIndex} respectively. By sharing this common interface operations are able to
32-
* re-use code regardless of whether they are executed in the context of a primary or secondary index.
33+
* {@link MappedTable} or {@link AsyncMappedTable} and {@link MappedIndex} or {@link AsyncMappedIndex} respectively. By
34+
* sharing this common interface operations are able to re-use code regardless of whether they are executed in the
35+
* context of a primary or secondary index or whether they are being executed in a synchronous or non-blocking
36+
* asynchronous fashion.
3337
*
3438
* @param <ItemT> The modelled object that this table maps records to.
35-
* @param <RequestT> The type of the request object for the DynamoDb call in the low level {@link DynamoDbClient}.
36-
* @param <ResponseT> The type of the response object for the DynamoDb call in the low level {@link DynamoDbClient}.
39+
* @param <RequestT> The type of the request object for the DynamoDb call in the low level {@link DynamoDbClient} or
40+
* {@link DynamoDbAsyncClient}.
41+
* @param <ResponseT> The type of the response object for the DynamoDb call in the low level {@link DynamoDbClient}
42+
* or {@link DynamoDbAsyncClient}.
3743
* @param <ResultT> The type of the mapped result object that will be returned by the execution of this operation.
3844
*/
3945
@SdkPublicApi
@@ -49,12 +55,19 @@ public interface CommonOperation<ItemT, RequestT, ResponseT, ResultT> {
4955
RequestT generateRequest(TableSchema<ItemT> tableSchema, OperationContext context, MapperExtension mapperExtension);
5056

5157
/**
52-
* Provides a function for making the low level SDK call to DynamoDb.
58+
* Provides a function for making the low level synchronous SDK call to DynamoDb.
5359
* @param dynamoDbClient A low level {@link DynamoDbClient} to make the call against.
5460
* @return A function that calls DynamoDb with a provided request object and returns the response object.
5561
*/
5662
Function<RequestT, ResponseT> serviceCall(DynamoDbClient dynamoDbClient);
5763

64+
/**
65+
* Provides a function for making the low level non-blocking asynchronous SDK call to DynamoDb.
66+
* @param dynamoDbAsyncClient A low level {@link DynamoDbAsyncClient} to make the call against.
67+
* @return A function that calls DynamoDb with a provided request object and returns the response object.
68+
*/
69+
Function<RequestT, CompletableFuture<ResponseT>> asyncServiceCall(DynamoDbAsyncClient dynamoDbAsyncClient);
70+
5871
/**
5972
* Takes the response object returned by the actual DynamoDb call and maps it into a higher level abstracted
6073
* result object.
@@ -71,7 +84,8 @@ ResultT transformResponse(ResponseT response,
7184
MapperExtension mapperExtension);
7285

7386
/**
74-
* Default implementation of a complete execution of this operation against either the primary or a secondary index.
87+
* Default implementation of a complete synchronous execution of this operation against either the primary or a
88+
* secondary index.
7589
* It performs three steps:
7690
* 1) Call generateRequest() to get the request object.
7791
* 2) Call getServiceCall() and call it using the request object generated in the previous step.
@@ -92,4 +106,30 @@ default ResultT execute(TableSchema<ItemT> tableSchema,
92106
ResponseT response = serviceCall(dynamoDbClient).apply(request);
93107
return transformResponse(response, tableSchema, context, mapperExtension);
94108
}
109+
110+
/**
111+
* Default implementation of a complete non-blocking asynchronous execution of this operation against either the
112+
* primary or a secondary index.
113+
* It performs three steps:
114+
* 1) Call generateRequest() to get the request object.
115+
* 2) Call getServiceCall() and call it using the request object generated in the previous step.
116+
* 3) Wraps the {@link CompletableFuture} returned by the SDK in a new one that calls transformResponse() to
117+
* convert the response object returned in the previous step to a high level result.
118+
*
119+
* @param tableSchema A {@link TableSchema} that maps the table to a modelled object.
120+
* @param context An object containing the context, or target, of the command execution.
121+
* @param dynamoDbAsyncClient A {@link DynamoDbAsyncClient} to make the call against.
122+
* @param mapperExtension A {@link MapperExtension} that may modify the request or result of this operation. A
123+
* null value here will result in no modifications.
124+
* @return A {@link CompletableFuture} of the high level result object as specified by the implementation of this
125+
* operation.
126+
*/
127+
default CompletableFuture<ResultT> executeAsync(TableSchema<ItemT> tableSchema,
128+
OperationContext context,
129+
MapperExtension mapperExtension,
130+
DynamoDbAsyncClient dynamoDbAsyncClient) {
131+
RequestT request = generateRequest(tableSchema, context, mapperExtension);
132+
CompletableFuture<ResponseT> response = asyncServiceCall(dynamoDbAsyncClient).apply(request);
133+
return response.thenApply(r -> transformResponse(r, tableSchema, context, mapperExtension));
134+
}
95135
}

0 commit comments

Comments
 (0)