Skip to content

Commit f743187

Browse files
fix: make sure to propagate the response when throttling is enabled (#1908)
Change-Id: I690c522aebea03a966155d930bff26042d1bb1f1 Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
1 parent 100dcd4 commit f743187

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ protected void onResponseImpl(MutateRowsResponse response) {
127127
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
128128
}
129129
}
130+
outerObserver.onResponse(response);
130131
}
131132

132133
@Override

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,13 @@
2727
import com.google.api.gax.rpc.StreamController;
2828
import com.google.bigtable.v2.MutateRowsRequest;
2929
import com.google.bigtable.v2.MutateRowsResponse;
30+
import com.google.bigtable.v2.Mutation;
3031
import com.google.bigtable.v2.RateLimitInfo;
3132
import com.google.cloud.bigtable.gaxx.testing.FakeStatusCode;
33+
import com.google.protobuf.ByteString;
3234
import com.google.protobuf.Duration;
35+
import com.google.rpc.Code;
36+
import com.google.rpc.Status;
3337
import org.junit.Before;
3438
import org.junit.Test;
3539
import org.junit.runner.RunWith;
@@ -138,6 +142,46 @@ public void testErrorInfoLowerQPS() throws Exception {
138142
assertThat(newQps).isWithin(0.1).of(oldQps * RateLimitingServerStreamingCallable.MIN_FACTOR);
139143
}
140144

145+
@Test
146+
public void testResponseIsPropagated() {
147+
MutateRowsResponse expectedResponse =
148+
MutateRowsResponse.newBuilder()
149+
.addEntries(
150+
MutateRowsResponse.Entry.newBuilder()
151+
.setIndex(0)
152+
.setStatus(Status.newBuilder().setCode(Code.PERMISSION_DENIED_VALUE)))
153+
.build();
154+
innerCallable =
155+
new MockCallable() {
156+
@Override
157+
public void call(
158+
MutateRowsRequest mutateRowsRequest,
159+
ResponseObserver<MutateRowsResponse> responseObserver,
160+
ApiCallContext apiCallContext) {
161+
responseObserver.onResponse(expectedResponse);
162+
responseObserver.onComplete();
163+
}
164+
};
165+
166+
callableToTest = new RateLimitingServerStreamingCallable(innerCallable);
167+
168+
ResponseObserver<MutateRowsResponse> mockObserver = Mockito.mock(ResponseObserver.class);
169+
170+
MutateRowsRequest req =
171+
MutateRowsRequest.newBuilder()
172+
.addEntries(
173+
MutateRowsRequest.Entry.newBuilder()
174+
.setRowKey(ByteString.copyFromUtf8("k1"))
175+
.addMutations(
176+
Mutation.newBuilder()
177+
.setDeleteFromRow(Mutation.DeleteFromRow.getDefaultInstance())))
178+
.build();
179+
180+
callableToTest.call(req, mockObserver, context);
181+
182+
Mockito.verify(mockObserver, Mockito.times(1)).onResponse(Mockito.eq(expectedResponse));
183+
}
184+
141185
private static class MockResponseObserver implements ResponseObserver<MutateRowsResponse> {
142186

143187
private ResponseObserver<MutateRowsResponse> observer;

0 commit comments

Comments
 (0)