23
23
import com .google .android .gms .tasks .Tasks ;
24
24
import com .google .firebase .firestore .core .DatabaseInfo ;
25
25
import com .google .firebase .firestore .util .AsyncQueue ;
26
+ import com .google .firebase .firestore .util .AsyncQueue .DelayedTask ;
27
+ import com .google .firebase .firestore .util .AsyncQueue .TimerId ;
26
28
import com .google .firebase .firestore .util .Executors ;
27
29
import com .google .firebase .firestore .util .Logger ;
28
30
import com .google .firebase .firestore .util .Supplier ;
29
31
import com .google .firestore .v1 .FirestoreGrpc ;
30
32
import io .grpc .CallCredentials ;
31
33
import io .grpc .CallOptions ;
32
34
import io .grpc .ClientCall ;
35
+ import io .grpc .ConnectivityState ;
33
36
import io .grpc .ManagedChannel ;
34
37
import io .grpc .ManagedChannelBuilder ;
35
38
import io .grpc .MethodDescriptor ;
@@ -45,11 +48,22 @@ public class GrpcCallProvider {
45
48
46
49
private static Supplier <ManagedChannelBuilder <?>> overrideChannelBuilderSupplier ;
47
50
48
- private final Task <ManagedChannel > channelTask ;
51
+ private Task <ManagedChannel > channelTask ;
49
52
private final AsyncQueue asyncQueue ;
50
53
51
54
private CallOptions callOptions ;
52
55
56
+ // This timeout is used when attempting to establish a connection in gRPC. If a connection attempt
57
+ // does not succeed in CONNECTIVITY_ATTEMPT_TIMEOUT_MS, we restart the channel and try
58
+ // reconnecting again, rather than waiting up to 2+ minutes for gRPC to timeout.
59
+ // More details about usage can be found in GrpcCallProvider.onConnectivityStateChanged().
60
+ private static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 15 * 1000 ;
61
+ private DelayedTask connectivityAttemptTimer ;
62
+
63
+ private final Context context ;
64
+ private final DatabaseInfo databaseInfo ;
65
+ private final CallCredentials firestoreHeaders ;
66
+
53
67
/**
54
68
* Helper function to globally override the channel that RPCs use. Useful for testing when you
55
69
* want to bypass SSL certificate checking.
@@ -69,24 +83,11 @@ public static void overrideChannelBuilder(
69
83
DatabaseInfo databaseInfo ,
70
84
CallCredentials firestoreHeaders ) {
71
85
this .asyncQueue = asyncQueue ;
86
+ this .context = context ;
87
+ this .databaseInfo = databaseInfo ;
88
+ this .firestoreHeaders = firestoreHeaders ;
72
89
73
- // We execute network initialization on a separate thread to not block operations that depend on
74
- // the AsyncQueue.
75
- this .channelTask =
76
- Tasks .call (
77
- Executors .BACKGROUND_EXECUTOR ,
78
- () -> {
79
- ManagedChannel channel = initChannel (context , databaseInfo );
80
- FirestoreGrpc .FirestoreStub firestoreStub =
81
- FirestoreGrpc .newStub (channel )
82
- .withCallCredentials (firestoreHeaders )
83
- // Ensure all callbacks are issued on the worker queue. If this call is
84
- // removed, all calls need to be audited to make sure they are executed on the
85
- // right thread.
86
- .withExecutor (asyncQueue .getExecutor ());
87
- callOptions = firestoreStub .getCallOptions ();
88
- return channel ;
89
- });
90
+ initChannelTask ();
90
91
}
91
92
92
93
/** Sets up the SSL provider and configures the gRPC channel. */
@@ -198,4 +199,80 @@ void shutdown() {
198
199
Thread .currentThread ().interrupt ();
199
200
}
200
201
}
202
+
203
+ /**
204
+ * Monitors the connectivity state of the gRPC channel and resets the channel when gRPC fails to
205
+ * connect.
206
+ *
207
+ * <p>We currently cannot configure timeouts in connection attempts for gRPC
208
+ * (https://github.com/grpc/grpc-java/issues/1943), and until they support doing so, the gRPC
209
+ * connection can stay open for up to 2+ minutes before notifying us that it has shut down.
210
+ *
211
+ * <p>We start a timer when the channel enters ConnectivityState.CONNECTING. If the timer elapses,
212
+ * we reset the channel by shutting it down and reinitializing the channelTask. Changes to the
213
+ * connectivity state will clear the timer and start a new one-time listener for the next
214
+ * ConnectivityState change.
215
+ *
216
+ * @param channel The channel to monitor the connectivity state of.
217
+ */
218
+ private void onConnectivityStateChange (ManagedChannel channel ) {
219
+ ConnectivityState newState = channel .getState (true );
220
+ Logger .debug (LOG_TAG , "Current gRPC connectivity state: " + newState );
221
+ // Clear the timer, so we don't end up with multiple connectivityAttemptTimers.
222
+ clearConnectivityAttemptTimer ();
223
+
224
+ if (newState == ConnectivityState .CONNECTING ) {
225
+ Logger .debug (LOG_TAG , "Setting the connectivityAttemptTimer" );
226
+ connectivityAttemptTimer =
227
+ asyncQueue .enqueueAfterDelay (
228
+ TimerId .CONNECTIVITY_ATTEMPT_TIMER ,
229
+ CONNECTIVITY_ATTEMPT_TIMEOUT_MS ,
230
+ () -> {
231
+ Logger .debug (LOG_TAG , "connectivityAttemptTimer elapsed. Resetting the channel." );
232
+ clearConnectivityAttemptTimer ();
233
+ resetChannel (channel );
234
+ });
235
+ }
236
+ // Re-listen for next state change.
237
+ channel .notifyWhenStateChanged (
238
+ newState , () -> asyncQueue .enqueueAndForget (() -> onConnectivityStateChange (channel )));
239
+ }
240
+
241
+ private void resetChannel (ManagedChannel channel ) {
242
+ asyncQueue .enqueueAndForget (
243
+ () -> {
244
+ channel .shutdownNow ();
245
+ initChannelTask ();
246
+ });
247
+ }
248
+
249
+ private void initChannelTask () {
250
+ // We execute network initialization on a separate thread to not block operations that depend on
251
+ // the AsyncQueue.
252
+ this .channelTask =
253
+ Tasks .call (
254
+ Executors .BACKGROUND_EXECUTOR ,
255
+ () -> {
256
+ ManagedChannel channel = initChannel (context , databaseInfo );
257
+ onConnectivityStateChange (channel );
258
+ FirestoreGrpc .FirestoreStub firestoreStub =
259
+ FirestoreGrpc .newStub (channel )
260
+ .withCallCredentials (firestoreHeaders )
261
+ // Ensure all callbacks are issued on the worker queue. If this call is
262
+ // removed, all calls need to be audited to make sure they are executed on the
263
+ // right thread.
264
+ .withExecutor (asyncQueue .getExecutor ());
265
+ callOptions = firestoreStub .getCallOptions ();
266
+ Logger .debug (LOG_TAG , "Channel successfully reset." );
267
+ return channel ;
268
+ });
269
+ }
270
+
271
+ private void clearConnectivityAttemptTimer () {
272
+ if (connectivityAttemptTimer != null ) {
273
+ Logger .debug (LOG_TAG , "Clearing the connectivityAttemptTimer" );
274
+ connectivityAttemptTimer .cancel ();
275
+ connectivityAttemptTimer = null ;
276
+ }
277
+ }
201
278
}
0 commit comments