Skip to content

Commit 9b9b5c0

Browse files
GaoleMengrelease-please[bot]gcf-owl-bot[bot]
authored
fix: Populate final stauts to initial request during connection shutdown (#2228)
* chore(main): release 2.41.1 (#2222) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> * chore(main): release 2.41.1 (#2222) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent b85c562 commit 9b9b5c0

File tree

4 files changed

+56
-13
lines changed

4 files changed

+56
-13
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,20 @@ If you are using Maven without the BOM, add this to your dependencies:
5050
If you are using Gradle 5.x or later, add this to your dependencies:
5151

5252
```Groovy
53-
implementation platform('com.google.cloud:libraries-bom:26.21.0')
53+
implementation platform('com.google.cloud:libraries-bom:26.22.0')
5454
5555
implementation 'com.google.cloud:google-cloud-bigquerystorage'
5656
```
5757
If you are using Gradle without BOM, add this to your dependencies:
5858

5959
```Groovy
60-
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.41.0'
60+
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.41.1'
6161
```
6262

6363
If you are using SBT, add this to your dependencies:
6464

6565
```Scala
66-
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.41.0"
66+
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.41.1"
6767
```
6868
<!-- {x-version-update-end} -->
6969

@@ -220,7 +220,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
220220
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html
221221
[stability-image]: https://img.shields.io/badge/stability-stable-green
222222
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg
223-
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.41.0
223+
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.41.1
224224
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
225225
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
226226
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -839,8 +839,24 @@ private void cleanupInflightRequests() {
839839
+ streamName
840840
+ " id: "
841841
+ writerId);
842-
while (!localQueue.isEmpty()) {
843-
localQueue.pollFirst().appendResult.setException(finalStatus);
842+
int sizeOfQueue = localQueue.size();
843+
for (int i = 0; i < sizeOfQueue; i++) {
844+
if (i == 0) {
845+
localQueue.pollFirst().appendResult.setException(finalStatus);
846+
} else {
847+
localQueue
848+
.pollFirst()
849+
.appendResult
850+
.setException(
851+
new Exceptions.StreamWriterClosedException(
852+
Status.fromCode(Code.ABORTED)
853+
.withDescription(
854+
"Connection is aborted due to an unrecoverable failure of "
855+
+ "another request sharing the connection. Please retry this "
856+
+ "request."),
857+
streamName,
858+
writerId));
859+
}
844860
}
845861
}
846862

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,11 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
418418
assertThrows(
419419
ExecutionException.class,
420420
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
421-
assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
421+
if (i == 0) {
422+
assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
423+
} else {
424+
assertThat(ex.getCause()).hasMessageThat().contains("Connection is aborted due to ");
425+
}
422426
}
423427

424428
// The future append will directly fail.
@@ -654,7 +658,11 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
654658
assertThrows(
655659
ExecutionException.class,
656660
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
657-
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
661+
if (i == 0) {
662+
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
663+
} else {
664+
assertThat(ex.getCause()).hasMessageThat().contains("Connection is aborted due to ");
665+
}
658666
}
659667

660668
// The future append will directly fail.

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.google.api.gax.rpc.UnknownException;
4040
import com.google.cloud.bigquery.storage.test.Test.FooType;
4141
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
42+
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
4243
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
4344
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
4445
import com.google.common.base.Strings;
@@ -666,8 +667,12 @@ public void serverCloseWhileRequestsInflight() throws Exception {
666667

667668
// Server close should properly handle all inflight requests.
668669
for (int i = 0; i < appendCount; i++) {
669-
ApiException actualError = assertFutureException(ApiException.class, futures.get(i));
670-
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatusCode().getCode());
670+
if (i == 0) {
671+
ApiException actualError = assertFutureException(ApiException.class, futures.get(i));
672+
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatusCode().getCode());
673+
} else {
674+
assertFutureException(StreamWriterClosedException.class, futures.get(i));
675+
}
671676
}
672677

673678
writer.close();
@@ -988,7 +993,13 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
988993
assertThrows(
989994
ExecutionException.class,
990995
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
991-
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
996+
if (i == 0) {
997+
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
998+
} else {
999+
assertThat(ex.getCause())
1000+
.hasMessageThat()
1001+
.contains("Connection is aborted due to an unrecoverable");
1002+
}
9921003
}
9931004
}
9941005

@@ -1027,7 +1038,11 @@ public void testAppendWithResetNeverSuccess() throws Exception {
10271038
assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
10281039
// after 5 seconds, the requests will bail out.
10291040
for (int i = 1; i < appendCount; i++) {
1030-
assertFutureException(AbortedException.class, futures.get(i));
1041+
if (i == 1) {
1042+
assertFutureException(AbortedException.class, futures.get(i));
1043+
} else {
1044+
assertFutureException(StreamWriterClosedException.class, futures.get(i));
1045+
}
10311046
}
10321047
}
10331048
}
@@ -1048,7 +1063,11 @@ public void testAppendWithResetNeverSuccessWithMultiplexing() throws Exception {
10481063
assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
10491064
// after 5 seconds, the requests will bail out.
10501065
for (int i = 1; i < appendCount; i++) {
1051-
assertFutureException(AbortedException.class, futures.get(i));
1066+
if (i == 1) {
1067+
assertFutureException(AbortedException.class, futures.get(i));
1068+
} else {
1069+
assertFutureException(StreamWriterClosedException.class, futures.get(i));
1070+
}
10521071
}
10531072
}
10541073
}

0 commit comments

Comments
 (0)