Skip to content

Commit 2adf81b

Browse files
feat: add update schema support for multiplexing (#1867)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparision in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 1a88736 commit 2adf81b

File tree

9 files changed

+465
-30
lines changed

9 files changed

+465
-30
lines changed

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,15 @@
6565
<className>com/google/cloud/bigquery/storage/v1/Exceptions$AppendSerializtionError</className>
6666
<method>Exceptions$AppendSerializtionError(java.lang.String, java.util.Map)</method>
6767
</difference>
68+
<difference>
69+
<differenceType>7006</differenceType>
70+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
71+
<method>com.google.cloud.bigquery.storage.v1.TableSchema getUpdatedSchema()</method>
72+
<to>com.google.cloud.bigquery.storage.v1.ConnectionWorker$TableSchemaAndTimestamp</to>
73+
</difference>
74+
<difference>
75+
<differenceType>7009</differenceType>
76+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
77+
<method>com.google.cloud.bigquery.storage.v1.TableSchema getUpdatedSchema()</method>
78+
</difference>
6879
</differences>

google-cloud-bigquerystorage/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,12 @@
152152
<artifactId>junit</artifactId>
153153
<scope>test</scope>
154154
</dependency>
155+
<dependency>
156+
<groupId>com.google.http-client</groupId>
157+
<artifactId>google-http-client</artifactId>
158+
<version>1.42.3</version>
159+
<scope>test</scope>
160+
</dependency>
155161
<dependency>
156162
<groupId>com.google.truth</groupId>
157163
<artifactId>truth</artifactId>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.grpc.Status.Code;
3232
import io.grpc.StatusRuntimeException;
3333
import java.io.IOException;
34+
import java.time.Instant;
3435
import java.util.Comparator;
3536
import java.util.Deque;
3637
import java.util.HashMap;
@@ -159,7 +160,7 @@ public class ConnectionWorker implements AutoCloseable {
159160
* Contains the updated TableSchema.
160161
*/
161162
@GuardedBy("lock")
162-
private TableSchema updatedSchema;
163+
private TableSchemaAndTimestamp updatedSchema;
163164

164165
/*
165166
* A client used to interact with BigQuery.
@@ -608,7 +609,8 @@ private void requestCallback(AppendRowsResponse response) {
608609
AppendRequestAndResponse requestWrapper;
609610
this.lock.lock();
610611
if (response.hasUpdatedSchema()) {
611-
this.updatedSchema = response.getUpdatedSchema();
612+
this.updatedSchema =
613+
TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema());
612614
}
613615
try {
614616
// Had a successful connection with at least one result, reset retries.
@@ -720,7 +722,7 @@ private AppendRequestAndResponse pollInflightRequestQueue() {
720722
}
721723

722724
/** Thread-safe getter of updated TableSchema */
723-
public synchronized TableSchema getUpdatedSchema() {
725+
synchronized TableSchemaAndTimestamp getUpdatedSchema() {
724726
return this.updatedSchema;
725727
}
726728

@@ -818,4 +820,17 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) {
818820
overwhelmedInflightCount = newThreshold;
819821
}
820822
}
823+
824+
@AutoValue
825+
abstract static class TableSchemaAndTimestamp {
826+
// Shows the timestamp updated schema is reported from response
827+
abstract Instant updateTimeStamp();
828+
829+
// The updated schema returned from server.
830+
abstract TableSchema updatedSchema();
831+
832+
static TableSchemaAndTimestamp create(Instant updateTimeStamp, TableSchema updatedSchema) {
833+
return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema);
834+
}
835+
}
821836
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616
package com.google.cloud.bigquery.storage.v1;
1717

1818
import com.google.api.core.ApiFuture;
19+
import com.google.api.core.ApiFutures;
1920
import com.google.api.gax.batching.FlowController;
2021
import com.google.auto.value.AutoValue;
2122
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
23+
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
24+
import com.google.common.base.Preconditions;
2225
import com.google.common.base.Stopwatch;
2326
import com.google.common.collect.ImmutableList;
27+
import com.google.common.util.concurrent.MoreExecutors;
2428
import java.io.IOException;
29+
import java.time.Instant;
2530
import java.util.Collections;
2631
import java.util.Comparator;
2732
import java.util.HashSet;
@@ -33,10 +38,15 @@
3338
import java.util.concurrent.locks.Lock;
3439
import java.util.concurrent.locks.ReentrantLock;
3540
import java.util.logging.Logger;
41+
import java.util.regex.Matcher;
42+
import java.util.regex.Pattern;
3643
import javax.annotation.concurrent.GuardedBy;
3744

3845
/** Pool of connections to accept appends and distirbute to different connections. */
3946
public class ConnectionWorkerPool {
47+
static final Pattern STREAM_NAME_PATTERN =
48+
Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/streams/([^/]+)");
49+
4050
private static final Logger log = Logger.getLogger(ConnectionWorkerPool.class.getName());
4151
/*
4252
* Max allowed inflight requests in the stream. Method append is blocked at this.
@@ -65,6 +75,11 @@ public class ConnectionWorkerPool {
6575
private final Set<ConnectionWorker> connectionWorkerPool =
6676
Collections.synchronizedSet(new HashSet<>());
6777

78+
/*
79+
* Contains the mapping from stream name to updated schema.
80+
*/
81+
private Map<String, TableSchemaAndTimestamp> tableNameToUpdatedSchema = new ConcurrentHashMap<>();
82+
6883
/** Enable test related logic. */
6984
private static boolean enableTesting = false;
7085

@@ -246,7 +261,18 @@ public ApiFuture<AppendRowsResponse> append(
246261
ApiFuture<AppendRowsResponse> responseFuture =
247262
connectionWorker.append(
248263
streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset);
249-
return responseFuture;
264+
return ApiFutures.transform(
265+
responseFuture,
266+
// Add callback for update schema
267+
(response) -> {
268+
if (response.getWriteStream() != "" && response.hasUpdatedSchema()) {
269+
tableNameToUpdatedSchema.put(
270+
response.getWriteStream(),
271+
TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema()));
272+
}
273+
return response;
274+
},
275+
MoreExecutors.directExecutor());
250276
}
251277

252278
/**
@@ -392,6 +418,10 @@ public long getInflightWaitSeconds(StreamWriter streamWriter) {
392418
}
393419
}
394420

421+
TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) {
422+
return tableNameToUpdatedSchema.getOrDefault(streamWriter.getStreamName(), null);
423+
}
424+
395425
/** Enable Test related logic. */
396426
public static void enableTestingLogic() {
397427
enableTesting = true;
@@ -421,4 +451,15 @@ FlowController.LimitExceededBehavior limitExceededBehavior() {
421451
BigQueryWriteClient bigQueryWriteClient() {
422452
return client;
423453
}
454+
455+
static String toTableName(String streamName) {
456+
Matcher matcher = STREAM_NAME_PATTERN.matcher(streamName);
457+
Preconditions.checkArgument(matcher.matches(), "Invalid stream name: %s.", streamName);
458+
return "projects/"
459+
+ matcher.group(1)
460+
+ "/datasets/"
461+
+ matcher.group(2)
462+
+ "/tables/"
463+
+ matcher.group(3);
464+
}
424465
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.api.gax.core.ExecutorProvider;
2222
import com.google.api.gax.rpc.TransportChannelProvider;
2323
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
24-
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
2524
import com.google.common.base.Preconditions;
2625
import com.google.protobuf.Descriptors;
2726
import com.google.protobuf.Descriptors.Descriptor;
@@ -186,9 +185,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
186185
throws IOException, DescriptorValidationException {
187186
// Handle schema updates in a Thread-safe way by locking down the operation
188187
synchronized (this) {
189-
// Update schema only work when connection pool is not enabled.
190-
if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
191-
&& this.streamWriter.getUpdatedSchema() != null) {
188+
// Create a new stream writer internally if a new updated schema is reported from backend.
189+
if (this.streamWriter.getUpdatedSchema() != null) {
192190
refreshWriter(this.streamWriter.getUpdatedSchema());
193191
}
194192

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import com.google.api.gax.rpc.TransportChannelProvider;
2323
import com.google.auto.value.AutoOneOf;
2424
import com.google.auto.value.AutoValue;
25+
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
2526
import com.google.common.annotations.VisibleForTesting;
2627
import com.google.common.base.Preconditions;
2728
import io.grpc.Status;
2829
import io.grpc.Status.Code;
2930
import io.grpc.StatusRuntimeException;
3031
import java.io.IOException;
32+
import java.time.Instant;
3133
import java.util.Map;
3234
import java.util.Objects;
3335
import java.util.UUID;
@@ -85,6 +87,9 @@ public class StreamWriter implements AutoCloseable {
8587
private static final Map<ConnectionPoolKey, ConnectionWorkerPool> connectionPoolMap =
8688
new ConcurrentHashMap<>();
8789

90+
/** Creation timestamp of this streamwriter */
91+
private final Instant creationTimestamp;
92+
8893
/** The maximum size of one request. Defined by the API. */
8994
public static long getApiMaxRequestBytes() {
9095
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
@@ -147,11 +152,11 @@ long getInflightWaitSeconds(StreamWriter streamWriter) {
147152
return connectionWorker().getInflightWaitSeconds();
148153
}
149154

150-
TableSchema getUpdatedSchema() {
155+
TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) {
151156
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
152-
// TODO(gaole): implement updated schema support for multiplexing.
153-
throw new IllegalStateException("getUpdatedSchema is not implemented for multiplexing.");
157+
return connectionWorkerPool().getUpdatedSchema(streamWriter);
154158
}
159+
// Always populate MIN timestamp to w
155160
return connectionWorker().getUpdatedSchema();
156161
}
157162

@@ -255,6 +260,7 @@ private StreamWriter(Builder builder) throws IOException {
255260
client.close();
256261
}
257262
}
263+
this.creationTimestamp = Instant.now();
258264
}
259265

260266
@VisibleForTesting
@@ -396,9 +402,25 @@ public static StreamWriter.Builder newBuilder(String streamName) {
396402
return new StreamWriter.Builder(streamName);
397403
}
398404

399-
/** Thread-safe getter of updated TableSchema */
405+
/**
406+
* Thread-safe getter of updated TableSchema.
407+
*
408+
* <p>This will return the updated schema only when the creation timestamp of this writer is older
409+
* than the updated schema.
410+
*/
400411
public synchronized TableSchema getUpdatedSchema() {
401-
return singleConnectionOrConnectionPool.getUpdatedSchema();
412+
TableSchemaAndTimestamp tableSchemaAndTimestamp =
413+
singleConnectionOrConnectionPool.getUpdatedSchema(this);
414+
if (tableSchemaAndTimestamp == null) {
415+
return null;
416+
}
417+
return creationTimestamp.compareTo(tableSchemaAndTimestamp.updateTimeStamp()) < 0
418+
? tableSchemaAndTimestamp.updatedSchema()
419+
: null;
420+
}
421+
422+
Instant getCreationTimestamp() {
423+
return creationTimestamp;
402424
}
403425

404426
@VisibleForTesting

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.google.cloud.bigquery.storage.v1;
1717

1818
import static com.google.common.truth.Truth.assertThat;
19+
import static org.junit.Assert.assertThrows;
1920

2021
import com.google.api.core.ApiFuture;
2122
import com.google.api.gax.batching.FlowController;
@@ -311,6 +312,16 @@ public void testMultiStreamAppend_appendWhileClosing() throws Exception {
311312
assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(0);
312313
}
313314

315+
@Test
316+
public void testToTableName() {
317+
assertThat(ConnectionWorkerPool.toTableName("projects/p/datasets/d/tables/t/streams/s"))
318+
.isEqualTo("projects/p/datasets/d/tables/t");
319+
320+
IllegalArgumentException ex =
321+
assertThrows(
322+
IllegalArgumentException.class, () -> ConnectionWorkerPool.toTableName("projects/p/"));
323+
}
324+
314325
private AppendRowsResponse createAppendResponse(long offset) {
315326
return AppendRowsResponse.newBuilder()
316327
.setAppendResult(

0 commit comments

Comments
 (0)