24
24
25
25
import com .google .api .client .util .BackOff ;
26
26
import com .google .api .client .util .ExponentialBackOff ;
27
+ import com .google .api .gax .grpc .GrpcStatusCode ;
27
28
import com .google .api .gax .retrying .RetrySettings ;
28
29
import com .google .api .gax .rpc .ApiCallContext ;
30
+ import com .google .api .gax .rpc .StatusCode .Code ;
29
31
import com .google .cloud .ByteArray ;
30
32
import com .google .cloud .Date ;
31
33
import com .google .cloud .Timestamp ;
65
67
import java .util .LinkedList ;
66
68
import java .util .List ;
67
69
import java .util .Objects ;
70
+ import java .util .Set ;
68
71
import java .util .concurrent .BlockingQueue ;
69
72
import java .util .concurrent .CountDownLatch ;
70
73
import java .util .concurrent .Executor ;
@@ -1082,10 +1085,12 @@ public void onError(SpannerException e) {
1082
1085
@ VisibleForTesting
1083
1086
abstract static class ResumableStreamIterator extends AbstractIterator <PartialResultSet >
1084
1087
implements CloseableIterator <PartialResultSet > {
1085
- private static final RetrySettings STREAMING_RETRY_SETTINGS =
1088
+ private static final RetrySettings DEFAULT_STREAMING_RETRY_SETTINGS =
1086
1089
SpannerStubSettings .newBuilder ().executeStreamingSqlSettings ().getRetrySettings ();
1090
+ private final RetrySettings streamingRetrySettings ;
1091
+ private final Set <Code > retryableCodes ;
1087
1092
private static final Logger logger = Logger .getLogger (ResumableStreamIterator .class .getName ());
1088
- private final BackOff backOff = newBackOff () ;
1093
+ private final BackOff backOff ;
1089
1094
private final LinkedList <PartialResultSet > buffer = new LinkedList <>();
1090
1095
private final int maxBufferSize ;
1091
1096
private final Span span ;
@@ -1099,24 +1104,58 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
1099
1104
*/
1100
1105
private boolean safeToRetry = true ;
1101
1106
1102
- protected ResumableStreamIterator (int maxBufferSize , String streamName , Span parent ) {
1107
+ protected ResumableStreamIterator (
1108
+ int maxBufferSize ,
1109
+ String streamName ,
1110
+ Span parent ,
1111
+ RetrySettings streamingRetrySettings ,
1112
+ Set <Code > retryableCodes ) {
1103
1113
checkArgument (maxBufferSize >= 0 );
1104
1114
this .maxBufferSize = maxBufferSize ;
1105
1115
this .span = tracer .spanBuilderWithExplicitParent (streamName , parent ).startSpan ();
1106
- }
1107
-
1108
- private static ExponentialBackOff newBackOff () {
1116
+ this .streamingRetrySettings = Preconditions .checkNotNull (streamingRetrySettings );
1117
+ this .retryableCodes = Preconditions .checkNotNull (retryableCodes );
1118
+ this .backOff = newBackOff ();
1119
+ }
1120
+
1121
+ private ExponentialBackOff newBackOff () {
1122
+ if (Objects .equals (streamingRetrySettings , DEFAULT_STREAMING_RETRY_SETTINGS )) {
1123
+ return new ExponentialBackOff .Builder ()
1124
+ .setMultiplier (streamingRetrySettings .getRetryDelayMultiplier ())
1125
+ .setInitialIntervalMillis (
1126
+ Math .max (10 , (int ) streamingRetrySettings .getInitialRetryDelay ().toMillis ()))
1127
+ .setMaxIntervalMillis (
1128
+ Math .max (1000 , (int ) streamingRetrySettings .getMaxRetryDelay ().toMillis ()))
1129
+ .setMaxElapsedTimeMillis (
1130
+ Integer .MAX_VALUE ) // Prevent Backoff.STOP from getting returned.
1131
+ .build ();
1132
+ }
1109
1133
return new ExponentialBackOff .Builder ()
1110
- .setMultiplier (STREAMING_RETRY_SETTINGS .getRetryDelayMultiplier ())
1134
+ .setMultiplier (streamingRetrySettings .getRetryDelayMultiplier ())
1135
+ // All of these values must be > 0.
1111
1136
.setInitialIntervalMillis (
1112
- Math .max (10 , (int ) STREAMING_RETRY_SETTINGS .getInitialRetryDelay ().toMillis ()))
1137
+ Math .max (
1138
+ 1 ,
1139
+ (int )
1140
+ Math .min (
1141
+ streamingRetrySettings .getInitialRetryDelay ().toMillis (),
1142
+ Integer .MAX_VALUE )))
1113
1143
.setMaxIntervalMillis (
1114
- Math .max (1000 , (int ) STREAMING_RETRY_SETTINGS .getMaxRetryDelay ().toMillis ()))
1115
- .setMaxElapsedTimeMillis (Integer .MAX_VALUE ) // Prevent Backoff.STOP from getting returned.
1144
+ Math .max (
1145
+ 1 ,
1146
+ (int )
1147
+ Math .min (
1148
+ streamingRetrySettings .getMaxRetryDelay ().toMillis (), Integer .MAX_VALUE )))
1149
+ .setMaxElapsedTimeMillis (
1150
+ Math .max (
1151
+ 1 ,
1152
+ (int )
1153
+ Math .min (
1154
+ streamingRetrySettings .getTotalTimeout ().toMillis (), Integer .MAX_VALUE )))
1116
1155
.build ();
1117
1156
}
1118
1157
1119
- private static void backoffSleep (Context context , BackOff backoff ) throws SpannerException {
1158
+ private void backoffSleep (Context context , BackOff backoff ) throws SpannerException {
1120
1159
backoffSleep (context , nextBackOffMillis (backoff ));
1121
1160
}
1122
1161
@@ -1128,7 +1167,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
1128
1167
}
1129
1168
}
1130
1169
1131
- private static void backoffSleep (Context context , long backoffMillis ) throws SpannerException {
1170
+ private void backoffSleep (Context context , long backoffMillis ) throws SpannerException {
1132
1171
tracer
1133
1172
.getCurrentSpan ()
1134
1173
.addAnnotation (
@@ -1145,7 +1184,7 @@ private static void backoffSleep(Context context, long backoffMillis) throws Spa
1145
1184
try {
1146
1185
if (backoffMillis == BackOff .STOP ) {
1147
1186
// Highly unlikely but we handle it just in case.
1148
- backoffMillis = STREAMING_RETRY_SETTINGS .getMaxRetryDelay ().toMillis ();
1187
+ backoffMillis = streamingRetrySettings .getMaxRetryDelay ().toMillis ();
1149
1188
}
1150
1189
if (latch .await (backoffMillis , TimeUnit .MILLISECONDS )) {
1151
1190
// Woken by context cancellation.
@@ -1233,19 +1272,20 @@ protected PartialResultSet computeNext() {
1233
1272
return null ;
1234
1273
}
1235
1274
}
1236
- } catch (SpannerException e ) {
1237
- if (safeToRetry && e . isRetryable ()) {
1275
+ } catch (SpannerException spannerException ) {
1276
+ if (safeToRetry && isRetryable (spannerException )) {
1238
1277
span .addAnnotation (
1239
- "Stream broken. Safe to retry" , TraceUtil .getExceptionAnnotations (e ));
1240
- logger .log (Level .FINE , "Retryable exception, will sleep and retry" , e );
1278
+ "Stream broken. Safe to retry" ,
1279
+ TraceUtil .getExceptionAnnotations (spannerException ));
1280
+ logger .log (Level .FINE , "Retryable exception, will sleep and retry" , spannerException );
1241
1281
// Truncate any items in the buffer before the last retry token.
1242
1282
while (!buffer .isEmpty () && buffer .getLast ().getResumeToken ().isEmpty ()) {
1243
1283
buffer .removeLast ();
1244
1284
}
1245
1285
assert buffer .isEmpty () || buffer .getLast ().getResumeToken ().equals (resumeToken );
1246
1286
stream = null ;
1247
1287
try (Scope s = tracer .withSpan (span )) {
1248
- long delay = e .getRetryDelayInMillis ();
1288
+ long delay = spannerException .getRetryDelayInMillis ();
1249
1289
if (delay != -1 ) {
1250
1290
backoffSleep (context , delay );
1251
1291
} else {
@@ -1256,15 +1296,21 @@ protected PartialResultSet computeNext() {
1256
1296
continue ;
1257
1297
}
1258
1298
span .addAnnotation ("Stream broken. Not safe to retry" );
1259
- TraceUtil .setWithFailure (span , e );
1260
- throw e ;
1299
+ TraceUtil .setWithFailure (span , spannerException );
1300
+ throw spannerException ;
1261
1301
} catch (RuntimeException e ) {
1262
1302
span .addAnnotation ("Stream broken. Not safe to retry" );
1263
1303
TraceUtil .setWithFailure (span , e );
1264
1304
throw e ;
1265
1305
}
1266
1306
}
1267
1307
}
1308
+
1309
+ boolean isRetryable (SpannerException spannerException ) {
1310
+ return spannerException .isRetryable ()
1311
+ || retryableCodes .contains (
1312
+ GrpcStatusCode .of (spannerException .getErrorCode ().getGrpcStatusCode ()).getCode ());
1313
+ }
1268
1314
}
1269
1315
1270
1316
static double valueProtoToFloat64 (com .google .protobuf .Value proto ) {
0 commit comments