Skip to content

Commit fc7845b

Browse files
feat: count row merging errors as internal errors (#2045)
* feat: count row merging errors as internal errors Currently they dont have a status associated and thus get counted as UNKOWN Change-Id: Ida3470a0609f2e2ad51534eb3141db394af1dcdc * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * format Change-Id: Iccb2a38b78e5f6c420cb1656887beebaecfa02d2 --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 6b48606 commit fc7845b

File tree

2 files changed

+130
-2
lines changed
  • google-cloud-bigtable/src

2 files changed

+130
-2
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/StateMachine.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.stub.readrows;
1717

18+
import com.google.api.gax.grpc.GrpcStatusCode;
19+
import com.google.api.gax.rpc.InternalException;
1820
import com.google.bigtable.v2.ReadRowsResponse.CellChunk;
1921
import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
2022
import com.google.cloud.bigtable.data.v2.models.RowAdapter.RowBuilder;
2123
import com.google.common.base.Joiner;
2224
import com.google.common.base.Preconditions;
2325
import com.google.common.collect.EvictingQueue;
2426
import com.google.protobuf.ByteString;
27+
import io.grpc.Status;
2528
import java.util.List;
2629

2730
/**
@@ -252,6 +255,21 @@ State handleChunk(CellChunk chunk) {
252255
new State() {
253256
@Override
254257
State handleLastScannedRow(ByteString rowKey) {
258+
if (lastCompleteRowKey != null) {
259+
int cmp = ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, rowKey);
260+
String direction = "increasing";
261+
if (reversed) {
262+
cmp *= -1;
263+
direction = "decreasing";
264+
}
265+
266+
validate(
267+
cmp < 0,
268+
"AWAITING_NEW_ROW: last scanned key must be strictly "
269+
+ direction
270+
+ ". New last scanned key="
271+
+ rowKey);
272+
}
255273
completeRow = adapter.createScanMarkerRow(rowKey);
256274
lastCompleteRowKey = rowKey;
257275
return AWAITING_ROW_CONSUME;
@@ -468,9 +486,9 @@ private void validate(boolean condition, String message) {
468486
}
469487
}
470488

471-
static class InvalidInputException extends RuntimeException {
489+
static class InvalidInputException extends InternalException {
472490
InvalidInputException(String message) {
473-
super(message);
491+
super(message, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
474492
}
475493
}
476494
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.functional;
17+
18+
import com.google.api.gax.rpc.InternalException;
19+
import com.google.bigtable.v2.BigtableGrpc;
20+
import com.google.bigtable.v2.ReadRowsRequest;
21+
import com.google.bigtable.v2.ReadRowsResponse;
22+
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
23+
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
24+
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
25+
import com.google.cloud.bigtable.data.v2.models.Query;
26+
import com.google.cloud.bigtable.data.v2.models.Row;
27+
import com.google.protobuf.ByteString;
28+
import com.google.protobuf.BytesValue;
29+
import com.google.protobuf.StringValue;
30+
import io.grpc.Server;
31+
import io.grpc.stub.StreamObserver;
32+
import java.util.ArrayList;
33+
import java.util.Collections;
34+
import java.util.List;
35+
import org.junit.After;
36+
import org.junit.Assert;
37+
import org.junit.Before;
38+
import org.junit.Test;
39+
import org.junit.runner.RunWith;
40+
import org.junit.runners.JUnit4;
41+
42+
@RunWith(JUnit4.class)
43+
public class ReadRowsTest {
44+
private FakeService service;
45+
private Server server;
46+
47+
@Before
48+
public void setUp() throws Exception {
49+
service = new FakeService();
50+
server = FakeServiceBuilder.create(service).start();
51+
}
52+
53+
@After
54+
public void tearDown() throws Exception {
55+
server.shutdown();
56+
}
57+
58+
@Test
59+
public void rowMergingErrorsUseInternalStatus() throws Exception {
60+
BigtableDataSettings settings =
61+
BigtableDataSettings.newBuilderForEmulator(server.getPort())
62+
.setProjectId("fake-project")
63+
.setInstanceId("fake-instance")
64+
.build();
65+
66+
service.readRowsResponses.add(
67+
ReadRowsResponse.newBuilder()
68+
.addChunks(
69+
ReadRowsResponse.CellChunk.newBuilder()
70+
.setRowKey(ByteString.copyFromUtf8("z"))
71+
.setFamilyName(StringValue.newBuilder().setValue("f"))
72+
.setQualifier(
73+
BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")).build())
74+
.setTimestampMicros(1000)
75+
.setValue(ByteString.copyFromUtf8("v"))
76+
.setCommitRow(true))
77+
.addChunks(
78+
ReadRowsResponse.CellChunk.newBuilder()
79+
.setRowKey(ByteString.copyFromUtf8("a"))
80+
.setFamilyName(StringValue.newBuilder().setValue("f"))
81+
.setQualifier(
82+
BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")).build())
83+
.setTimestampMicros(1000)
84+
.setValue(ByteString.copyFromUtf8("v"))
85+
.setCommitRow(true))
86+
.build());
87+
88+
try (BigtableDataClient client = BigtableDataClient.create(settings)) {
89+
Assert.assertThrows(
90+
InternalException.class,
91+
() -> {
92+
for (Row ignored : client.readRows(Query.create("fake-table"))) {}
93+
});
94+
}
95+
}
96+
97+
static class FakeService extends BigtableGrpc.BigtableImplBase {
98+
private List<ReadRowsResponse> readRowsResponses =
99+
Collections.synchronizedList(new ArrayList<>());
100+
101+
@Override
102+
public void readRows(
103+
ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
104+
for (ReadRowsResponse r : readRowsResponses) {
105+
responseObserver.onNext(r);
106+
}
107+
responseObserver.onCompleted();
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)