Skip to content

Commit d8d296d

Browse files
committed
Make query and scan return PageIterable that allows customers to iterate through items
Make async query and scan return PagePublisher that allows customers to subscribe to items across all pages directly Update README
1 parent c2825cf commit d8d296d

File tree

24 files changed

+574
-275
lines changed

24 files changed

+574
-275
lines changed

codegen/src/main/java/software/amazon/awssdk/codegen/poet/paginators/SyncResponseClassSpec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,9 @@ private MethodSpec getMethodsSpecForSingleResultKey(String resultKey) {
180180
resultKeyType)))
181181
.addCode(getIteratorLambdaBlock(resultKey, resultKeyModel))
182182
.addCode("\n")
183-
.addStatement("return $T.builder().pagesIterable(this).itemIteratorFunction(getIterator).build()",
184-
PaginatedItemsIterable.class)
183+
.addStatement("return $T.<$T, $T>builder().pagesIterable(this).itemIteratorFunction(getIterator).build"
184+
+ "()",
185+
PaginatedItemsIterable.class, responseType(), resultKeyType)
185186
.addJavadoc(CodeBlock.builder()
186187
.add("Returns an iterable to iterate through the paginated {@link $T#$L()} member."
187188
+ " The returned iterable is used to iterate through the results across all "

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyIterable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ public final SdkIterable<SimpleStruct> items() {
104104
}
105105
return Collections.emptyIterator();
106106
};
107-
return PaginatedItemsIterable.builder().pagesIterable(this).itemIteratorFunction(getIterator).build();
107+
return PaginatedItemsIterable.<PaginatedOperationWithResultKeyResponse, SimpleStruct> builder().pagesIterable(this)
108+
.itemIteratorFunction(getIterator).build();
108109
}
109110

110111
private class PaginatedOperationWithResultKeyResponseFetcher implements

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customizations/SameTokenPaginationApiIterable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ public final SdkIterable<SimpleStruct> items() {
9999
}
100100
return Collections.emptyIterator();
101101
};
102-
return PaginatedItemsIterable.builder().pagesIterable(this).itemIteratorFunction(getIterator).build();
102+
return PaginatedItemsIterable.<SameTokenPaginationApiResponse, SimpleStruct> builder().pagesIterable(this)
103+
.itemIteratorFunction(getIterator).build();
103104
}
104105

105106
private class SameTokenPaginationApiResponseFetcher implements SyncPageFetcher<SameTokenPaginationApiResponse> {

core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/sync/PaginatedItemsIterable.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ public final class PaginatedItemsIterable<ResponseT, ItemT> implements SdkIterab
3434
private final SdkIterable<ResponseT> pagesIterable;
3535
private final Function<ResponseT, Iterator<ItemT>> getItemIterator;
3636

37-
private PaginatedItemsIterable(BuilderImpl builder) {
37+
private PaginatedItemsIterable(BuilderImpl<ResponseT, ItemT> builder) {
3838
this.pagesIterable = builder.pagesIterable;
3939
this.getItemIterator = builder.itemIteratorFunction;
4040
}
4141

42-
public static Builder builder() {
43-
return new BuilderImpl();
42+
public static <R, T> Builder<R, T> builder() {
43+
return new BuilderImpl<>();
4444
}
4545

4646
@Override
@@ -86,33 +86,36 @@ private boolean hasMoreItems() {
8686
}
8787
}
8888

89-
public interface Builder {
90-
Builder pagesIterable(SdkIterable sdkIterable);
89+
public interface Builder<ResponseT, ItemT> {
90+
Builder<ResponseT, ItemT> pagesIterable(SdkIterable<ResponseT> sdkIterable);
9191

92-
Builder itemIteratorFunction(Function itemIteratorFunction);
92+
Builder<ResponseT, ItemT> itemIteratorFunction(Function<ResponseT, Iterator<ItemT>> itemIteratorFunction);
9393

94-
PaginatedItemsIterable build();
94+
PaginatedItemsIterable<ResponseT, ItemT> build();
9595
}
9696

97-
private static final class BuilderImpl implements Builder {
98-
private SdkIterable pagesIterable;
99-
private Function itemIteratorFunction;
97+
private static final class BuilderImpl<ResponseT, ItemT> implements Builder<ResponseT, ItemT> {
98+
private SdkIterable<ResponseT> pagesIterable;
99+
private Function<ResponseT, Iterator<ItemT>> itemIteratorFunction;
100+
101+
private BuilderImpl() {
102+
}
100103

101104
@Override
102-
public Builder pagesIterable(SdkIterable pagesIterable) {
105+
public Builder<ResponseT, ItemT> pagesIterable(SdkIterable<ResponseT> pagesIterable) {
103106
this.pagesIterable = pagesIterable;
104107
return this;
105108
}
106109

107110
@Override
108-
public Builder itemIteratorFunction(Function itemIteratorFunction) {
111+
public Builder<ResponseT, ItemT> itemIteratorFunction(Function<ResponseT, Iterator<ItemT>> itemIteratorFunction) {
109112
this.itemIteratorFunction = itemIteratorFunction;
110113
return this;
111114
}
112115

113116
@Override
114-
public PaginatedItemsIterable build() {
115-
return new PaginatedItemsIterable(this);
117+
public PaginatedItemsIterable<ResponseT, ItemT> build() {
118+
return new PaginatedItemsIterable<>(this);
116119
}
117120
}
118121
}

services-custom/dynamodb-enhanced/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,10 @@ fully documented in the Javadoc of the interfaces referenced in these examples.
117117
Customer deletedCustomer = customerTable.deleteItem(Key.builder().partitionValue("a123").sortValue(456).build());
118118

119119
// Query
120-
Iterable<Page<Customer>> customers = customerTable.query(equalTo(k -> k.partitionValue("a123")));
120+
PageIterable<Customer> customers = customerTable.query(equalTo(k -> k.partitionValue("a123")));
121121

122122
// Scan
123-
Iterable<Page<Customer>> customers = customerTable.scan();
123+
PageIterable<Customer> customers = customerTable.scan();
124124

125125
// BatchGetItem
126126
batchResults = enhancedClient.batchGetItem(r -> r.addReadBatch(ReadBatch.builder(Customer.class)
@@ -156,7 +156,7 @@ index. Here's an example of how to do this:
156156
```java
157157
DynamoDbIndex<Customer> customersByName = customerTable.index("customers_by_name");
158158
159-
Iterable<Page<Customer>> customersWithName =
159+
PageIterable<Customer> customersWithName =
160160
customersByName.query(r -> r.queryConditional(equalTo(k -> k.partitionValue("Smith"))));
161161
```
162162
@@ -190,7 +190,7 @@ key differences:
190190
application can then subscribe a handler to that publisher and deal
191191
with the results asynchronously without having to block:
192192
```java
193-
SdkPublisher<Customer> results = mappedTable.query(r -> r.queryConditional(equalTo(k -> k.partitionValue("Smith"))));
193+
PagePublisher<Customer> results = mappedTable.query(r -> r.queryConditional(equalTo(k -> k.partitionValue("Smith"))));
194194
results.subscribe(myCustomerResultsProcessor);
195195
// Perform other work and let the processor handle the results asynchronously
196196
```

services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/DynamoDbAsyncTable.java

Lines changed: 78 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@
1818
import java.util.concurrent.CompletableFuture;
1919
import java.util.function.Consumer;
2020
import software.amazon.awssdk.annotations.SdkPublicApi;
21-
import software.amazon.awssdk.core.async.SdkPublisher;
2221
import software.amazon.awssdk.enhanced.dynamodb.model.CreateTableEnhancedRequest;
2322
import software.amazon.awssdk.enhanced.dynamodb.model.DeleteItemEnhancedRequest;
2423
import software.amazon.awssdk.enhanced.dynamodb.model.GetItemEnhancedRequest;
2524
import software.amazon.awssdk.enhanced.dynamodb.model.Page;
25+
import software.amazon.awssdk.enhanced.dynamodb.model.PagePublisher;
2626
import software.amazon.awssdk.enhanced.dynamodb.model.PutItemEnhancedRequest;
2727
import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional;
2828
import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest;
2929
import software.amazon.awssdk.enhanced.dynamodb.model.ScanEnhancedRequest;
3030
import software.amazon.awssdk.enhanced.dynamodb.model.UpdateItemEnhancedRequest;
31+
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
3132

3233
/**
3334
* Asynchronous interface for running commands against an object that is linked to a specific DynamoDb table resource
@@ -333,68 +334,77 @@ default CompletableFuture<T> getItem(T keyItem) {
333334
* Executes a query against the primary index of the table using a {@link QueryConditional} expression to retrieve a list of
334335
* items matching the given conditions.
335336
* <p>
336-
* The result is accessed through iterable pages (see {@link Page}) in an interactive way; each time a
337-
* result page is retrieved, a query call is made to DynamoDb to get those entries. If no matches are found,
338-
* the resulting iterator will contain an empty page. Results are sorted by sort key value in
337+
* The return type is a custom publisher that can be subscribed to request a stream of {@link Page}s or
338+
* a stream of items across all pages. Results are sorted by sort key value in
339339
* ascending order by default; this behavior can be overridden in the {@link QueryEnhancedRequest}.
340340
* <p>
341341
* The additional configuration parameters that the enhanced client supports are defined
342342
* in the {@link QueryEnhancedRequest}.
343343
* <p>
344-
* This operation calls the low-level DynamoDB API Query operation. Consult the Query documentation for
345-
* further details and constraints.
344+
* This operation calls the low-level DynamoDB API Query operation. Consult the Query documentation
345+
* {@link DynamoDbAsyncClient#queryPaginator} for further details and constraints.
346346
* <p>
347347
* Example:
348+
* <p>
349+
* 1) Subscribing to {@link Page}s
350+
* <pre>
351+
* {@code
352+
*
353+
* QueryConditional queryConditional = QueryConditional.keyEqualTo(Key.builder().partitionValue("id-value").build());
354+
* PagePublisher<MyItem> publisher = mappedTable.query(QueryEnhancedRequest.builder()
355+
* .queryConditional(queryConditional)
356+
* .build());
357+
* publisher.subscribe(page -> page.items().forEach(item -> System.out.println(item)));
358+
* }
359+
* <p>
360+
* 2) Subscribing to items across all pages
348361
* <pre>
349362
* {@code
350363
*
351364
* QueryConditional queryConditional = QueryConditional.keyEqualTo(Key.builder().partitionValue("id-value").build());
352-
* SdkPublisher<Page<MyItem>> publisher = mappedTable.query(QueryEnhancedRequest.builder()
353-
* .queryConditional(queryConditional)
354-
* .build());
365+
* PagePublisher<MyItem> publisher = mappedTable.query(QueryEnhancedRequest.builder()
366+
* .queryConditional(queryConditional)
367+
* .build())
368+
* .items();
369+
* publisher.items().subscribe(item -> System.out.println(item));
355370
* }
356371
* </pre>
357372
*
373+
* @see #query(Consumer)
374+
* @see #query(QueryConditional)
375+
* @see DynamoDbAsyncClient#queryPaginator
358376
* @param request A {@link QueryEnhancedRequest} defining the query conditions and how
359377
* to handle the results.
360-
* @return a publisher {@link SdkPublisher} with paginated results (see {@link Page}).
378+
* @return a publisher {@link PagePublisher} with paginated results (see {@link Page}).
361379
*/
362-
default SdkPublisher<Page<T>> query(QueryEnhancedRequest request) {
380+
default PagePublisher<T> query(QueryEnhancedRequest request) {
363381
throw new UnsupportedOperationException();
364382
}
365383

366384
/**
367385
* Executes a query against the primary index of the table using a {@link QueryConditional} expression to retrieve a list of
368386
* items matching the given conditions.
369387
* <p>
370-
* The result is accessed through iterable pages (see {@link Page}) in an interactive way; each time a
371-
* result page is retrieved, a query call is made to DynamoDb to get those entries. If no matches are found,
372-
* the resulting iterator will contain an empty page. Results are sorted by sort key value in
373-
* ascending order by default; this behavior can be overridden in the {@link QueryEnhancedRequest}.
374-
* <p>
375-
* The additional configuration parameters that the enhanced client supports are defined
376-
* in the {@link QueryEnhancedRequest}.
377-
* <p>
378-
* This operation calls the low-level DynamoDB API Query operation. Consult the Query documentation for
379-
* further details and constraints.
380-
* <p>
381388
* <b>Note:</b> This is a convenience method that creates an instance of the request builder avoiding the need to create one
382389
* manually via {@link QueryEnhancedRequest#builder()}.
383390
* <p>
384391
* Example:
385392
* <pre>
386393
* {@code
387394
*
388-
* SdkPublisher<Page<MyItem>> publisher =
395+
* PagePublisher<MyItem> publisher =
389396
* mappedTable.query(r -> r.queryConditional(QueryConditional.keyEqualTo(k -> k.partitionValue("id-value"))));
390397
* }
391398
* </pre>
392399
*
400+
* @see #query(QueryEnhancedRequest)
401+
* @see #query(QueryConditional)
402+
* @see DynamoDbAsyncClient#queryPaginator
393403
* @param requestConsumer A {@link Consumer} of {@link QueryEnhancedRequest} defining the query conditions and how to
394404
* handle the results.
395-
* @return a publisher {@link SdkPublisher} with paginated results (see {@link Page}).
405+
* @return a publisher {@link PagePublisher} with paginated results (see {@link Page}).
396406
*/
397-
default SdkPublisher<Page<T>> query(Consumer<QueryEnhancedRequest.Builder> requestConsumer) {
407+
default PagePublisher<T> query(Consumer<QueryEnhancedRequest.Builder> requestConsumer) {
398408
throw new UnsupportedOperationException();
399409
}
400410

@@ -414,15 +424,18 @@ default SdkPublisher<Page<T>> query(Consumer<QueryEnhancedRequest.Builder> reque
414424
* <pre>
415425
* {@code
416426
*
417-
* SdkPublisher<Page<MyItem>> results =
427+
* PagePublisher<MyItem> results =
418428
* mappedTable.query(QueryConditional.keyEqualTo(Key.builder().partitionValue("id-value").build()));
419429
* }
420430
* </pre>
421431
*
432+
* @see #query(QueryEnhancedRequest)
433+
* @see #query(Consumer)
434+
* @see DynamoDbAsyncClient#queryPaginator
422435
* @param queryConditional A {@link QueryConditional} defining the matching criteria for records to be queried.
423-
* @return a publisher {@link SdkPublisher} with paginated results (see {@link Page}).
436+
* @return a publisher {@link PagePublisher} with paginated results (see {@link Page}).
424437
*/
425-
default SdkPublisher<Page<T>> query(QueryConditional queryConditional) {
438+
default PagePublisher<T> query(QueryConditional queryConditional) {
426439
throw new UnsupportedOperationException();
427440
}
428441

@@ -503,72 +516,83 @@ default CompletableFuture<Void> putItem(T item) {
503516
/**
504517
* Scans the table and retrieves all items.
505518
* <p>
506-
* The result is accessed through iterable pages (see {@link Page}) in an interactive way; each time a
507-
* result page is retrieved, a scan call is made to DynamoDb to get those entries. If no matches are found,
508-
* the resulting iterator will contain an empty page.
519+
* The return type is a custom publisher that can be subscribed to request a stream of {@link Page}s or
520+
* a stream of flattened items across all pages. Each time a result page is retrieved, a scan call is made
521+
* to DynamoDb to get those entries. If no matches are found, the resulting iterator will contain an empty page.
522+
*
509523
* <p>
510524
* The additional configuration parameters that the enhanced client supports are defined
511525
* in the {@link ScanEnhancedRequest}.
512526
* <p>
513527
* Example:
528+
* <p>
529+
* 1) Subscribing to {@link Page}s
530+
* <pre>
531+
* {@code
532+
*
533+
* PagePublisher<MyItem> publisher = mappedTable.scan(ScanEnhancedRequest.builder().consistentRead(true).build());
534+
* publisher.subscribe(page -> page.items().forEach(item -> System.out.println(item)));
535+
* }
536+
* </pre>
537+
*
538+
* <p>
539+
* 2) Subscribing to items across all pages.
514540
* <pre>
515541
* {@code
516542
*
517-
* SdkPublisher<Page<MyItem>> publisher = mappedTable.scan(ScanEnhancedRequest.builder().consistentRead(true).build());
543+
* PagePublisher<MyItem> publisher = mappedTable.scan(ScanEnhancedRequest.builder().consistentRead(true).build());
544+
* publisher.items().subscribe(item -> System.out.println(item));
518545
* }
519546
* </pre>
520547
*
548+
* @see #scan(Consumer)
549+
* @see #scan()
550+
* @see DynamoDbAsyncClient#scanPaginator
521551
* @param request A {@link ScanEnhancedRequest} defining how to handle the results.
522-
* @return a publisher {@link SdkPublisher} with paginated results (see {@link Page}).
552+
* @return a publisher {@link PagePublisher} with paginated results (see {@link Page}).
523553
*/
524-
default SdkPublisher<Page<T>> scan(ScanEnhancedRequest request) {
554+
default PagePublisher<T> scan(ScanEnhancedRequest request) {
525555
throw new UnsupportedOperationException();
526556
}
527557

528558
/**
529559
* Scans the table and retrieves all items.
530560
* <p>
531-
* The result is accessed through iterable pages (see {@link Page}) in an interactive way; each time a
532-
* result page is retrieved, a scan call is made to DynamoDb to get those entries. If no matches are found,
533-
* the resulting iterator will contain an empty page.
534-
* <p>
535-
* The additional configuration parameters that the enhanced client supports are defined
536-
* in the {@link ScanEnhancedRequest}.
537-
* <p>
538561
* Example:
539562
* <pre>
540563
* {@code
541564
*
542-
* SdkPublisher<Page<MyItem>> publisher = mappedTable.scan(r -> r.limit(5));
565+
* PagePublisher<MyItem> publisher = mappedTable.scan(r -> r.limit(5));
543566
* }
544567
* </pre>
545-
*
568+
*
569+
* @see #scan(ScanEnhancedRequest)
570+
* @see #scan()
571+
* @see DynamoDbAsyncClient#scanPaginator
546572
* @param requestConsumer A {@link Consumer} of {@link ScanEnhancedRequest} defining the query conditions and how to
547573
* handle the results.
548-
* @return a publisher {@link SdkPublisher} with paginated results (see {@link Page}).
574+
* @return a publisher {@link PagePublisher} with paginated results (see {@link Page}).
549575
*/
550-
default SdkPublisher<Page<T>> scan(Consumer<ScanEnhancedRequest.Builder> requestConsumer) {
576+
default PagePublisher<T> scan(Consumer<ScanEnhancedRequest.Builder> requestConsumer) {
551577
throw new UnsupportedOperationException();
552578
}
553579

554580
/**
555581
* Scans the table and retrieves all items using default settings.
556-
* <p>
557-
* The result is accessed through iterable pages (see {@link Page}) in an interactive way; each time a
558-
* result page is retrieved, a scan call is made to DynamoDb to get those entries. If no matches are found,
559-
* the resulting iterator will contain an empty page.
560-
* <p>
582+
*
561583
* Example:
562584
* <pre>
563585
* {@code
564586
*
565-
* SdkPublisher<Page<MyItem>> publisher = mappedTable.scan();
587+
* PagePublisher<MyItem> publisher = mappedTable.scan();
566588
* }
567589
* </pre>
568-
*
569-
* @return a publisher {@link SdkPublisher} with paginated results (see {@link Page}).
590+
* @see #scan(ScanEnhancedRequest)
591+
* @see #scan(Consumer)
592+
* @see DynamoDbAsyncClient#scanPaginator
593+
* @return a publisher {@link PagePublisher} with paginated results (see {@link Page}).
570594
*/
571-
default SdkPublisher<Page<T>> scan() {
595+
default PagePublisher<T> scan() {
572596
throw new UnsupportedOperationException();
573597
}
574598

0 commit comments

Comments
 (0)