18
18
import static com .google .common .truth .Truth .assertThat ;
19
19
import static org .junit .Assert .assertThrows ;
20
20
21
- import com .google .api .gax .core .NoCredentialsProvider ;
22
21
import com .google .api .gax .grpc .GrpcStatusCode ;
23
- import com .google .api .gax .grpc .GrpcTransportChannel ;
24
22
import com .google .api .gax .rpc .ApiException ;
25
23
import com .google .api .gax .rpc .ErrorDetails ;
26
- import com .google .api .gax .rpc .FixedTransportChannelProvider ;
27
24
import com .google .api .gax .rpc .InternalException ;
28
25
import com .google .api .gax .rpc .UnavailableException ;
29
26
import com .google .bigtable .v2 .BigtableGrpc ;
45
42
import com .google .bigtable .v2 .SampleRowKeysResponse ;
46
43
import com .google .cloud .bigtable .data .v2 .BigtableDataClient ;
47
44
import com .google .cloud .bigtable .data .v2 .BigtableDataSettings ;
45
+ import com .google .cloud .bigtable .data .v2 .FakeServiceBuilder ;
48
46
import com .google .cloud .bigtable .data .v2 .models .BulkMutation ;
49
47
import com .google .cloud .bigtable .data .v2 .models .ConditionalRowMutation ;
50
48
import com .google .cloud .bigtable .data .v2 .models .Filters ;
55
53
import com .google .cloud .bigtable .data .v2 .models .ReadModifyWriteRow ;
56
54
import com .google .cloud .bigtable .data .v2 .models .RowMutation ;
57
55
import com .google .cloud .bigtable .data .v2 .models .RowMutationEntry ;
56
+ import com .google .cloud .bigtable .data .v2 .models .TableId ;
58
57
import com .google .common .base .Stopwatch ;
59
58
import com .google .common .collect .ImmutableList ;
60
59
import com .google .common .collect .Queues ;
61
60
import com .google .protobuf .Any ;
62
61
import com .google .rpc .RetryInfo ;
62
+ import io .grpc .ForwardingServerCall ;
63
63
import io .grpc .Metadata ;
64
+ import io .grpc .MethodDescriptor ;
65
+ import io .grpc .Server ;
66
+ import io .grpc .ServerCall ;
67
+ import io .grpc .ServerCallHandler ;
68
+ import io .grpc .ServerInterceptor ;
64
69
import io .grpc .Status ;
65
70
import io .grpc .StatusRuntimeException ;
66
71
import io .grpc .stub .StreamObserver ;
67
- import io .grpc .testing .GrpcServerRule ;
68
72
import java .io .IOException ;
69
73
import java .time .Duration ;
74
+ import java .util .HashSet ;
70
75
import java .util .Queue ;
76
+ import java .util .Set ;
71
77
import java .util .concurrent .atomic .AtomicInteger ;
78
+ import java .util .stream .Collectors ;
79
+ import org .junit .After ;
72
80
import org .junit .Before ;
73
- import org .junit .Rule ;
74
81
import org .junit .Test ;
75
82
import org .junit .runner .RunWith ;
76
83
import org .junit .runners .JUnit4 ;
77
84
78
85
@ RunWith (JUnit4 .class )
79
86
public class RetryInfoTest {
80
87
81
- @ Rule public GrpcServerRule serverRule = new GrpcServerRule ();
82
-
83
88
private static final Metadata .Key <byte []> ERROR_DETAILS_KEY =
84
89
Metadata .Key .of ("grpc-status-details-bin" , Metadata .BINARY_BYTE_MARSHALLER );
85
90
91
+ private final Set <String > methods = new HashSet <>();
92
+
86
93
private FakeBigtableService service ;
94
+ private Server server ;
87
95
private BigtableDataClient client ;
88
96
private BigtableDataSettings .Builder settings ;
89
97
@@ -94,29 +102,111 @@ public class RetryInfoTest {
94
102
@ Before
95
103
public void setUp () throws IOException {
96
104
service = new FakeBigtableService ();
97
- serverRule .getServiceRegistry ().addService (service );
105
+
106
+ ServerInterceptor serverInterceptor =
107
+ new ServerInterceptor () {
108
+ @ Override
109
+ public <ReqT , RespT > ServerCall .Listener <ReqT > interceptCall (
110
+ ServerCall <ReqT , RespT > serverCall ,
111
+ Metadata metadata ,
112
+ ServerCallHandler <ReqT , RespT > serverCallHandler ) {
113
+ return serverCallHandler .startCall (
114
+ new ForwardingServerCall .SimpleForwardingServerCall <ReqT , RespT >(serverCall ) {
115
+ @ Override
116
+ public void close (Status status , Metadata trailers ) {
117
+ if (trailers .containsKey (ERROR_DETAILS_KEY )) {
118
+ methods .add (serverCall .getMethodDescriptor ().getBareMethodName ());
119
+ }
120
+ super .close (status , trailers );
121
+ }
122
+ },
123
+ metadata );
124
+ }
125
+ };
126
+ server = FakeServiceBuilder .create (service ).intercept (serverInterceptor ).start ();
98
127
99
128
settings =
100
- BigtableDataSettings .newBuilder ( )
129
+ BigtableDataSettings .newBuilderForEmulator ( server . getPort () )
101
130
.setProjectId ("fake-project" )
102
- .setInstanceId ("fake-instance" )
103
- .setCredentialsProvider (NoCredentialsProvider .create ());
104
-
105
- settings
106
- .stubSettings ()
107
- .setTransportChannelProvider (
108
- FixedTransportChannelProvider .create (
109
- GrpcTransportChannel .create (serverRule .getChannel ())))
110
- // channel priming doesn't work with FixedTransportChannelProvider. Disable it for the test
111
- .setRefreshingChannel (false )
112
- .build ();
131
+ .setInstanceId ("fake-instance" );
113
132
114
133
this .client = BigtableDataClient .create (settings .build ());
115
134
}
116
135
136
+ @ After
137
+ public void tearDown () {
138
+ if (client != null ) {
139
+ client .close ();
140
+ }
141
+ if (server != null ) {
142
+ server .shutdown ();
143
+ }
144
+ }
145
+
117
146
@ Test
118
- public void testReadRow () {
119
- verifyRetryInfoIsUsed (() -> client .readRow ("table" , "row" ), true );
147
+ public void testAllMethods () {
148
+ // Verify retry info is handled correctly for all the methods in data API.
149
+ verifyRetryInfoIsUsed (() -> client .readRow (TableId .of ("table" ), "row" ), true );
150
+
151
+ attemptCounter .set (0 );
152
+ verifyRetryInfoIsUsed (
153
+ () -> client .readRows (Query .create (TableId .of ("table" ))).iterator ().hasNext (), true );
154
+
155
+ attemptCounter .set (0 );
156
+ verifyRetryInfoIsUsed (
157
+ () ->
158
+ client .bulkMutateRows (
159
+ BulkMutation .create (TableId .of ("fake-table" ))
160
+ .add (RowMutationEntry .create ("row-key-1" ).setCell ("cf" , "q" , "v" ))),
161
+ true );
162
+
163
+ attemptCounter .set (0 );
164
+ verifyRetryInfoIsUsed (
165
+ () ->
166
+ client .mutateRow (
167
+ RowMutation .create (TableId .of ("fake-table" ), "key" ).setCell ("cf" , "q" , "v" )),
168
+ true );
169
+
170
+ attemptCounter .set (0 );
171
+ verifyRetryInfoIsUsed (() -> client .sampleRowKeys (TableId .of ("table" )), true );
172
+
173
+ attemptCounter .set (0 );
174
+ verifyRetryInfoIsUsed (
175
+ () ->
176
+ client .checkAndMutateRow (
177
+ ConditionalRowMutation .create ("table" , "key" )
178
+ .condition (Filters .FILTERS .value ().regex ("old-value" ))
179
+ .then (Mutation .create ().setCell ("cf" , "q" , "v" ))),
180
+ true );
181
+
182
+ attemptCounter .set (0 );
183
+ verifyRetryInfoIsUsed (
184
+ () ->
185
+ client .readModifyWriteRow (
186
+ ReadModifyWriteRow .create ("table" , "row" ).append ("cf" , "q" , "v" )),
187
+ true );
188
+
189
+ attemptCounter .set (0 );
190
+ verifyRetryInfoIsUsed (
191
+ () -> client .readChangeStream (ReadChangeStreamQuery .create ("table" )).iterator ().hasNext (),
192
+ true );
193
+
194
+ attemptCounter .set (0 );
195
+ verifyRetryInfoIsUsed (
196
+ () -> client .generateInitialChangeStreamPartitions ("table" ).iterator ().hasNext (), true );
197
+
198
+ // Verify that the new data API methods are tested or excluded. This is enforced by
199
+ // introspecting grpc
200
+ // method descriptors.
201
+ Set <String > expected =
202
+ BigtableGrpc .getServiceDescriptor ().getMethods ().stream ()
203
+ .map (MethodDescriptor ::getBareMethodName )
204
+ .collect (Collectors .toSet ());
205
+
206
+ // Exclude methods that don't support retry info
207
+ methods .add ("PingAndWarm" );
208
+
209
+ assertThat (methods ).containsExactlyElementsIn (expected );
120
210
}
121
211
122
212
@ Test
@@ -147,11 +237,6 @@ public void testReadRowServerNotReturningRetryInfoClientDisabledHandling() throw
147
237
}
148
238
}
149
239
150
- @ Test
151
- public void testReadRows () {
152
- verifyRetryInfoIsUsed (() -> client .readRows (Query .create ("table" )).iterator ().hasNext (), true );
153
- }
154
-
155
240
@ Test
156
241
public void testReadRowsNonRetraybleErrorWithRetryInfo () {
157
242
verifyRetryInfoIsUsed (() -> client .readRows (Query .create ("table" )).iterator ().hasNext (), false );
@@ -181,16 +266,6 @@ public void testReadRowsServerNotReturningRetryInfoClientDisabledHandling() thro
181
266
}
182
267
}
183
268
184
- @ Test
185
- public void testMutateRows () {
186
- verifyRetryInfoIsUsed (
187
- () ->
188
- client .bulkMutateRows (
189
- BulkMutation .create ("fake-table" )
190
- .add (RowMutationEntry .create ("row-key-1" ).setCell ("cf" , "q" , "v" ))),
191
- true );
192
- }
193
-
194
269
@ Test
195
270
public void testMutateRowsNonRetryableErrorWithRetryInfo () {
196
271
verifyRetryInfoIsUsed (
@@ -238,12 +313,6 @@ public void testMutateRowsServerNotReturningRetryInfoClientDisabledHandling() th
238
313
}
239
314
}
240
315
241
- @ Test
242
- public void testMutateRow () {
243
- verifyRetryInfoIsUsed (
244
- () -> client .mutateRow (RowMutation .create ("table" , "key" ).setCell ("cf" , "q" , "v" )), true );
245
- }
246
-
247
316
@ Test
248
317
public void testMutateRowNonRetryableErrorWithRetryInfo () {
249
318
verifyRetryInfoIsUsed (
@@ -278,11 +347,6 @@ public void testMutateRowServerNotReturningRetryInfoClientDisabledHandling() thr
278
347
}
279
348
}
280
349
281
- @ Test
282
- public void testSampleRowKeys () {
283
- verifyRetryInfoIsUsed (() -> client .sampleRowKeys ("table" ), true );
284
- }
285
-
286
350
@ Test
287
351
public void testSampleRowKeysNonRetryableErrorWithRetryInfo () {
288
352
verifyRetryInfoIsUsed (() -> client .sampleRowKeys ("table" ), false );
@@ -312,17 +376,6 @@ public void testSampleRowKeysServerNotReturningRetryInfoClientDisabledHandling()
312
376
}
313
377
}
314
378
315
- @ Test
316
- public void testCheckAndMutateRow () {
317
- verifyRetryInfoIsUsed (
318
- () ->
319
- client .checkAndMutateRow (
320
- ConditionalRowMutation .create ("table" , "key" )
321
- .condition (Filters .FILTERS .value ().regex ("old-value" ))
322
- .then (Mutation .create ().setCell ("cf" , "q" , "v" ))),
323
- true );
324
- }
325
-
326
379
@ Test
327
380
public void testCheckAndMutateDisableRetryInfo () throws IOException {
328
381
settings .stubSettings ().setEnableRetryInfo (false );
@@ -368,15 +421,6 @@ public void testCheckAndMutateServerNotReturningRetryInfoClientDisabledHandling(
368
421
}
369
422
}
370
423
371
- @ Test
372
- public void testReadModifyWrite () {
373
- verifyRetryInfoIsUsed (
374
- () ->
375
- client .readModifyWriteRow (
376
- ReadModifyWriteRow .create ("table" , "row" ).append ("cf" , "q" , "v" )),
377
- true );
378
- }
379
-
380
424
@ Test
381
425
public void testReadModifyWriteDisableRetryInfo () throws IOException {
382
426
settings .stubSettings ().setEnableRetryInfo (false );
@@ -414,13 +458,6 @@ public void testReadModifyWriteNotReturningRetryInfoClientDisabledHandling() thr
414
458
}
415
459
}
416
460
417
- @ Test
418
- public void testReadChangeStream () {
419
- verifyRetryInfoIsUsed (
420
- () -> client .readChangeStream (ReadChangeStreamQuery .create ("table" )).iterator ().hasNext (),
421
- true );
422
- }
423
-
424
461
@ Test
425
462
public void testReadChangeStreamNonRetryableErrorWithRetryInfo () {
426
463
verifyRetryInfoIsUsed (
@@ -465,12 +502,6 @@ public void testReadChangeStreamNotReturningRetryInfoClientDisabledHandling() th
465
502
}
466
503
}
467
504
468
- @ Test
469
- public void testGenerateInitialChangeStreamPartition () {
470
- verifyRetryInfoIsUsed (
471
- () -> client .generateInitialChangeStreamPartitions ("table" ).iterator ().hasNext (), true );
472
- }
473
-
474
505
@ Test
475
506
public void testGenerateInitialChangeStreamPartitionNonRetryableError () {
476
507
verifyRetryInfoIsUsed (
0 commit comments