Skip to content

Commit 5c8e1f6

Browse files
feat: add a query paginator (#1530)
* feat: add a query paginator * add some comments * add a test for full table scan * fix format * address comments * update * fix test * fix nit * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent ee98338 commit 5c8e1f6

File tree

2 files changed

+269
-0
lines changed
  • google-cloud-bigtable/src
    • main/java/com/google/cloud/bigtable/data/v2/models
    • test/java/com/google/cloud/bigtable/data/v2/models

2 files changed

+269
-0
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.models;
1717

18+
import com.google.api.core.BetaApi;
1819
import com.google.api.core.InternalApi;
1920
import com.google.bigtable.v2.ReadRowsRequest;
2021
import com.google.bigtable.v2.RowFilter;
@@ -248,6 +249,29 @@ public List<Query> shard(SortedSet<ByteString> splitPoints) {
248249
return shards;
249250
}
250251

252+
/**
253+
* Create a query paginator that'll split the query into smaller chunks.
254+
*
255+
* <p>Example usage:
256+
*
257+
* <pre>{@code
258+
* Query query = Query.create(...).range("a", "z");
259+
* Query.QueryPaginator paginator = query.createQueryPaginator(100);
260+
* ByteString lastSeenRowKey = ByteString.EMPTY;
261+
* do {
262+
* List<Row> rows = client.readRowsCallable().all().call(paginator.getNextQuery());
263+
* for (Row row : rows) {
264+
* // do some processing
265+
* lastSeenRow = row;
266+
* }
267+
* } while (paginator.advance(lastSeenRowKey));
268+
* }</pre>
269+
*/
270+
@BetaApi("This surface is stable yet it might be removed in the future.")
271+
public QueryPaginator createPaginator(int pageSize) {
272+
return new QueryPaginator(this, pageSize);
273+
}
274+
251275
/** Get the minimal range that encloses all of the row keys and ranges in this Query. */
252276
public ByteStringRange getBound() {
253277
return RowSetUtil.getBound(builder.getRows());
@@ -297,6 +321,73 @@ private static ByteString wrapKey(String key) {
297321
return ByteString.copyFromUtf8(key);
298322
}
299323

324+
/**
325+
* A Query Paginator that will split a query into small chunks. See {@link
326+
* Query#createPaginator(int)} for example usage.
327+
*/
328+
@BetaApi("This surface is stable yet it might be removed in the future.")
329+
public static class QueryPaginator {
330+
331+
private final boolean hasOverallLimit;
332+
private long remainingRows;
333+
private Query query;
334+
private final int pageSize;
335+
private ByteString prevSplitPoint;
336+
337+
QueryPaginator(@Nonnull Query query, int pageSize) {
338+
this.hasOverallLimit = query.builder.getRowsLimit() > 0;
339+
this.remainingRows = query.builder.getRowsLimit();
340+
this.query = query.limit(pageSize);
341+
if (hasOverallLimit) {
342+
remainingRows -= pageSize;
343+
}
344+
this.pageSize = pageSize;
345+
this.prevSplitPoint = ByteString.EMPTY;
346+
}
347+
348+
/** Return the next query. */
349+
public Query getNextQuery() {
350+
return query;
351+
}
352+
353+
/**
354+
* Construct the next query. Return true if there are more queries to return. False if we've
355+
* read everything.
356+
*/
357+
public boolean advance(@Nonnull ByteString lastSeenRowKey) {
358+
Preconditions.checkNotNull(
359+
lastSeenRowKey, "lastSeenRowKey cannot be null, use ByteString.EMPTY instead.");
360+
// Full table scans don't have ranges or limits. Running the query again will return an empty
361+
// list when we reach the end of the table. lastSeenRowKey won't be updated in this case, and
362+
// we can break out of the loop.
363+
if (lastSeenRowKey.equals(prevSplitPoint)) {
364+
return false;
365+
}
366+
this.prevSplitPoint = lastSeenRowKey;
367+
368+
// Set the query limit. If the original limit is set, return false if the new
369+
// limit is <= 0 to avoid returning more rows than intended.
370+
if (hasOverallLimit && remainingRows <= 0) {
371+
return false;
372+
}
373+
if (hasOverallLimit) {
374+
query.limit(Math.min(this.pageSize, remainingRows));
375+
remainingRows -= pageSize;
376+
} else {
377+
query.limit(pageSize);
378+
}
379+
380+
// Split the row ranges / row keys. Return false if there's nothing
381+
// left on the right of the split point.
382+
RowSetUtil.Split split = RowSetUtil.split(query.builder.getRows(), lastSeenRowKey);
383+
if (split.getRight() == null) {
384+
return false;
385+
}
386+
query.builder.setRows(split.getRight());
387+
return true;
388+
}
389+
}
390+
300391
@Override
301392
public boolean equals(Object o) {
302393
if (this == o) {

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,4 +327,182 @@ public void testClone() {
327327
assertThat(clonedReq).isEqualTo(query);
328328
assertThat(clonedReq.toProto(requestContext)).isEqualTo(request);
329329
}
330+
331+
@Test
332+
public void testQueryPaginatorRangeLimitReached() {
333+
int chunkSize = 10, limit = 15;
334+
Query query = Query.create(TABLE_ID).range("a", "z").limit(limit);
335+
Query.QueryPaginator paginator = query.createPaginator(chunkSize);
336+
337+
Query nextQuery = paginator.getNextQuery();
338+
339+
Builder expectedProto =
340+
expectedProtoBuilder()
341+
.setRows(
342+
RowSet.newBuilder()
343+
.addRowRanges(
344+
RowRange.newBuilder()
345+
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
346+
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
347+
.build()))
348+
.setRowsLimit(chunkSize);
349+
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());
350+
351+
assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
352+
int expectedLimit = limit - chunkSize;
353+
nextQuery = paginator.getNextQuery();
354+
expectedProto =
355+
expectedProtoBuilder()
356+
.setRows(
357+
RowSet.newBuilder()
358+
.addRowRanges(
359+
RowRange.newBuilder()
360+
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
361+
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
362+
.build()))
363+
.setRowsLimit(expectedLimit);
364+
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());
365+
366+
assertThat(paginator.advance(ByteString.copyFromUtf8("d"))).isFalse();
367+
}
368+
369+
@Test
370+
public void testQueryPaginatorRangeLimitMultiplyOfChunkSize() {
371+
int chunkSize = 10, limit = 20;
372+
Query query = Query.create(TABLE_ID).range("a", "z").limit(limit);
373+
Query.QueryPaginator paginator = query.createPaginator(chunkSize);
374+
375+
Query nextQuery = paginator.getNextQuery();
376+
377+
Builder expectedProto =
378+
expectedProtoBuilder()
379+
.setRows(
380+
RowSet.newBuilder()
381+
.addRowRanges(
382+
RowRange.newBuilder()
383+
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
384+
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
385+
.build()))
386+
.setRowsLimit(chunkSize);
387+
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());
388+
389+
assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
390+
int expectedLimit = limit - chunkSize;
391+
nextQuery = paginator.getNextQuery();
392+
expectedProto =
393+
expectedProtoBuilder()
394+
.setRows(
395+
RowSet.newBuilder()
396+
.addRowRanges(
397+
RowRange.newBuilder()
398+
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
399+
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
400+
.build()))
401+
.setRowsLimit(expectedLimit);
402+
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());
403+
404+
assertThat(paginator.advance(ByteString.copyFromUtf8("d"))).isFalse();
405+
}
406+
407+
@Test
408+
public void testQueryPaginatorRagneNoLimit() {
409+
int chunkSize = 10;
410+
Query query = Query.create(TABLE_ID).range("a", "z");
411+
Query.QueryPaginator paginator = query.createPaginator(chunkSize);
412+
413+
Query nextQuery = paginator.getNextQuery();
414+
415+
Builder expectedProto =
416+
expectedProtoBuilder()
417+
.setRows(
418+
RowSet.newBuilder()
419+
.addRowRanges(
420+
RowRange.newBuilder()
421+
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
422+
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
423+
.build()))
424+
.setRowsLimit(chunkSize);
425+
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());
426+
427+
assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
428+
nextQuery = paginator.getNextQuery();
429+
expectedProto
430+
.setRows(
431+
RowSet.newBuilder()
432+
.addRowRanges(
433+
RowRange.newBuilder()
434+
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
435+
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
436+
.build()))
437+
.setRowsLimit(chunkSize);
438+
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());
439+
440+
assertThat(paginator.advance(ByteString.copyFromUtf8("z"))).isFalse();
441+
}
442+
443+
@Test
444+
public void testQueryPaginatorRowsNoLimit() {
445+
int chunkSize = 10;
446+
Query query = Query.create(TABLE_ID).rowKey("a").rowKey("b").rowKey("c");
447+
448+
Query.QueryPaginator paginator = query.createPaginator(chunkSize);
449+
450+
Query nextQuery = paginator.getNextQuery();
451+
452+
ReadRowsRequest.Builder expectedProto = expectedProtoBuilder();
453+
expectedProto
454+
.getRowsBuilder()
455+
.addRowKeys(ByteString.copyFromUtf8("a"))
456+
.addRowKeys(ByteString.copyFromUtf8("b"))
457+
.addRowKeys(ByteString.copyFromUtf8("c"));
458+
expectedProto.setRowsLimit(chunkSize);
459+
460+
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());
461+
462+
paginator.advance(ByteString.copyFromUtf8("b"));
463+
nextQuery = paginator.getNextQuery();
464+
expectedProto = expectedProtoBuilder();
465+
expectedProto.getRowsBuilder().addRowKeys(ByteString.copyFromUtf8("c"));
466+
expectedProto.setRowsLimit(chunkSize);
467+
468+
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());
469+
470+
assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isFalse();
471+
}
472+
473+
@Test
474+
public void testQueryPaginatorFullTableScan() {
475+
int chunkSize = 10;
476+
Query query = Query.create(TABLE_ID);
477+
Query.QueryPaginator queryPaginator = query.createPaginator(chunkSize);
478+
479+
ReadRowsRequest.Builder expectedProto = expectedProtoBuilder().setRowsLimit(chunkSize);
480+
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
481+
.isEqualTo(expectedProto.build());
482+
483+
assertThat(queryPaginator.advance(ByteString.copyFromUtf8("a"))).isTrue();
484+
expectedProto
485+
.setRows(
486+
RowSet.newBuilder()
487+
.addRowRanges(
488+
RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("a")).build()))
489+
.setRowsLimit(chunkSize);
490+
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
491+
.isEqualTo(expectedProto.build());
492+
493+
assertThat(queryPaginator.advance(ByteString.copyFromUtf8("a"))).isFalse();
494+
}
495+
496+
@Test
497+
public void testQueryPaginatorEmptyTable() {
498+
int chunkSize = 10;
499+
Query query = Query.create(TABLE_ID);
500+
Query.QueryPaginator queryPaginator = query.createPaginator(chunkSize);
501+
502+
ReadRowsRequest.Builder expectedProto = expectedProtoBuilder().setRowsLimit(chunkSize);
503+
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
504+
.isEqualTo(expectedProto.build());
505+
506+
assertThat(queryPaginator.advance(ByteString.EMPTY)).isFalse();
507+
}
330508
}

0 commit comments

Comments
 (0)