Skip to content

Commit 5f6bc63

Browse files
authored
dataconnect: GrpcMetadataIntegrationTest.kt: Fix race condition waiting for auth/appcheck to be ready (#6446)
1 parent 4100ebf commit 5f6bc63

File tree

5 files changed

+271
-59
lines changed

5 files changed

+271
-59
lines changed

firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/GrpcMetadataIntegrationTest.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import com.google.android.gms.tasks.Tasks
2323
import com.google.firebase.appcheck.AppCheckProvider
2424
import com.google.firebase.appcheck.AppCheckProviderFactory
2525
import com.google.firebase.appcheck.FirebaseAppCheck
26+
import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal
2627
import com.google.firebase.dataconnect.generated.GeneratedConnector
2728
import com.google.firebase.dataconnect.generated.GeneratedMutation
2829
import com.google.firebase.dataconnect.generated.GeneratedQuery
@@ -137,6 +138,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() {
137138
fun executeQueryShouldNotSendAuthMetadataWhenNotLoggedIn() = runTest {
138139
val grpcServer = inProcessDataConnectGrpcServer.newInstance()
139140
val dataConnect = dataConnectFactory.newInstance(grpcServer)
141+
(dataConnect as FirebaseDataConnectInternal).awaitAuthReady()
140142
val queryRef = dataConnect.query("qryfyk7yfppfe", Unit, serializer<Unit>(), serializer<Unit>())
141143
val metadatasJob = async { grpcServer.metadatas.first() }
142144

@@ -149,6 +151,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() {
149151
fun executeMutationShouldNotSendAuthMetadataWhenNotLoggedIn() = runTest {
150152
val grpcServer = inProcessDataConnectGrpcServer.newInstance()
151153
val dataConnect = dataConnectFactory.newInstance(grpcServer)
154+
(dataConnect as FirebaseDataConnectInternal).awaitAuthReady()
152155
val mutationRef =
153156
dataConnect.mutation("mutckjpte9v9j", Unit, serializer<Unit>(), serializer<Unit>())
154157
val metadatasJob = async { grpcServer.metadatas.first() }
@@ -162,6 +165,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() {
162165
fun executeQueryShouldSendAuthMetadataWhenLoggedIn() = runTest {
163166
val grpcServer = inProcessDataConnectGrpcServer.newInstance()
164167
val dataConnect = dataConnectFactory.newInstance(grpcServer)
168+
(dataConnect as FirebaseDataConnectInternal).awaitAuthReady()
165169
val queryRef = dataConnect.query("qryyarwrxe2fv", Unit, serializer<Unit>(), serializer<Unit>())
166170
val metadatasJob = async { grpcServer.metadatas.first() }
167171
firebaseAuthSignIn(dataConnect)
@@ -175,6 +179,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() {
175179
fun executeMutationShouldSendAuthMetadataWhenLoggedIn() = runTest {
176180
val grpcServer = inProcessDataConnectGrpcServer.newInstance()
177181
val dataConnect = dataConnectFactory.newInstance(grpcServer)
182+
(dataConnect as FirebaseDataConnectInternal).awaitAuthReady()
178183
val mutationRef =
179184
dataConnect.mutation("mutayn7as5k7d", Unit, serializer<Unit>(), serializer<Unit>())
180185
val metadatasJob = async { grpcServer.metadatas.first() }
@@ -189,6 +194,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() {
189194
fun executeQueryShouldNotSendAuthMetadataAfterLogout() = runTest {
190195
val grpcServer = inProcessDataConnectGrpcServer.newInstance()
191196
val dataConnect = dataConnectFactory.newInstance(grpcServer)
197+
(dataConnect as FirebaseDataConnectInternal).awaitAuthReady()
192198
val queryRef = dataConnect.query("qryyarwrxe2fv", Unit, serializer<Unit>(), serializer<Unit>())
193199
val metadatasJob1 = async { grpcServer.metadatas.first() }
194200
val metadatasJob2 = async { grpcServer.metadatas.take(2).last() }
@@ -206,6 +212,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() {
206212
fun executeMutationShouldNotSendAuthMetadataAfterLogout() = runTest {
207213
val grpcServer = inProcessDataConnectGrpcServer.newInstance()
208214
val dataConnect = dataConnectFactory.newInstance(grpcServer)
215+
(dataConnect as FirebaseDataConnectInternal).awaitAuthReady()
209216
val mutationRef =
210217
dataConnect.mutation("mutvw945ag3vv", Unit, serializer<Unit>(), serializer<Unit>())
211218
val metadatasJob1 = async { grpcServer.metadatas.first() }
@@ -226,6 +233,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() {
226233
// appcheck token is sent at all.
227234
val grpcServer = inProcessDataConnectGrpcServer.newInstance()
228235
val dataConnect = dataConnectFactory.newInstance(grpcServer)
236+
(dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady()
229237
val queryRef = dataConnect.query("qrybbeekpkkck", Unit, serializer<Unit>(), serializer<Unit>())
230238
val metadatasJob = async { grpcServer.metadatas.first() }
231239

@@ -240,6 +248,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() {
240248
// appcheck token is sent at all.
241249
val grpcServer = inProcessDataConnectGrpcServer.newInstance()
242250
val dataConnect = dataConnectFactory.newInstance(grpcServer)
251+
(dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady()
243252
val mutationRef =
244253
dataConnect.mutation("mutbs7hhxk39c", Unit, serializer<Unit>(), serializer<Unit>())
245254
val metadatasJob = async { grpcServer.metadatas.first() }
@@ -253,6 +262,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() {
253262
fun executeQueryShouldSendAppCheckMetadataWhenAppCheckIsEnabled() = runTest {
254263
val grpcServer = inProcessDataConnectGrpcServer.newInstance()
255264
val dataConnect = dataConnectFactory.newInstance(grpcServer)
265+
(dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady()
256266
val queryRef = dataConnect.query("qryyarwrxe2fv", Unit, serializer<Unit>(), serializer<Unit>())
257267
val metadatasJob = async { grpcServer.metadatas.first() }
258268
val appCheck = FirebaseAppCheck.getInstance(dataConnect.app)
@@ -267,6 +277,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() {
267277
fun executeMutationShouldSendAppCheckMetadataWhenAppCheckIsEnabled() = runTest {
268278
val grpcServer = inProcessDataConnectGrpcServer.newInstance()
269279
val dataConnect = dataConnectFactory.newInstance(grpcServer)
280+
(dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady()
270281
val mutationRef =
271282
dataConnect.mutation("mutz4hzqzpgb4", Unit, serializer<Unit>(), serializer<Unit>())
272283
val metadatasJob = async { grpcServer.metadatas.first() }

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ import kotlinx.coroutines.SupervisorJob
4545
import kotlinx.coroutines.async
4646
import kotlinx.coroutines.cancel
4747
import kotlinx.coroutines.ensureActive
48+
import kotlinx.coroutines.flow.MutableStateFlow
49+
import kotlinx.coroutines.flow.StateFlow
50+
import kotlinx.coroutines.flow.asStateFlow
4851
import kotlinx.coroutines.launch
4952
import kotlinx.coroutines.yield
5053

@@ -58,6 +61,9 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any, L : Any>(
5861
val instanceId: String
5962
get() = logger.nameWithId
6063

64+
private val _providerAvailable = MutableStateFlow(false)
65+
val providerAvailable: StateFlow<Boolean> = _providerAvailable.asStateFlow()
66+
6167
@Suppress("LeakingThis") private val weakThis = WeakReference(this)
6268

6369
private val coroutineScope =
@@ -448,6 +454,8 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any, L : Any>(
448454
break
449455
}
450456
}
457+
458+
_providerAvailable.value = true
451459
}
452460

453461
/**

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ import kotlinx.coroutines.async
5454
import kotlinx.coroutines.cancel
5555
import kotlinx.coroutines.flow.MutableStateFlow
5656
import kotlinx.coroutines.flow.collect
57+
import kotlinx.coroutines.flow.first
58+
import kotlinx.coroutines.launch
5759
import kotlinx.coroutines.runBlocking
5860
import kotlinx.coroutines.sync.Mutex
5961
import kotlinx.coroutines.sync.withLock
@@ -72,6 +74,9 @@ internal interface FirebaseDataConnectInternal : FirebaseDataConnect {
7274

7375
val lazyGrpcClient: SuspendingLazy<DataConnectGrpcClient>
7476
val lazyQueryManager: SuspendingLazy<QueryManager>
77+
78+
suspend fun awaitAuthReady()
79+
suspend fun awaitAppCheckReady()
7580
}
7681

7782
internal class FirebaseDataConnectImpl(
@@ -107,11 +112,18 @@ internal class FirebaseDataConnectImpl(
107112
SupervisorJob() +
108113
nonBlockingDispatcher +
109114
CoroutineName(instanceId) +
110-
CoroutineExceptionHandler { _, throwable ->
111-
logger.warn(throwable) { "uncaught exception from a coroutine" }
115+
CoroutineExceptionHandler { context, throwable ->
116+
logger.warn(throwable) {
117+
val coroutineName = context[CoroutineName]?.name
118+
"WARNING: uncaught exception from coroutine named \"$coroutineName\" " +
119+
"(error code jszxcbe37k)"
120+
}
112121
}
113122
)
114123

124+
private val authProviderAvailable = MutableStateFlow(false)
125+
private val appCheckProviderAvailable = MutableStateFlow(false)
126+
115127
// Protects `closed`, `grpcClient`, `emulatorSettings`, and `queryManager`.
116128
private val mutex = Mutex()
117129

@@ -121,29 +133,49 @@ internal class FirebaseDataConnectImpl(
121133
// All accesses to this variable _must_ have locked `mutex`.
122134
private var closed = false
123135

124-
private val lazyDataConnectAuth =
125-
SuspendingLazy(mutex) {
126-
if (closed) throw IllegalStateException("FirebaseDataConnect instance has been closed")
127-
DataConnectAuth(
128-
deferredAuthProvider = deferredAuthProvider,
129-
parentCoroutineScope = coroutineScope,
130-
blockingDispatcher = blockingDispatcher,
131-
logger = Logger("DataConnectAuth").apply { debug { "created by $instanceId" } },
132-
)
133-
.apply { initialize() }
136+
private val dataConnectAuth: DataConnectAuth =
137+
DataConnectAuth(
138+
deferredAuthProvider = deferredAuthProvider,
139+
parentCoroutineScope = coroutineScope,
140+
blockingDispatcher = blockingDispatcher,
141+
logger = Logger("DataConnectAuth").apply { debug { "created by $instanceId" } },
142+
)
143+
144+
override suspend fun awaitAuthReady() {
145+
authProviderAvailable.first { it }
146+
}
147+
148+
init {
149+
coroutineScope.launch(CoroutineName("DataConnectAuth initializer for $instanceId")) {
150+
dataConnectAuth.initialize()
151+
dataConnectAuth.providerAvailable.collect { isProviderAvailable ->
152+
logger.debug { "authProviderAvailable=$isProviderAvailable" }
153+
authProviderAvailable.value = isProviderAvailable
154+
}
134155
}
156+
}
135157

136-
private val lazyDataConnectAppCheck =
137-
SuspendingLazy(mutex) {
138-
if (closed) throw IllegalStateException("FirebaseDataConnect instance has been closed")
139-
DataConnectAppCheck(
140-
deferredAppCheckTokenProvider = deferredAppCheckProvider,
141-
parentCoroutineScope = coroutineScope,
142-
blockingDispatcher = blockingDispatcher,
143-
logger = Logger("DataConnectAppCheck").apply { debug { "created by $instanceId" } },
144-
)
145-
.apply { initialize() }
158+
private val dataConnectAppCheck: DataConnectAppCheck =
159+
DataConnectAppCheck(
160+
deferredAppCheckTokenProvider = deferredAppCheckProvider,
161+
parentCoroutineScope = coroutineScope,
162+
blockingDispatcher = blockingDispatcher,
163+
logger = Logger("DataConnectAppCheck").apply { debug { "created by $instanceId" } },
164+
)
165+
166+
override suspend fun awaitAppCheckReady() {
167+
appCheckProviderAvailable.first { it }
168+
}
169+
170+
init {
171+
coroutineScope.launch(CoroutineName("DataConnectAppCheck initializer for $instanceId")) {
172+
dataConnectAppCheck.initialize()
173+
dataConnectAppCheck.providerAvailable.collect { isProviderAvailable ->
174+
logger.debug { "appCheckProviderAvailable=$isProviderAvailable" }
175+
appCheckProviderAvailable.value = isProviderAvailable
176+
}
146177
}
178+
}
147179

148180
private val lazyGrpcRPCs =
149181
SuspendingLazy(mutex) {
@@ -181,8 +213,8 @@ internal class FirebaseDataConnectImpl(
181213
val grpcMetadata =
182214
DataConnectGrpcMetadata.forSystemVersions(
183215
firebaseApp = app,
184-
dataConnectAuth = lazyDataConnectAuth.getLocked(),
185-
dataConnectAppCheck = lazyDataConnectAppCheck.getLocked(),
216+
dataConnectAuth = dataConnectAuth,
217+
dataConnectAppCheck = dataConnectAppCheck,
186218
connectorLocation = config.location,
187219
parentLogger = logger,
188220
)
@@ -210,8 +242,8 @@ internal class FirebaseDataConnectImpl(
210242
projectId = projectId,
211243
connector = config,
212244
grpcRPCs = lazyGrpcRPCs.getLocked(),
213-
dataConnectAuth = lazyDataConnectAuth.getLocked(),
214-
dataConnectAppCheck = lazyDataConnectAppCheck.getLocked(),
245+
dataConnectAuth = dataConnectAuth,
246+
dataConnectAppCheck = dataConnectAppCheck,
215247
logger = Logger("DataConnectGrpcClient").apply { debug { "created by $instanceId" } },
216248
)
217249
}
@@ -397,8 +429,8 @@ internal class FirebaseDataConnectImpl(
397429

398430
// Close Auth and AppCheck synchronously to avoid race conditions with auth callbacks.
399431
// Since close() is re-entrant, this is safe even if they have already been closed.
400-
lazyDataConnectAuth.initializedValueOrNull?.close()
401-
lazyDataConnectAppCheck.initializedValueOrNull?.close()
432+
dataConnectAuth.close()
433+
dataConnectAppCheck.close()
402434

403435
// Start the job to asynchronously close the gRPC client.
404436
while (true) {

0 commit comments

Comments
 (0)