@@ -664,6 +664,72 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
664
664
methodName , new MutateRowCallable (retrying , requestContext ));
665
665
}
666
666
667
+ /**
668
+ * Internal helper to create the base MutateRows callable chain. The chain is responsible for
669
+ * retrying individual entry in case of error.
670
+ *
671
+ * <p>NOTE: the caller is responsible for adding tracing & metrics.
672
+ *
673
+ * @see MutateRowsRetryingCallable for more details
674
+ */
675
+ private UnaryCallable <MutateRowsRequest , Void > createMutateRowsBaseCallable () {
676
+ ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > base =
677
+ GrpcRawCallableFactory .createServerStreamingCallable (
678
+ GrpcCallSettings .<MutateRowsRequest , MutateRowsResponse >newBuilder ()
679
+ .setMethodDescriptor (BigtableGrpc .getMutateRowsMethod ())
680
+ .setParamsExtractor (
681
+ new RequestParamsExtractor <MutateRowsRequest >() {
682
+ @ Override
683
+ public Map <String , String > extract (MutateRowsRequest mutateRowsRequest ) {
684
+ return ImmutableMap .of (
685
+ "table_name" , mutateRowsRequest .getTableName (),
686
+ "app_profile_id" , mutateRowsRequest .getAppProfileId ());
687
+ }
688
+ })
689
+ .build (),
690
+ settings .bulkMutateRowsSettings ().getRetryableCodes ());
691
+
692
+ ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > callable =
693
+ new StatsHeadersServerStreamingCallable <>(base );
694
+
695
+ if (settings .bulkMutateRowsSettings ().isServerInitiatedFlowControlEnabled ()) {
696
+ callable = new RateLimitingServerStreamingCallable (callable );
697
+ }
698
+
699
+ // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
700
+ // and
701
+ // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
702
+ // which by default is not retryable. Convert the exception so it can be retried in the client.
703
+ ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > convertException =
704
+ new ConvertExceptionCallable <>(callable );
705
+
706
+ ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > withBigtableTracer =
707
+ new BigtableTracerStreamingCallable <>(convertException );
708
+
709
+ BasicResultRetryAlgorithm <Void > resultRetryAlgorithm ;
710
+ if (settings .getEnableRetryInfo ()) {
711
+ resultRetryAlgorithm = new RetryInfoRetryAlgorithm <>();
712
+ } else {
713
+ resultRetryAlgorithm = new ApiResultRetryAlgorithm <>();
714
+ }
715
+
716
+ RetryAlgorithm <Void > retryAlgorithm =
717
+ new RetryAlgorithm <>(
718
+ resultRetryAlgorithm ,
719
+ new ExponentialRetryAlgorithm (
720
+ settings .bulkMutateRowsSettings ().getRetrySettings (), clientContext .getClock ()));
721
+
722
+ RetryingExecutorWithContext <Void > retryingExecutor =
723
+ new ScheduledRetryingExecutor <>(retryAlgorithm , clientContext .getExecutor ());
724
+
725
+ return new MutateRowsRetryingCallable (
726
+ clientContext .getDefaultCallContext (),
727
+ withBigtableTracer ,
728
+ retryingExecutor ,
729
+ settings .bulkMutateRowsSettings ().getRetryableCodes (),
730
+ retryAlgorithm );
731
+ }
732
+
667
733
/**
668
734
* Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual
669
735
* batching. The chain will:
@@ -773,72 +839,6 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(
773
839
MoreObjects .firstNonNull (ctx , clientContext .getDefaultCallContext ()));
774
840
}
775
841
776
- /**
777
- * Internal helper to create the base MutateRows callable chain. The chain is responsible for
778
- * retrying individual entry in case of error.
779
- *
780
- * <p>NOTE: the caller is responsible for adding tracing & metrics.
781
- *
782
- * @see MutateRowsRetryingCallable for more details
783
- */
784
- private UnaryCallable <MutateRowsRequest , Void > createMutateRowsBaseCallable () {
785
- ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > base =
786
- GrpcRawCallableFactory .createServerStreamingCallable (
787
- GrpcCallSettings .<MutateRowsRequest , MutateRowsResponse >newBuilder ()
788
- .setMethodDescriptor (BigtableGrpc .getMutateRowsMethod ())
789
- .setParamsExtractor (
790
- new RequestParamsExtractor <MutateRowsRequest >() {
791
- @ Override
792
- public Map <String , String > extract (MutateRowsRequest mutateRowsRequest ) {
793
- return ImmutableMap .of (
794
- "table_name" , mutateRowsRequest .getTableName (),
795
- "app_profile_id" , mutateRowsRequest .getAppProfileId ());
796
- }
797
- })
798
- .build (),
799
- settings .bulkMutateRowsSettings ().getRetryableCodes ());
800
-
801
- ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > callable =
802
- new StatsHeadersServerStreamingCallable <>(base );
803
-
804
- if (settings .bulkMutateRowsSettings ().isServerInitiatedFlowControlEnabled ()) {
805
- callable = new RateLimitingServerStreamingCallable (callable );
806
- }
807
-
808
- // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
809
- // and
810
- // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
811
- // which by default is not retryable. Convert the exception so it can be retried in the client.
812
- ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > convertException =
813
- new ConvertExceptionCallable <>(callable );
814
-
815
- ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > withBigtableTracer =
816
- new BigtableTracerStreamingCallable <>(convertException );
817
-
818
- BasicResultRetryAlgorithm <Void > resultRetryAlgorithm ;
819
- if (settings .getEnableRetryInfo ()) {
820
- resultRetryAlgorithm = new RetryInfoRetryAlgorithm <>();
821
- } else {
822
- resultRetryAlgorithm = new ApiResultRetryAlgorithm <>();
823
- }
824
-
825
- RetryAlgorithm <Void > retryAlgorithm =
826
- new RetryAlgorithm <>(
827
- resultRetryAlgorithm ,
828
- new ExponentialRetryAlgorithm (
829
- settings .bulkMutateRowsSettings ().getRetrySettings (), clientContext .getClock ()));
830
-
831
- RetryingExecutorWithContext <Void > retryingExecutor =
832
- new ScheduledRetryingExecutor <>(retryAlgorithm , clientContext .getExecutor ());
833
-
834
- return new MutateRowsRetryingCallable (
835
- clientContext .getDefaultCallContext (),
836
- withBigtableTracer ,
837
- retryingExecutor ,
838
- settings .bulkMutateRowsSettings ().getRetryableCodes (),
839
- retryAlgorithm );
840
- }
841
-
842
842
/**
843
843
* Creates a callable chain to handle CheckAndMutateRow RPCs. THe chain will:
844
844
*
0 commit comments