22
22
23
23
import com .google .api .core .ApiFunction ;
24
24
import com .google .api .core .ApiFuture ;
25
+ import com .google .api .core .ApiFutures ;
25
26
import com .google .api .core .BetaApi ;
26
27
import com .google .api .core .InternalApi ;
27
28
import com .google .api .gax .batching .Batcher ;
57
58
import com .google .auth .Credentials ;
58
59
import com .google .auth .oauth2 .ServiceAccountJwtAccessCredentials ;
59
60
import com .google .bigtable .v2 .BigtableGrpc ;
60
- import com .google .bigtable .v2 .CheckAndMutateRowRequest ;
61
61
import com .google .bigtable .v2 .CheckAndMutateRowResponse ;
62
62
import com .google .bigtable .v2 .ExecuteQueryRequest ;
63
63
import com .google .bigtable .v2 .ExecuteQueryResponse ;
64
64
import com .google .bigtable .v2 .GenerateInitialChangeStreamPartitionsRequest ;
65
65
import com .google .bigtable .v2 .GenerateInitialChangeStreamPartitionsResponse ;
66
- import com .google .bigtable .v2 .MutateRowRequest ;
67
- import com .google .bigtable .v2 .MutateRowResponse ;
68
66
import com .google .bigtable .v2 .MutateRowsRequest ;
69
67
import com .google .bigtable .v2 .MutateRowsResponse ;
70
68
import com .google .bigtable .v2 .PingAndWarmRequest ;
71
69
import com .google .bigtable .v2 .PingAndWarmResponse ;
72
70
import com .google .bigtable .v2 .ReadChangeStreamRequest ;
73
71
import com .google .bigtable .v2 .ReadChangeStreamResponse ;
74
- import com .google .bigtable .v2 .ReadModifyWriteRowRequest ;
75
- import com .google .bigtable .v2 .ReadModifyWriteRowResponse ;
76
72
import com .google .bigtable .v2 .ReadRowsRequest ;
77
73
import com .google .bigtable .v2 .ReadRowsResponse ;
78
74
import com .google .bigtable .v2 .RowRange ;
144
140
import com .google .common .base .Preconditions ;
145
141
import com .google .common .collect .ImmutableList ;
146
142
import com .google .common .collect .ImmutableMap ;
143
+ import com .google .common .util .concurrent .MoreExecutors ;
147
144
import com .google .protobuf .ByteString ;
148
145
import io .grpc .ManagedChannelBuilder ;
146
+ import io .grpc .MethodDescriptor ;
149
147
import io .opencensus .stats .Stats ;
150
148
import io .opencensus .stats .StatsRecorder ;
151
149
import io .opencensus .tags .TagKey ;
162
160
import java .util .Map ;
163
161
import java .util .Set ;
164
162
import java .util .concurrent .TimeUnit ;
163
+ import java .util .function .Function ;
165
164
import java .util .logging .Level ;
166
165
import java .util .logging .Logger ;
167
166
import javax .annotation .Nonnull ;
@@ -790,42 +789,14 @@ public Map<String, String> extract(
790
789
* </ul>
791
790
*/
792
791
private UnaryCallable <RowMutation , Void > createMutateRowCallable () {
793
- String methodName = "MutateRow" ;
794
- UnaryCallable <MutateRowRequest , MutateRowResponse > base =
795
- GrpcRawCallableFactory .createUnaryCallable (
796
- GrpcCallSettings .<MutateRowRequest , MutateRowResponse >newBuilder ()
797
- .setMethodDescriptor (BigtableGrpc .getMutateRowMethod ())
798
- .setParamsExtractor (
799
- new RequestParamsExtractor <MutateRowRequest >() {
800
- @ Override
801
- public Map <String , String > extract (MutateRowRequest mutateRowRequest ) {
802
- String tableName = mutateRowRequest .getTableName ();
803
- String authorizedViewName = mutateRowRequest .getAuthorizedViewName ();
804
- if (tableName .isEmpty ()) {
805
- tableName =
806
- NameUtil .extractTableNameFromAuthorizedViewName (authorizedViewName );
807
- }
808
- return ImmutableMap .of (
809
- "table_name" ,
810
- tableName ,
811
- "app_profile_id" ,
812
- mutateRowRequest .getAppProfileId ());
813
- }
814
- })
815
- .build (),
816
- settings .mutateRowSettings ().getRetryableCodes ());
817
-
818
- UnaryCallable <MutateRowRequest , MutateRowResponse > withStatsHeaders =
819
- new StatsHeadersUnaryCallable <>(base );
820
-
821
- UnaryCallable <MutateRowRequest , MutateRowResponse > withBigtableTracer =
822
- new BigtableTracerUnaryCallable <>(withStatsHeaders );
823
-
824
- UnaryCallable <MutateRowRequest , MutateRowResponse > retrying =
825
- withRetries (withBigtableTracer , settings .mutateRowSettings ());
826
-
827
- return createUserFacingUnaryCallable (
828
- methodName , new MutateRowCallable (retrying , requestContext ));
792
+ return createUnaryCallable (
793
+ BigtableGrpc .getMutateRowMethod (),
794
+ req ->
795
+ composeRequestParams (
796
+ req .getAppProfileId (), req .getTableName (), req .getAuthorizedViewName ()),
797
+ settings .mutateRowSettings (),
798
+ req -> req .toProto (requestContext ),
799
+ resp -> null );
829
800
}
830
801
831
802
/**
@@ -1045,44 +1016,14 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(
1045
1016
* </ul>
1046
1017
*/
1047
1018
private UnaryCallable <ConditionalRowMutation , Boolean > createCheckAndMutateRowCallable () {
1048
- String methodName = "CheckAndMutateRow" ;
1049
- UnaryCallable <CheckAndMutateRowRequest , CheckAndMutateRowResponse > base =
1050
- GrpcRawCallableFactory .createUnaryCallable (
1051
- GrpcCallSettings .<CheckAndMutateRowRequest , CheckAndMutateRowResponse >newBuilder ()
1052
- .setMethodDescriptor (BigtableGrpc .getCheckAndMutateRowMethod ())
1053
- .setParamsExtractor (
1054
- new RequestParamsExtractor <CheckAndMutateRowRequest >() {
1055
- @ Override
1056
- public Map <String , String > extract (
1057
- CheckAndMutateRowRequest checkAndMutateRowRequest ) {
1058
- String tableName = checkAndMutateRowRequest .getTableName ();
1059
- String authorizedViewName =
1060
- checkAndMutateRowRequest .getAuthorizedViewName ();
1061
- if (tableName .isEmpty ()) {
1062
- tableName =
1063
- NameUtil .extractTableNameFromAuthorizedViewName (authorizedViewName );
1064
- }
1065
- return ImmutableMap .of (
1066
- "table_name" ,
1067
- tableName ,
1068
- "app_profile_id" ,
1069
- checkAndMutateRowRequest .getAppProfileId ());
1070
- }
1071
- })
1072
- .build (),
1073
- settings .checkAndMutateRowSettings ().getRetryableCodes ());
1074
-
1075
- UnaryCallable <CheckAndMutateRowRequest , CheckAndMutateRowResponse > withStatsHeaders =
1076
- new StatsHeadersUnaryCallable <>(base );
1077
-
1078
- UnaryCallable <CheckAndMutateRowRequest , CheckAndMutateRowResponse > withBigtableTracer =
1079
- new BigtableTracerUnaryCallable <>(withStatsHeaders );
1080
-
1081
- UnaryCallable <CheckAndMutateRowRequest , CheckAndMutateRowResponse > retrying =
1082
- withRetries (withBigtableTracer , settings .checkAndMutateRowSettings ());
1083
-
1084
- return createUserFacingUnaryCallable (
1085
- methodName , new CheckAndMutateRowCallable (retrying , requestContext ));
1019
+ return createUnaryCallable (
1020
+ BigtableGrpc .getCheckAndMutateRowMethod (),
1021
+ req ->
1022
+ composeRequestParams (
1023
+ req .getAppProfileId (), req .getTableName (), req .getAuthorizedViewName ()),
1024
+ settings .checkAndMutateRowSettings (),
1025
+ req -> req .toProto (requestContext ),
1026
+ CheckAndMutateRowResponse ::getPredicateMatched );
1086
1027
}
1087
1028
1088
1029
/**
@@ -1096,39 +1037,16 @@ public Map<String, String> extract(
1096
1037
* </ul>
1097
1038
*/
1098
1039
private UnaryCallable <ReadModifyWriteRow , Row > createReadModifyWriteRowCallable () {
1099
- UnaryCallable <ReadModifyWriteRowRequest , ReadModifyWriteRowResponse > base =
1100
- GrpcRawCallableFactory .createUnaryCallable (
1101
- GrpcCallSettings .<ReadModifyWriteRowRequest , ReadModifyWriteRowResponse >newBuilder ()
1102
- .setMethodDescriptor (BigtableGrpc .getReadModifyWriteRowMethod ())
1103
- .setParamsExtractor (
1104
- new RequestParamsExtractor <ReadModifyWriteRowRequest >() {
1105
- @ Override
1106
- public Map <String , String > extract (ReadModifyWriteRowRequest request ) {
1107
- String tableName = request .getTableName ();
1108
- String authorizedViewName = request .getAuthorizedViewName ();
1109
- if (tableName .isEmpty ()) {
1110
- tableName =
1111
- NameUtil .extractTableNameFromAuthorizedViewName (authorizedViewName );
1112
- }
1113
- return ImmutableMap .of (
1114
- "table_name" , tableName , "app_profile_id" , request .getAppProfileId ());
1115
- }
1116
- })
1117
- .build (),
1118
- settings .readModifyWriteRowSettings ().getRetryableCodes ());
1119
-
1120
- UnaryCallable <ReadModifyWriteRowRequest , ReadModifyWriteRowResponse > withStatsHeaders =
1121
- new StatsHeadersUnaryCallable <>(base );
1122
-
1123
- String methodName = "ReadModifyWriteRow" ;
1124
- UnaryCallable <ReadModifyWriteRowRequest , ReadModifyWriteRowResponse > withBigtableTracer =
1125
- new BigtableTracerUnaryCallable <>(withStatsHeaders );
1126
-
1127
- UnaryCallable <ReadModifyWriteRowRequest , ReadModifyWriteRowResponse > retrying =
1128
- withRetries (withBigtableTracer , settings .readModifyWriteRowSettings ());
1129
-
1130
- return createUserFacingUnaryCallable (
1131
- methodName , new ReadModifyWriteRowCallable (retrying , requestContext ));
1040
+ DefaultRowAdapter rowAdapter = new DefaultRowAdapter ();
1041
+
1042
+ return createUnaryCallable (
1043
+ BigtableGrpc .getReadModifyWriteRowMethod (),
1044
+ req ->
1045
+ composeRequestParams (
1046
+ req .getAppProfileId (), req .getTableName (), req .getAuthorizedViewName ()),
1047
+ settings .readModifyWriteRowSettings (),
1048
+ req -> req .toProto (requestContext ),
1049
+ resp -> rowAdapter .createRowFromProto (resp .getRow ()));
1132
1050
}
1133
1051
1134
1052
/**
@@ -1393,6 +1311,56 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
1393
1311
return traced .withDefaultCallContext (clientContext .getDefaultCallContext ());
1394
1312
}
1395
1313
1314
+ private Map <String , String > composeRequestParams (
1315
+ String appProfileId , String tableName , String authorizedViewName ) {
1316
+ if (tableName .isEmpty ()) {
1317
+ tableName = NameUtil .extractTableNameFromAuthorizedViewName (authorizedViewName );
1318
+ }
1319
+ return ImmutableMap .of ("table_name" , tableName , "app_profile_id" , appProfileId );
1320
+ }
1321
+
1322
+ private <BaseReqT , BaseRespT , ReqT , RespT > UnaryCallable <ReqT , RespT > createUnaryCallable (
1323
+ MethodDescriptor <BaseReqT , BaseRespT > methodDescriptor ,
1324
+ RequestParamsExtractor <BaseReqT > headerParamsFn ,
1325
+ UnaryCallSettings <ReqT , RespT > callSettings ,
1326
+ Function <ReqT , BaseReqT > requestTransformer ,
1327
+ Function <BaseRespT , RespT > responseTranformer ) {
1328
+
1329
+ UnaryCallable <BaseReqT , BaseRespT > base =
1330
+ GrpcRawCallableFactory .createUnaryCallable (
1331
+ GrpcCallSettings .<BaseReqT , BaseRespT >newBuilder ()
1332
+ .setMethodDescriptor (methodDescriptor )
1333
+ .setParamsExtractor (headerParamsFn )
1334
+ .build (),
1335
+ callSettings .getRetryableCodes ());
1336
+
1337
+ UnaryCallable <BaseReqT , BaseRespT > withStatsHeaders = new StatsHeadersUnaryCallable <>(base );
1338
+
1339
+ UnaryCallable <BaseReqT , BaseRespT > withBigtableTracer =
1340
+ new BigtableTracerUnaryCallable <>(withStatsHeaders );
1341
+
1342
+ UnaryCallable <BaseReqT , BaseRespT > retrying = withRetries (withBigtableTracer , callSettings );
1343
+
1344
+ UnaryCallable <ReqT , RespT > transformed =
1345
+ new UnaryCallable <ReqT , RespT >() {
1346
+ @ Override
1347
+ public ApiFuture <RespT > futureCall (ReqT reqT , ApiCallContext apiCallContext ) {
1348
+ ApiFuture <BaseRespT > f =
1349
+ retrying .futureCall (requestTransformer .apply (reqT ), apiCallContext );
1350
+ return ApiFutures .transform (
1351
+ f , responseTranformer ::apply , MoreExecutors .directExecutor ());
1352
+ }
1353
+ };
1354
+
1355
+ UnaryCallable <ReqT , RespT > traced =
1356
+ new TracedUnaryCallable <>(
1357
+ transformed ,
1358
+ clientContext .getTracerFactory (),
1359
+ getSpanName (methodDescriptor .getBareMethodName ()));
1360
+
1361
+ return traced .withDefaultCallContext (clientContext .getDefaultCallContext ());
1362
+ }
1363
+
1396
1364
private UnaryCallable <PingAndWarmRequest , PingAndWarmResponse > createPingAndWarmCallable () {
1397
1365
UnaryCallable <PingAndWarmRequest , PingAndWarmResponse > pingAndWarm =
1398
1366
GrpcRawCallableFactory .createUnaryCallable (
0 commit comments