|
18 | 18 | import static com.google.common.truth.Truth.assertThat;
|
19 | 19 | import static com.google.common.truth.TruthJUnit.assume;
|
20 | 20 |
|
| 21 | +import com.google.api.core.ApiFunction; |
21 | 22 | import com.google.api.core.ApiFuture;
|
22 | 23 | import com.google.api.core.ApiFutureCallback;
|
23 | 24 | import com.google.api.core.ApiFutures;
|
24 | 25 | import com.google.api.core.SettableApiFuture;
|
| 26 | +import com.google.api.gax.batching.Batcher; |
| 27 | +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; |
25 | 28 | import com.google.api.gax.rpc.ResponseObserver;
|
26 | 29 | import com.google.api.gax.rpc.StreamController;
|
27 | 30 | import com.google.cloud.bigtable.data.v2.BigtableDataClient;
|
| 31 | +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; |
28 | 32 | import com.google.cloud.bigtable.data.v2.models.BulkMutation;
|
29 | 33 | import com.google.cloud.bigtable.data.v2.models.Query;
|
30 | 34 | import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
|
|
38 | 42 | import com.google.common.collect.Lists;
|
39 | 43 | import com.google.common.util.concurrent.MoreExecutors;
|
40 | 44 | import com.google.protobuf.ByteString;
|
| 45 | +import io.grpc.CallOptions; |
| 46 | +import io.grpc.Channel; |
| 47 | +import io.grpc.ClientCall; |
| 48 | +import io.grpc.ClientInterceptor; |
| 49 | +import io.grpc.ManagedChannelBuilder; |
| 50 | +import io.grpc.MethodDescriptor; |
| 51 | +import java.io.IOException; |
41 | 52 | import java.util.ArrayList;
|
| 53 | +import java.util.Collections; |
42 | 54 | import java.util.List;
|
| 55 | +import java.util.Random; |
43 | 56 | import java.util.UUID;
|
44 | 57 | import java.util.concurrent.CountDownLatch;
|
45 | 58 | import java.util.concurrent.ExecutionException;
|
@@ -308,6 +321,92 @@ public void reversed() {
|
308 | 321 | .inOrder();
|
309 | 322 | }
|
310 | 323 |
|
| 324 | + @Test |
| 325 | + public void reversedWithForcedResumption() throws IOException, InterruptedException { |
| 326 | + assume() |
| 327 | + .withMessage("reverse scans are not supported in the emulator") |
| 328 | + .that(testEnvRule.env()) |
| 329 | + .isNotInstanceOf(EmulatorEnv.class); |
| 330 | + |
| 331 | + BigtableDataClient client = testEnvRule.env().getDataClient(); |
| 332 | + String tableId = testEnvRule.env().getTableId(); |
| 333 | + String familyId = testEnvRule.env().getFamilyId(); |
| 334 | + String uniqueKey = prefix + "-rev-queries2"; |
| 335 | + |
| 336 | + // Add enough rows that ensures resumption logic is forced |
| 337 | + Random random; |
| 338 | + List<Row> expectedResults; |
| 339 | + try (Batcher<RowMutationEntry, Void> batcher = client.newBulkMutationBatcher(tableId)) { |
| 340 | + |
| 341 | + byte[] valueBytes = new byte[1024]; |
| 342 | + random = new Random(); |
| 343 | + |
| 344 | + expectedResults = new ArrayList<>(); |
| 345 | + |
| 346 | + for (int i = 0; i < 2 * 1024; i++) { |
| 347 | + ByteString key = ByteString.copyFromUtf8(String.format("%s-%05d", uniqueKey, i)); |
| 348 | + ByteString qualifier = ByteString.copyFromUtf8("q"); |
| 349 | + long timestamp = System.currentTimeMillis() * 1000; |
| 350 | + random.nextBytes(valueBytes); |
| 351 | + ByteString value = ByteString.copyFrom(valueBytes); |
| 352 | + |
| 353 | + batcher.add(RowMutationEntry.create(key).setCell(familyId, qualifier, timestamp, value)); |
| 354 | + expectedResults.add( |
| 355 | + Row.create( |
| 356 | + key, |
| 357 | + ImmutableList.of( |
| 358 | + RowCell.create(familyId, qualifier, timestamp, ImmutableList.of(), value)))); |
| 359 | + } |
| 360 | + } |
| 361 | + Collections.reverse(expectedResults); |
| 362 | + |
| 363 | + BigtableDataSettings.Builder settingsBuilder = |
| 364 | + testEnvRule.env().getDataClientSettings().toBuilder(); |
| 365 | + |
| 366 | + settingsBuilder.stubSettings().readRowsSettings().retrySettings().setMaxAttempts(100); |
| 367 | + |
| 368 | + InstantiatingGrpcChannelProvider.Builder transport = |
| 369 | + ((InstantiatingGrpcChannelProvider) |
| 370 | + settingsBuilder.stubSettings().getTransportChannelProvider()) |
| 371 | + .toBuilder(); |
| 372 | + ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConfigurator = |
| 373 | + transport.getChannelConfigurator(); |
| 374 | + |
| 375 | + // Randomly camp the deadline to force a timeout to force a retry |
| 376 | + transport.setChannelConfigurator( |
| 377 | + (ManagedChannelBuilder c) -> { |
| 378 | + if (oldConfigurator != null) { |
| 379 | + c = oldConfigurator.apply(c); |
| 380 | + } |
| 381 | + return c.intercept( |
| 382 | + new ClientInterceptor() { |
| 383 | + @Override |
| 384 | + public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( |
| 385 | + MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { |
| 386 | + if (method.getBareMethodName().equals("ReadRows")) { |
| 387 | + callOptions = |
| 388 | + callOptions.withDeadlineAfter(random.nextInt(200), TimeUnit.MILLISECONDS); |
| 389 | + } |
| 390 | + |
| 391 | + return next.newCall(method, callOptions); |
| 392 | + } |
| 393 | + }); |
| 394 | + }); |
| 395 | + settingsBuilder.stubSettings().setTransportChannelProvider(transport.build()); |
| 396 | + |
| 397 | + try (BigtableDataClient patchedClient = BigtableDataClient.create(settingsBuilder.build())) { |
| 398 | + for (int i = 0; i < 10; i++) { |
| 399 | + List<Row> actualResults = new ArrayList<>(); |
| 400 | + for (Row row : |
| 401 | + patchedClient.readRows(Query.create(tableId).prefix(uniqueKey).reversed(true))) { |
| 402 | + actualResults.add(row); |
| 403 | + Thread.sleep(1); |
| 404 | + } |
| 405 | + assertThat(actualResults).containsExactlyElementsIn(expectedResults).inOrder(); |
| 406 | + } |
| 407 | + } |
| 408 | + } |
| 409 | + |
311 | 410 | @Test
|
312 | 411 | public void readSingleNonexistentAsyncCallback() throws Exception {
|
313 | 412 | ApiFuture<Row> future =
|
|
0 commit comments