Skip to content

Commit 1e9a8ca

Browse files
feat: add public api to stream writer to set the maximum wait time (#2066)
* feat: add public api to stream writer to set the maximum wait time * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * modify back the readme change from owl post processor * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent a9f8c7e commit 1e9a8ca

File tree

4 files changed

+45
-4
lines changed

4 files changed

+45
-4
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
5656
If you are using Gradle without BOM, add this to your dependencies:
5757

5858
```Groovy
59-
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.34.1'
59+
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.34.2'
6060
```
6161

6262
If you are using SBT, add this to your dependencies:
6363

6464
```Scala
65-
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.34.1"
65+
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.34.2"
6666
```
6767
<!-- {x-version-update-end} -->
6868

@@ -219,7 +219,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
219219
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html
220220
[stability-image]: https://img.shields.io/badge/stability-stable-green
221221
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg
222-
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.34.1
222+
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.34.2
223223
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
224224
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
225225
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class ConnectionWorker implements AutoCloseable {
7575
* We will constantly checking how much time we have been waiting for the next request callback
7676
* if we wait too much time we will start shutting down the connections and clean up the queues.
7777
*/
78-
private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);
78+
static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);
7979

8080
private Lock lock;
8181
private Condition hasMessageInWaitingQueue;

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,16 @@ public synchronized TableSchema getUpdatedSchema() {
518518
: null;
519519
}
520520

521+
/**
522+
* Sets the maximum time a request is allowed to be waiting in request waiting queue. Under very
523+
* low chance, it's possible for append request to be waiting indefintely for request callback
524+
* when Google networking SDK does not detect the networking breakage. The default timeout is 15
525+
* minutes. We are investigating the root cause for callback not triggered by networking SDK.
526+
*/
527+
public static void setMaxRequestCallbackWaitTime(Duration waitTime) {
528+
ConnectionWorker.MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime;
529+
}
530+
521531
long getCreationTimestamp() {
522532
return creationTimestamp;
523533
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.bigquery.storage.v1;
1717

18+
import static com.google.common.truth.Truth.assertThat;
1819
import static org.junit.Assert.assertEquals;
1920
import static org.junit.Assert.assertFalse;
2021
import static org.junit.Assert.assertThrows;
@@ -113,6 +114,7 @@ public StreamWriterTest() throws DescriptorValidationException {}
113114
@Before
114115
public void setUp() throws Exception {
115116
testBigQueryWrite = new FakeBigQueryWrite();
117+
StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(10000));
116118
ConnectionWorker.setMaxInflightQueueWaitTime(300000);
117119
serviceHelper =
118120
new MockServiceHelper(
@@ -947,6 +949,35 @@ public void testMessageTooLarge() throws Exception {
947949
writer.close();
948950
}
949951

952+
@Test
953+
public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
954+
ProtoSchema schema1 = createProtoSchema("foo");
955+
StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(1));
956+
StreamWriter writer =
957+
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
958+
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));
959+
960+
long appendCount = 10;
961+
for (int i = 0; i < appendCount; i++) {
962+
testBigQueryWrite.addResponse(createAppendResponse(i));
963+
}
964+
965+
// In total insert 5 requests,
966+
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
967+
for (int i = 0; i < appendCount; i++) {
968+
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
969+
}
970+
971+
for (int i = 0; i < appendCount; i++) {
972+
int finalI = i;
973+
ExecutionException ex =
974+
assertThrows(
975+
ExecutionException.class,
976+
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
977+
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
978+
}
979+
}
980+
950981
@Test
951982
public void testAppendWithResetSuccess() throws Exception {
952983
try (StreamWriter writer = getTestStreamWriter()) {

0 commit comments

Comments
 (0)