Skip to content

Commit 4b51acd

Browse files
feat: Add userClose flag back to StreamWriter (#1973)
* feat: Add userClose flag back to StreamWriter * . * . * . * . * . * fix test failure * . * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 54e9bb9 commit 4b51acd

File tree

6 files changed

+158
-7
lines changed

6 files changed

+158
-7
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,15 @@ ApiFuture<AppendRowsResponse> append(
279279
return appendInternal(requestBuilder.build());
280280
}
281281

282+
Boolean isUserClosed() {
283+
this.lock.lock();
284+
try {
285+
return userClosed;
286+
} finally {
287+
this.lock.unlock();
288+
}
289+
}
290+
282291
private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message) {
283292
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
284293
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
@@ -384,8 +393,13 @@ public String getWriterId() {
384393
}
385394

386395
boolean isConnectionInUnrecoverableState() {
387-
// If final status is set, there's no
388-
return connectionFinalStatus != null;
396+
this.lock.lock();
397+
try {
398+
// If final status is set, there's no
399+
return connectionFinalStatus != null;
400+
} finally {
401+
this.lock.unlock();
402+
}
389403
}
390404

391405
/** Close the stream writer. Shut down all resources. */
@@ -821,7 +835,7 @@ synchronized TableSchemaAndTimestamp getUpdatedSchema() {
821835
}
822836

823837
// Class that wraps AppendRowsRequest and its corresponding Response future.
824-
private static final class AppendRequestAndResponse {
838+
static final class AppendRequestAndResponse {
825839
final SettableApiFuture<AppendRowsResponse> appendResult;
826840
final AppendRowsRequest message;
827841
final long messageSize;

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
379379
connectionWorkerPool.add(connectionWorker);
380380
log.info(
381381
String.format(
382-
"Scaling up new connection for stream name: %s, pool size after scaling up %s",
382+
"Scaling up new connection for stream name: %s, pool size after scaling up %d",
383383
streamName, connectionWorkerPool.size()));
384384
return connectionWorker;
385385
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import com.google.api.gax.rpc.TransportChannelProvider;
2323
import com.google.auto.value.AutoOneOf;
2424
import com.google.auto.value.AutoValue;
25+
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse;
2526
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
27+
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
2628
import com.google.common.annotations.VisibleForTesting;
2729
import com.google.common.base.Preconditions;
2830
import io.grpc.Status;
@@ -36,6 +38,8 @@
3638
import java.util.UUID;
3739
import java.util.concurrent.ConcurrentHashMap;
3840
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.concurrent.locks.Lock;
3943
import java.util.logging.Logger;
4044
import java.util.regex.Matcher;
4145
import java.util.regex.Pattern;
@@ -70,6 +74,11 @@ public class StreamWriter implements AutoCloseable {
7074
*/
7175
private final String location;
7276

77+
/*
78+
* If user has closed the StreamWriter.
79+
*/
80+
private AtomicBoolean userClosed = new AtomicBoolean(false);
81+
7382
/*
7483
* A String that uniquely identifies this writer.
7584
*/
@@ -94,6 +103,8 @@ public class StreamWriter implements AutoCloseable {
94103
/** Creation timestamp of this streamwriter */
95104
private final long creationTimestamp;
96105

106+
private Lock lock;
107+
97108
/** The maximum size of one request. Defined by the API. */
98109
public static long getApiMaxRequestBytes() {
99110
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
@@ -363,6 +374,17 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
363374
* @return the append response wrapped in a future.
364375
*/
365376
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
377+
if (userClosed.get()) {
378+
AppendRequestAndResponse requestWrapper =
379+
new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build());
380+
requestWrapper.appendResult.setException(
381+
new Exceptions.StreamWriterClosedException(
382+
Status.fromCode(Status.Code.FAILED_PRECONDITION)
383+
.withDescription("User closed StreamWriter"),
384+
streamName,
385+
getWriterId()));
386+
return requestWrapper.appendResult;
387+
}
366388
return this.singleConnectionOrConnectionPool.append(this, rows, offset);
367389
}
368390

@@ -398,9 +420,25 @@ public String getLocation() {
398420
return location;
399421
}
400422

423+
/**
424+
* @return if a stream writer can no longer be used for writing. It is due to either the
425+
* StreamWriter is explicitly closed or the underlying connection is broken when connection
426+
* pool is not used. Client should recreate StreamWriter in this case.
427+
*/
428+
public boolean isDone() {
429+
if (singleConnectionOrConnectionPool.getKind() == Kind.CONNECTION_WORKER) {
430+
return userClosed.get()
431+
|| singleConnectionOrConnectionPool.connectionWorker().isConnectionInUnrecoverableState();
432+
} else {
433+
// With ConnectionPool, we will replace the bad connection automatically.
434+
return userClosed.get();
435+
}
436+
}
437+
401438
/** Close the stream writer. Shut down all resources. */
402439
@Override
403440
public void close() {
441+
userClosed.set(true);
404442
singleConnectionOrConnectionPool.close(this);
405443
}
406444

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.api.gax.grpc.testing.MockGrpcService;
1919
import com.google.protobuf.AbstractMessage;
2020
import io.grpc.ServerServiceDefinition;
21+
import io.grpc.Status;
2122
import java.util.LinkedList;
2223
import java.util.List;
2324
import java.util.concurrent.ScheduledExecutorService;
@@ -102,4 +103,8 @@ public long getConnectionCount() {
102103
public void setExecutor(ScheduledExecutorService executor) {
103104
serviceImpl.setExecutor(executor);
104105
}
106+
107+
public void setFailedStatus(Status failedStatus) {
108+
serviceImpl.setFailedStatus(failedStatus);
109+
}
105110
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
6262
// Record whether the first record has been seen on a connection.
6363
private final Map<StreamObserver<AppendRowsResponse>, Boolean> connectionToFirstRequest =
6464
new ConcurrentHashMap<>();
65+
private Status failedStatus = Status.ABORTED;
6566

6667
/** Class used to save the state of a possible response. */
6768
private static class Response {
@@ -138,6 +139,10 @@ public long getConnectionCount() {
138139
return connectionCount;
139140
}
140141

142+
public void setFailedStatus(Status failedStatus) {
143+
this.failedStatus = failedStatus;
144+
}
145+
141146
@Override
142147
public StreamObserver<AppendRowsRequest> appendRows(
143148
final StreamObserver<AppendRowsResponse> responseObserver) {
@@ -177,10 +182,10 @@ public void onNext(AppendRowsRequest value) {
177182
&& recordCount % closeAfter == 0
178183
&& (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) {
179184
LOG.info("Shutting down connection from test...");
180-
responseObserver.onError(Status.ABORTED.asException());
185+
responseObserver.onError(failedStatus.asException());
181186
} else if (closeForeverAfter > 0 && recordCount > closeForeverAfter) {
182187
LOG.info("Shutting down connection from test...");
183-
responseObserver.onError(Status.ABORTED.asException());
188+
responseObserver.onError(failedStatus.asException());
184189
} else {
185190
final Response response = responses.get(offset);
186191
sendResponse(response, responseObserver);

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.api.gax.grpc.testing.MockServiceHelper;
3131
import com.google.api.gax.rpc.AbortedException;
3232
import com.google.api.gax.rpc.ApiException;
33+
import com.google.api.gax.rpc.InvalidArgumentException;
3334
import com.google.api.gax.rpc.StatusCode.Code;
3435
import com.google.api.gax.rpc.UnknownException;
3536
import com.google.cloud.bigquery.storage.test.Test.FooType;
@@ -1037,7 +1038,7 @@ public void testWriterAlreadyClosedException() throws Exception {
10371038
// The basic StatusRuntimeException API is not changed.
10381039
assertTrue(actualError instanceof StatusRuntimeException);
10391040
assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
1040-
assertTrue(actualError.getStatus().getDescription().contains("Connection is already closed"));
1041+
assertTrue(actualError.getStatus().getDescription().contains("User closed StreamWriter"));
10411042
assertEquals(actualError.getWriterId(), writer.getWriterId());
10421043
assertEquals(actualError.getStreamName(), writer.getStreamName());
10431044
}
@@ -1225,4 +1226,92 @@ public void testCloseDisconnectedStream() throws Exception {
12251226
// Ensure closing the writer after disconnect succeeds.
12261227
writer.close();
12271228
}
1229+
1230+
@Test(timeout = 10000)
1231+
public void testStreamWriterUserCloseMultiplexing() throws Exception {
1232+
StreamWriter writer =
1233+
StreamWriter.newBuilder(TEST_STREAM_1, client)
1234+
.setWriterSchema(createProtoSchema())
1235+
.setEnableConnectionPool(true)
1236+
.setLocation("us")
1237+
.build();
1238+
1239+
writer.close();
1240+
assertTrue(writer.isDone());
1241+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
1242+
ExecutionException ex =
1243+
assertThrows(
1244+
ExecutionException.class,
1245+
() -> {
1246+
appendFuture1.get();
1247+
});
1248+
assertEquals(
1249+
Status.Code.FAILED_PRECONDITION,
1250+
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
1251+
}
1252+
1253+
@Test(timeout = 10000)
1254+
public void testStreamWriterUserCloseNoMultiplexing() throws Exception {
1255+
StreamWriter writer =
1256+
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(createProtoSchema()).build();
1257+
1258+
writer.close();
1259+
assertTrue(writer.isDone());
1260+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
1261+
ExecutionException ex =
1262+
assertThrows(
1263+
ExecutionException.class,
1264+
() -> {
1265+
appendFuture1.get();
1266+
});
1267+
assertEquals(
1268+
Status.Code.FAILED_PRECONDITION,
1269+
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
1270+
}
1271+
1272+
@Test(timeout = 10000)
1273+
public void testStreamWriterPermanentErrorMultiplexing() throws Exception {
1274+
StreamWriter writer =
1275+
StreamWriter.newBuilder(TEST_STREAM_1, client)
1276+
.setWriterSchema(createProtoSchema())
1277+
.setEnableConnectionPool(true)
1278+
.setLocation("us")
1279+
.build();
1280+
testBigQueryWrite.setCloseForeverAfter(1);
1281+
// Permenant errror.
1282+
testBigQueryWrite.setFailedStatus(Status.INVALID_ARGUMENT);
1283+
testBigQueryWrite.addResponse(createAppendResponse(0));
1284+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
1285+
appendFuture1.get();
1286+
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"A"});
1287+
ExecutionException ex =
1288+
assertThrows(
1289+
ExecutionException.class,
1290+
() -> {
1291+
appendFuture2.get();
1292+
});
1293+
assertTrue(ex.getCause() instanceof InvalidArgumentException);
1294+
assertFalse(writer.isDone());
1295+
}
1296+
1297+
@Test(timeout = 10000)
1298+
public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception {
1299+
StreamWriter writer =
1300+
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(createProtoSchema()).build();
1301+
testBigQueryWrite.setCloseForeverAfter(1);
1302+
// Permenant errror.
1303+
testBigQueryWrite.setFailedStatus(Status.INVALID_ARGUMENT);
1304+
testBigQueryWrite.addResponse(createAppendResponse(0));
1305+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
1306+
appendFuture1.get();
1307+
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"A"});
1308+
ExecutionException ex =
1309+
assertThrows(
1310+
ExecutionException.class,
1311+
() -> {
1312+
appendFuture2.get();
1313+
});
1314+
assertTrue(writer.isDone());
1315+
assertTrue(ex.getCause() instanceof InvalidArgumentException);
1316+
}
12281317
}

0 commit comments

Comments
 (0)