Skip to content

Commit e5cd7df

Browse files
feat: fix windows build failure by using nanoSeconds instead of Instant for better accuracy. (#1887)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparision in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing * fix: fix windows build bug: windows Instant resolution is different with linux * fix: fix another failing tests for windows build * fix: fix another test failure for Windows build Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 0def62f commit e5cd7df

File tree

5 files changed

+8
-16
lines changed

5 files changed

+8
-16
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import io.grpc.Status.Code;
3232
import io.grpc.StatusRuntimeException;
3333
import java.io.IOException;
34-
import java.time.Instant;
3534
import java.util.Comparator;
3635
import java.util.Deque;
3736
import java.util.HashMap;
@@ -610,7 +609,7 @@ private void requestCallback(AppendRowsResponse response) {
610609
this.lock.lock();
611610
if (response.hasUpdatedSchema()) {
612611
this.updatedSchema =
613-
TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema());
612+
TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema());
614613
}
615614
try {
616615
// Had a successful connection with at least one result, reset retries.
@@ -824,12 +823,12 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) {
824823
@AutoValue
825824
abstract static class TableSchemaAndTimestamp {
826825
// Shows the timestamp updated schema is reported from response
827-
abstract Instant updateTimeStamp();
826+
abstract long updateTimeStamp();
828827

829828
// The updated schema returned from server.
830829
abstract TableSchema updatedSchema();
831830

832-
static TableSchemaAndTimestamp create(Instant updateTimeStamp, TableSchema updatedSchema) {
831+
static TableSchemaAndTimestamp create(long updateTimeStamp, TableSchema updatedSchema) {
833832
return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema);
834833
}
835834
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.google.common.collect.ImmutableList;
2727
import com.google.common.util.concurrent.MoreExecutors;
2828
import java.io.IOException;
29-
import java.time.Instant;
3029
import java.util.Collections;
3130
import java.util.Comparator;
3231
import java.util.HashSet;
@@ -268,7 +267,7 @@ public ApiFuture<AppendRowsResponse> append(
268267
if (response.getWriteStream() != "" && response.hasUpdatedSchema()) {
269268
tableNameToUpdatedSchema.put(
270269
response.getWriteStream(),
271-
TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema()));
270+
TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema()));
272271
}
273272
return response;
274273
},

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.grpc.Status.Code;
3030
import io.grpc.StatusRuntimeException;
3131
import java.io.IOException;
32-
import java.time.Instant;
3332
import java.util.Map;
3433
import java.util.Objects;
3534
import java.util.UUID;
@@ -88,7 +87,7 @@ public class StreamWriter implements AutoCloseable {
8887
new ConcurrentHashMap<>();
8988

9089
/** Creation timestamp of this streamwriter */
91-
private final Instant creationTimestamp;
90+
private final long creationTimestamp;
9291

9392
/** The maximum size of one request. Defined by the API. */
9493
public static long getApiMaxRequestBytes() {
@@ -260,7 +259,7 @@ private StreamWriter(Builder builder) throws IOException {
260259
client.close();
261260
}
262261
}
263-
this.creationTimestamp = Instant.now();
262+
this.creationTimestamp = System.nanoTime();
264263
}
265264

266265
@VisibleForTesting
@@ -414,12 +413,12 @@ public synchronized TableSchema getUpdatedSchema() {
414413
if (tableSchemaAndTimestamp == null) {
415414
return null;
416415
}
417-
return creationTimestamp.compareTo(tableSchemaAndTimestamp.updateTimeStamp()) < 0
416+
return creationTimestamp < tableSchemaAndTimestamp.updateTimeStamp()
418417
? tableSchemaAndTimestamp.updatedSchema()
419418
: null;
420419
}
421420

422-
Instant getCreationTimestamp() {
421+
long getCreationTimestamp() {
423422
return creationTimestamp;
424423
}
425424

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,8 +530,6 @@ public void run() throws Throwable {
530530
public void testSimpleSchemaUpdate() throws Exception {
531531
try (JsonStreamWriter writer =
532532
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
533-
// Sleep for a short period to make sure the creation timestamp is older.
534-
Sleeper.DEFAULT.sleep(200);
535533
testBigQueryWrite.addResponse(
536534
AppendRowsResponse.newBuilder()
537535
.setAppendResult(

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static org.junit.Assert.assertThrows;
2121
import static org.junit.Assert.assertTrue;
2222

23-
import com.google.api.client.util.Sleeper;
2423
import com.google.api.core.ApiFuture;
2524
import com.google.api.gax.batching.FlowController;
2625
import com.google.api.gax.core.NoCredentialsProvider;
@@ -310,8 +309,6 @@ private void testUpdatedSchemaFetch(boolean enableMultiplexing)
310309
AppendRowsResponse response =
311310
writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0).get();
312311
assertEquals(writer.getUpdatedSchema(), UPDATED_TABLE_SCHEMA);
313-
// Sleep for a short period to make sure the creation timestamp is older.
314-
Sleeper.DEFAULT.sleep(200);
315312

316313
// Create another writer, although it's the same stream name but the time stamp is newer, thus
317314
// the old updated schema won't get returned.

0 commit comments

Comments
 (0)