Skip to content

Commit ba147c3

Browse files
authored
feat: send feature flag when flow control is enabled (#1731)
* feat: send feature flag when flow control is enabled * address comment * update
1 parent b518d68 commit ba147c3

File tree

3 files changed

+84
-12
lines changed

3 files changed

+84
-12
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.google.api.gax.rpc.TransportChannelProvider;
3333
import com.google.api.gax.rpc.UnaryCallSettings;
3434
import com.google.auth.Credentials;
35+
import com.google.bigtable.v2.FeatureFlags;
3536
import com.google.bigtable.v2.PingAndWarmRequest;
3637
import com.google.cloud.bigtable.Version;
3738
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
@@ -50,7 +51,10 @@
5051
import com.google.common.collect.ImmutableList;
5152
import com.google.common.collect.ImmutableMap;
5253
import com.google.common.collect.ImmutableSet;
54+
import java.io.ByteArrayOutputStream;
5355
import java.io.IOException;
56+
import java.nio.charset.StandardCharsets;
57+
import java.util.Base64;
5458
import java.util.List;
5559
import java.util.Map;
5660
import java.util.Set;
@@ -221,6 +225,8 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
221225
readChangeStreamSettings;
222226
private final UnaryCallSettings<PingAndWarmRequest, Void> pingAndWarmSettings;
223227

228+
private final FeatureFlags featureFlags;
229+
224230
private EnhancedBigtableStubSettings(Builder builder) {
225231
super(builder);
226232

@@ -259,6 +265,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
259265
builder.generateInitialChangeStreamPartitionsSettings.build();
260266
readChangeStreamSettings = builder.readChangeStreamSettings.build();
261267
pingAndWarmSettings = builder.pingAndWarmSettings.build();
268+
featureFlags = builder.featureFlags.build();
262269
}
263270

264271
/** Create a new builder. */
@@ -598,6 +605,8 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
598605
readChangeStreamSettings;
599606
private final UnaryCallSettings.Builder<PingAndWarmRequest, Void> pingAndWarmSettings;
600607

608+
private FeatureFlags.Builder featureFlags;
609+
601610
/**
602611
* Initializes a new Builder with sane defaults for all settings.
603612
*
@@ -621,16 +630,6 @@ private Builder() {
621630
setStreamWatchdogCheckInterval(baseDefaults.getStreamWatchdogCheckInterval());
622631
setStreamWatchdogProvider(baseDefaults.getStreamWatchdogProvider());
623632

624-
// Inject the UserAgent in addition to api-client header
625-
Map<String, String> headers =
626-
ImmutableMap.<String, String>builder()
627-
.putAll(
628-
BigtableStubSettings.defaultApiClientHeaderProviderBuilder().build().getHeaders())
629-
// GrpcHeaderInterceptor treats the `user-agent` as a magic string
630-
.put("user-agent", "bigtable-java/" + Version.VERSION)
631-
.build();
632-
setInternalHeaderProvider(FixedHeaderProvider.create(headers));
633-
634633
// Per-method settings using baseSettings for defaults.
635634
readRowsSettings = ServerStreamingCallSettings.newBuilder();
636635

@@ -729,6 +728,8 @@ private Builder() {
729728
.setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT)
730729
.setTotalTimeout(PRIME_REQUEST_TIMEOUT)
731730
.build());
731+
732+
featureFlags = FeatureFlags.newBuilder();
732733
}
733734

734735
private Builder(EnhancedBigtableStubSettings settings) {
@@ -753,6 +754,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
753754
settings.generateInitialChangeStreamPartitionsSettings.toBuilder();
754755
readChangeStreamSettings = settings.readChangeStreamSettings.toBuilder();
755756
pingAndWarmSettings = settings.pingAndWarmSettings.toBuilder();
757+
featureFlags = settings.featureFlags.toBuilder();
756758
}
757759
// <editor-fold desc="Private Helpers">
758760

@@ -970,6 +972,34 @@ public EnhancedBigtableStubSettings build() {
970972
BigtableChannelPrimer.create(credentials, projectId, instanceId, appProfileId));
971973
this.setTransportChannelProvider(channelProviderBuilder.build());
972974
}
975+
976+
if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
977+
// only set mutate rows feature flag when this feature is enabled
978+
featureFlags.setMutateRowsRateLimit(true);
979+
}
980+
981+
// Serialize the web64 encode the bigtable feature flags
982+
ByteArrayOutputStream boas = new ByteArrayOutputStream();
983+
try {
984+
featureFlags.build().writeTo(boas);
985+
} catch (IOException e) {
986+
throw new IllegalStateException(
987+
"Unexpected IOException while serializing feature flags", e);
988+
}
989+
byte[] serializedFlags = boas.toByteArray();
990+
byte[] encodedFlags = Base64.getUrlEncoder().encode(serializedFlags);
991+
992+
// Inject the UserAgent in addition to api-client header
993+
Map<String, String> headers =
994+
ImmutableMap.<String, String>builder()
995+
.putAll(
996+
BigtableStubSettings.defaultApiClientHeaderProviderBuilder().build().getHeaders())
997+
// GrpcHeaderInterceptor treats the `user-agent` as a magic string
998+
.put("user-agent", "bigtable-java/" + Version.VERSION)
999+
.put("bigtable-features", new String(encodedFlags, StandardCharsets.UTF_8))
1000+
.build();
1001+
setInternalHeaderProvider(FixedHeaderProvider.create(headers));
1002+
9731003
return new EnhancedBigtableStubSettings(this);
9741004
}
9751005
// </editor-fold>

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -831,8 +831,8 @@ public void testToString() {
831831
nonStaticFields++;
832832
}
833833
}
834-
// failure will signal about adding a new settings property
835-
assertThat(SETTINGS_LIST.length).isEqualTo(nonStaticFields);
834+
// failure will signal about adding a new settings property - feature flag field
835+
assertThat(SETTINGS_LIST.length).isEqualTo(nonStaticFields - 1);
836836
}
837837

838838
void checkToString(EnhancedBigtableStubSettings settings) {

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.api.gax.rpc.ServerStreamingCallable;
3434
import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
3535
import com.google.bigtable.v2.BigtableGrpc;
36+
import com.google.bigtable.v2.FeatureFlags;
3637
import com.google.bigtable.v2.MutateRowsRequest;
3738
import com.google.bigtable.v2.MutateRowsResponse;
3839
import com.google.bigtable.v2.PingAndWarmRequest;
@@ -45,6 +46,7 @@
4546
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
4647
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
4748
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
49+
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
4850
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
4951
import com.google.cloud.bigtable.data.v2.models.Query;
5052
import com.google.cloud.bigtable.data.v2.models.Row;
@@ -77,6 +79,7 @@
7779
import java.security.KeyPair;
7880
import java.security.KeyPairGenerator;
7981
import java.security.NoSuchAlgorithmException;
82+
import java.util.Base64;
8083
import java.util.Collection;
8184
import java.util.concurrent.ArrayBlockingQueue;
8285
import java.util.concurrent.BlockingQueue;
@@ -486,6 +489,45 @@ public void testCallContextPropagatedInReadBatcher()
486489
}
487490
}
488491

492+
@Test
493+
public void testBulkMutationFlowControlFeatureFlagIsSet() throws Exception {
494+
BulkMutation bulkMutation =
495+
BulkMutation.create("my-table")
496+
.add(RowMutationEntry.create("row-key").setCell("cf", "q", "value"));
497+
498+
// Test the header is set when the feature is enabled
499+
EnhancedBigtableStubSettings.Builder settings = defaultSettings.toBuilder();
500+
settings.bulkMutateRowsSettings().setServerInitiatedFlowControl(true);
501+
EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build());
502+
stub.bulkMutateRowsCallable().call(bulkMutation);
503+
assertThat(metadataInterceptor.headers).hasSize(1);
504+
Metadata metadata = metadataInterceptor.headers.take();
505+
String encodedFlags =
506+
metadata.get(Metadata.Key.of("bigtable-features", Metadata.ASCII_STRING_MARSHALLER));
507+
byte[] decodedFlags = Base64.getDecoder().decode(encodedFlags);
508+
FeatureFlags featureFlags = FeatureFlags.parseFrom(decodedFlags);
509+
assertThat(featureFlags.getMutateRowsRateLimit()).isTrue();
510+
}
511+
512+
@Test
513+
public void testBulkMutationFlowControlFeatureFlagIsNotSet() throws Exception {
514+
BulkMutation bulkMutation =
515+
BulkMutation.create("my-table")
516+
.add(RowMutationEntry.create("row-key").setCell("cf", "q", "value"));
517+
518+
EnhancedBigtableStubSettings.Builder settings = defaultSettings.toBuilder();
519+
settings.bulkMutateRowsSettings().setServerInitiatedFlowControl(false);
520+
EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build());
521+
stub.bulkMutateRowsCallable().call(bulkMutation);
522+
assertThat(metadataInterceptor.headers).hasSize(1);
523+
Metadata metadata = metadataInterceptor.headers.take();
524+
String encodedFlags =
525+
metadata.get(Metadata.Key.of("bigtable-features", Metadata.ASCII_STRING_MARSHALLER));
526+
byte[] decodedFlags = Base64.getDecoder().decode(encodedFlags);
527+
FeatureFlags featureFlags = FeatureFlags.parseFrom(decodedFlags);
528+
assertThat(featureFlags.getMutateRowsRateLimit()).isFalse();
529+
}
530+
489531
private static class MetadataInterceptor implements ServerInterceptor {
490532
final BlockingQueue<Metadata> headers = Queues.newLinkedBlockingDeque();
491533

0 commit comments

Comments
 (0)