Skip to content

DynamoDb mapper: non-blocking asynchronous support for all operations #1600

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"type": "feature",
"category": "Amazon DynamoDB Enhanced Client [Preview]",
"description": "Support for non-blocking asynchronous calling of all mapper operations"
}
42 changes: 39 additions & 3 deletions services-custom/dynamodb-enhanced/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ values used are also completely arbitrary.
3. Create a MappedDatabase object that you will use to repeatedly
execute operations against all your tables :-
```java
MappedDatabase database = MappedDatabase.builder()
.dynamoDbClient(dynamoDbClient)
.build();
MappedDatabase database = DynamoDbMappedDatabase.builder()
.dynamoDbClient(dynamoDbClient)
.build();
```
4. Create a MappedTable object that you will use to repeatedly execute
operations against a specific table :-
Expand Down Expand Up @@ -123,6 +123,42 @@ index. Here's an example of how to do this:
Iterable<Page<Customer>> customersWithName = customersByName.query(equalTo(Key.of(stringValue("Smith"))));
```

### Non-blocking asynchronous operations
If your application requires non-blocking asynchronous calls to
DynamoDb, then you can use the asynchronous implementation of the
mapper. It's very similar to the synchronous implementation with a few
key differences:

1. When instantiating the mapped database, use the asynchronous version
of the library instead of the synchronous one (you will need to use
an asynchronous DynamoDb client from the SDK as well):
```java
AsyncMappedDatabase database = DynamoDbAsyncMappedDatabase.builder()
.dynamoDbAsyncClient(dynamoDbAsyncClient)
.build();
```

2. Operations that return a single data item will return a
CompletableFuture of the result instead of just the result. Your
application can then do other work without having to block on the
result:
```java
CompletableFuture<Customer> result = mappedTable.execute(GetItem.of(customerKey));
// Perform other work here
return result.join(); // now block and wait for the result
```

3. Operations that return paginated lists of results will return an
SdkPublisher of the results instead of an SdkIterable. Your
application can then subscribe a handler to that publisher and deal
with the results asynchronously without having to block:
```java
SdkPublisher<Customer> results = mappedTable.execute(myQueryCommand);
results.subscribe(myCustomerResultsProcessor);
// Perform other work and let the processor handle the results asynchronously
```


### Using extensions
The mapper supports plugin extensions to provide enhanced functionality
beyond the simple primitive mapped operations. Only one extension can be
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.extensions.dynamodb.mappingclient;

import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.extensions.dynamodb.mappingclient.core.DynamoDbAsyncMappedDatabase;

/**
* Asynchronous interface for running commands against a DynamoDb database. See {@link DynamoDbAsyncMappedDatabase} for
* an implementation of this interface that can statically created.
*/
@SdkPublicApi
public interface AsyncMappedDatabase {
/**
* Executes a command against the database.
*
* @param operation The operation to be performed in the context of the database.
* @param <T> The expected return type from the operation. This is typically inferred by the compiler.
*
* @return A {@link CompletableFuture} of the result of the operation being executed. The documentation on the
* operation itself should have more information.
*/
<T> CompletableFuture<T> execute(DatabaseOperation<?, ?, T> operation);

/**
* Returns a mapped table that can be used to execute commands that work with mapped items against that table.
*
* @param tableName The name of the physical table persisted by DynamoDb.
* @param tableSchema A {@link TableSchema} that maps the table to a modelled object.
* @return A {@link AsyncMappedTable} object that can be used to execute table operations against.
* @param <T> THe modelled object type being mapped to this table.
*/
<T> AsyncMappedTable<T> table(String tableName, TableSchema<T> tableSchema);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.extensions.dynamodb.mappingclient;

import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.async.SdkPublisher;

/**
* Asynchronous interface for running commands against an object that is linked to a specific DynamoDb secondary index
* and knows how to map records from the table that index is linked to into a modelled object.
*
* @param <T> The type of the modelled object.
*/
@SdkPublicApi
public interface AsyncMappedIndex<T> {
/**
* Executes a command that is expected to return a single data item against the database with the context of the
* specific table and secondary index this object is linked to.
*
* @param operationToPerform The operation to be performed in the context of the secondary index.
* @param <R> The expected return type from the operation. This is typically inferred by the compiler.
* @return A {@link CompletableFuture} of the result of the operation being executed. The documentation on the
* operation itself should have more information.
*/
<R> CompletableFuture<R> execute(IndexOperation<T, ?, ?, R> operationToPerform);

/**
* Executes a command that is expected to return a paginated list of data items against the database with the
* context of the specific table and secondary index this object is linked to.
*
* @param operationToPerform The operation to be performed in the context of the secondary index.
* @param <R> The expected return type from the operation. This is typically inferred by the compiler.
* @return An {@link SdkPublisher} that will publish successive pages of result data items to any subscriber with
* demand for them.
*/
<R> SdkPublisher<R> execute(PaginatedIndexOperation<T, ?, ?, R> operationToPerform);

/**
* Gets the {@link MapperExtension} associated with this mapped resource.
* @return The {@link MapperExtension} associated with this mapped resource.
*/
MapperExtension mapperExtension();

/**
* Gets the {@link TableSchema} object that this mapped table was built with.
* @return The {@link TableSchema} object for this mapped table.
*/
TableSchema<T> tableSchema();

/**
* Gets the physical table name that operations performed by this object will be executed against.
* @return The physical table name.
*/
String tableName();

/**
* Gets the physical secondary index name that operations performed by this object will be executed against.
* @return The physical secondary index name.
*/
String indexName();

/**
* Creates a {@link Key} object from a modelled item. This key can be used in query conditionals and get
* operations to locate a specific record.
* @param item The item to extract the key fields from.
* @return A key that has been initialized with the index values extracted from the modelled object.
*/
Key keyFrom(T item);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.extensions.dynamodb.mappingclient;

import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.async.SdkPublisher;

/**
* Asynchronous interface for running commands against an object that is linked to a specific DynamoDb table resource
* and therefore knows how to map records from that table into a modelled object.
*
* @param <T> The type of the modelled object.
*/
@SdkPublicApi
public interface AsyncMappedTable<T> extends MappedTableResource<T> {
/**
* Returns a mapped index that can be used to execute commands against a secondary index belonging to the table
* being mapped by this object. Note that only a subset of the commands that work against a table will work
* against a secondary index.
*
* @param indexName The name of the secondary index to build the command interface for.
* @return An {@link AsyncMappedIndex} object that can be used to execute database commands against.
*/
AsyncMappedIndex<T> index(String indexName);

/**
* Executes a command that is expected to return a single data item against the database with the context of the
* primary index of the specific table this object is linked to.
**
* @param operationToPerform The operation to be performed in the context of the primary index of the table.
* @param <R> The expected return type from the operation. This is typically inferred by the compiler.
*
* @return A {@link CompletableFuture} that will return the result of the operation being executed. The
* documentation on the operation itself should have more information.
*/
<R> CompletableFuture<R> execute(TableOperation<T, ?, ?, R> operationToPerform);

/**
* Executes a command that is expected to return a paginated list of data items against the database with the
* context of the primary index of the specific table this object is linked to.
**
* @param operationToPerform The operation to be performed in the context of the primary index of the table.
* @param <R> The expected return type from the operation. This is typically inferred by the compiler.
*
* @return An {@link SdkPublisher} that will publish successive pages of result data items to any subscriber with
* demand for them.
*/
<R> SdkPublisher<R> execute(PaginatedTableOperation<T, ?, ?, R> operationToPerform);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,31 @@

package software.amazon.awssdk.extensions.dynamodb.mappingclient;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

/**
* Common interface for a single operation that can be executed against a mapped database table. These operations can be
* made against either the primary index of a table or a secondary index, although some implementations of this
* interface do not support secondary indices and will throw an exception when executed against one. Conceptually an
* operation maps 1:1 with an actual DynamoDb call.
*
* Common interface for a single operation that can be executed in a synchronous or non-blocking asynchronous fashion
* against a mapped database table. These operations can be made against either the primary index of a table or a
* secondary index, although some implementations of this interface do not support secondary indices and will throw
* an exception when executed against one. Conceptually an operation maps 1:1 with an actual DynamoDb call.
* <p>
* This interface is extended by {@link TableOperation} and {@link IndexOperation} which contain implementations of
* the behavior to actually execute the operation in the context of a table or secondary index and are used by
* {@link MappedTable} and {@link MappedIndex} respectively. By sharing this common interface operations are able to
* re-use code regardless of whether they are executed in the context of a primary or secondary index.
* {@link MappedTable} or {@link AsyncMappedTable} and {@link MappedIndex} or {@link AsyncMappedIndex} respectively. By
* sharing this common interface operations are able to re-use code regardless of whether they are executed in the
* context of a primary or secondary index or whether they are being executed in a synchronous or non-blocking
* asynchronous fashion.
*
* @param <ItemT> The modelled object that this table maps records to.
* @param <RequestT> The type of the request object for the DynamoDb call in the low level {@link DynamoDbClient}.
* @param <ResponseT> The type of the response object for the DynamoDb call in the low level {@link DynamoDbClient}.
* @param <RequestT> The type of the request object for the DynamoDb call in the low level {@link DynamoDbClient} or
* {@link DynamoDbAsyncClient}.
* @param <ResponseT> The type of the response object for the DynamoDb call in the low level {@link DynamoDbClient}
* or {@link DynamoDbAsyncClient}.
* @param <ResultT> The type of the mapped result object that will be returned by the execution of this operation.
*/
@SdkPublicApi
Expand All @@ -49,12 +55,19 @@ public interface CommonOperation<ItemT, RequestT, ResponseT, ResultT> {
RequestT generateRequest(TableSchema<ItemT> tableSchema, OperationContext context, MapperExtension mapperExtension);

/**
* Provides a function for making the low level SDK call to DynamoDb.
* Provides a function for making the low level synchronous SDK call to DynamoDb.
* @param dynamoDbClient A low level {@link DynamoDbClient} to make the call against.
* @return A function that calls DynamoDb with a provided request object and returns the response object.
*/
Function<RequestT, ResponseT> serviceCall(DynamoDbClient dynamoDbClient);

/**
* Provides a function for making the low level non-blocking asynchronous SDK call to DynamoDb.
* @param dynamoDbAsyncClient A low level {@link DynamoDbAsyncClient} to make the call against.
* @return A function that calls DynamoDb with a provided request object and returns the response object.
*/
Function<RequestT, CompletableFuture<ResponseT>> asyncServiceCall(DynamoDbAsyncClient dynamoDbAsyncClient);

/**
* Takes the response object returned by the actual DynamoDb call and maps it into a higher level abstracted
* result object.
Expand All @@ -71,7 +84,8 @@ ResultT transformResponse(ResponseT response,
MapperExtension mapperExtension);

/**
* Default implementation of a complete execution of this operation against either the primary or a secondary index.
* Default implementation of a complete synchronous execution of this operation against either the primary or a
* secondary index.
* It performs three steps:
* 1) Call generateRequest() to get the request object.
* 2) Call getServiceCall() and call it using the request object generated in the previous step.
Expand All @@ -92,4 +106,30 @@ default ResultT execute(TableSchema<ItemT> tableSchema,
ResponseT response = serviceCall(dynamoDbClient).apply(request);
return transformResponse(response, tableSchema, context, mapperExtension);
}

/**
* Default implementation of a complete non-blocking asynchronous execution of this operation against either the
* primary or a secondary index.
* It performs three steps:
* 1) Call generateRequest() to get the request object.
* 2) Call getServiceCall() and call it using the request object generated in the previous step.
* 3) Wraps the {@link CompletableFuture} returned by the SDK in a new one that calls transformResponse() to
* convert the response object returned in the previous step to a high level result.
*
* @param tableSchema A {@link TableSchema} that maps the table to a modelled object.
* @param context An object containing the context, or target, of the command execution.
* @param dynamoDbAsyncClient A {@link DynamoDbAsyncClient} to make the call against.
* @param mapperExtension A {@link MapperExtension} that may modify the request or result of this operation. A
* null value here will result in no modifications.
* @return A {@link CompletableFuture} of the high level result object as specified by the implementation of this
* operation.
*/
default CompletableFuture<ResultT> executeAsync(TableSchema<ItemT> tableSchema,
OperationContext context,
MapperExtension mapperExtension,
DynamoDbAsyncClient dynamoDbAsyncClient) {
RequestT request = generateRequest(tableSchema, context, mapperExtension);
CompletableFuture<ResponseT> response = asyncServiceCall(dynamoDbAsyncClient).apply(request);
return response.thenApply(r -> transformResponse(r, tableSchema, context, mapperExtension));
}
}
Loading