Skip to content

Commit fca0b5c

Browse files
authored
Make flow extensions be retryable (apollographql#1997)
* Make flow extensions be retryable Closes apollographql#1986 * Fix issue with converting `ApolloCall` to flow, when chanel is not close when flow is canceled Fix KDocs formatting * Fix apollographql#1808 * Deprecate `toChannel`
1 parent c082390 commit fca0b5c

File tree

4 files changed

+188
-94
lines changed

4 files changed

+188
-94
lines changed

apollo-coroutines-support/src/main/kotlin/com/apollographql/apollo/coroutines/CoroutinesExtensions.kt

+116-65
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@ import com.apollographql.apollo.api.Response
88
import com.apollographql.apollo.exception.ApolloException
99
import kotlinx.coroutines.*
1010
import kotlinx.coroutines.channels.Channel
11-
import kotlinx.coroutines.flow.flow
11+
import kotlinx.coroutines.channels.awaitClose
12+
import kotlinx.coroutines.flow.*
1213

1314
private class ChannelCallback<T>(val channel: Channel<Response<T>>) : ApolloCall.Callback<T>() {
1415

16+
@ExperimentalCoroutinesApi
1517
override fun onResponse(response: Response<T>) {
16-
channel.offer(response)
18+
if (!channel.isClosedForSend) {
19+
channel.offer(response)
20+
}
1721
}
1822

1923
override fun onFailure(e: ApolloException) {
@@ -39,64 +43,72 @@ private fun checkCapacity(capacity: Int) {
3943
}
4044

4145
/**
42-
* Converts an {@link ApolloCall} to an {@link kotlinx.coroutines.flow.Flow}.
46+
* Converts an [ApolloCall] to an [Flow].
4347
*
4448
* @param <T> the value type.
45-
* @param capacity the {@link Capacity} used for the underlying channel. Only {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
46-
* and {@link kotlinx.coroutines.channels.Channel.CONFLATED} are supported at the moment
47-
* @throws IllegalArgumentException if capacity is not {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
48-
* or {@link kotlinx.coroutines.channels.Channel.CONFLATED}
49-
* @return a flow which emits Responses<T>
49+
* @return a flow which emits [Responses<T>]
5050
*/
51-
fun <T> ApolloCall<T>.toFlow(capacity: Int = Channel.UNLIMITED) = flow {
52-
checkCapacity(capacity)
53-
val channel = Channel<Response<T>>(capacity)
54-
55-
enqueue(ChannelCallback(channel = channel))
56-
try {
57-
for (item in channel) {
58-
emit(item)
51+
@ExperimentalCoroutinesApi
52+
fun <T> ApolloCall<T>.toFlow() = callbackFlow {
53+
clone().enqueue(
54+
object : ApolloCall.Callback<T>() {
55+
override fun onResponse(response: Response<T>) {
56+
offer(response)
5957
}
60-
} finally {
61-
cancel()
62-
}
58+
59+
override fun onFailure(e: ApolloException) {
60+
close(e)
61+
}
62+
63+
override fun onStatusEvent(event: ApolloCall.StatusEvent) {
64+
if (event == ApolloCall.StatusEvent.COMPLETED) {
65+
close()
66+
}
67+
}
68+
}
69+
)
70+
awaitClose { this@toFlow.cancel() }
6371
}
6472

6573
/**
66-
* Converts an {@link ApolloQueryWatcher} to an {@link kotlinx.coroutines.flow.Flow}.
74+
* Converts an [ApolloQueryWatcher] to an [Flow].
6775
*
6876
* @param <T> the value type.
69-
* @param capacity the {@link Capacity} used for the underlying channel. Only {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
70-
* and {@link kotlinx.coroutines.channels.Channel.CONFLATED} are supported at the moment
71-
* @throws IllegalArgumentException if capacity is not {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
72-
* or {@link kotlinx.coroutines.channels.Channel.CONFLATED}
73-
* @return a flow which emits Responses<T>
77+
* @return a flow which emits [Responses<T>]
7478
*/
75-
fun <T> ApolloQueryWatcher<T>.toFlow(capacity: Int = Channel.UNLIMITED) = flow {
76-
checkCapacity(capacity)
77-
val channel = Channel<Response<T>>(capacity)
78-
79-
enqueueAndWatch(ChannelCallback(channel = channel))
80-
try {
81-
for (item in channel) {
82-
emit(item)
79+
@ExperimentalCoroutinesApi
80+
fun <T> ApolloQueryWatcher<T>.toFlow() = callbackFlow {
81+
clone().enqueueAndWatch(
82+
object : ApolloCall.Callback<T>() {
83+
override fun onResponse(response: Response<T>) {
84+
offer(response)
8385
}
84-
} finally {
85-
cancel()
86-
}
86+
87+
override fun onFailure(e: ApolloException) {
88+
close(e)
89+
}
90+
91+
override fun onStatusEvent(event: ApolloCall.StatusEvent) {
92+
if (event == ApolloCall.StatusEvent.COMPLETED) {
93+
close()
94+
}
95+
}
96+
}
97+
)
98+
awaitClose { this@toFlow.cancel() }
8799
}
88100

89101
/**
90-
* Converts an {@link ApolloCall} to an {@link kotlinx.coroutines.channels.Channel}. The number of values produced
91-
* by the channel is based on the {@link com.apollographql.apollo.fetcher.ResponseFetcher} used with the call.
102+
* Converts an [ApolloCall] to an [Channel]. The number of values produced by the channel is based on the
103+
* [com.apollographql.apollo.fetcher.ResponseFetcher] used with the call.
92104
*
93105
* @param <T> the value type.
94-
* @param capacity the {@link Capacity} used for the underlying channel. Only {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
95-
* and {@link kotlinx.coroutines.channels.Channel.CONFLATED} are supported at the moment
96-
* @throws IllegalArgumentException if capacity is not {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
97-
* or {@link kotlinx.coroutines.channels.Channel.CONFLATED}
98-
* @return the converted channel
106+
* @param capacity used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported at the moment
107+
* @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED]
108+
* @return a channel which emits [Responses<T>]
99109
*/
110+
@ExperimentalCoroutinesApi
111+
@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()"))
100112
fun <T> ApolloCall<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> {
101113
checkCapacity(capacity)
102114
val channel = Channel<Response<T>>(capacity)
@@ -110,9 +122,8 @@ fun <T> ApolloCall<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Resp
110122
}
111123

112124
/**
113-
* Converts an {@link ApolloCall} to an {@link kotlinx.coroutines.Deferred}. This is a convenience method
114-
* that will only return the first value emitted. If the more than one response is required, for an example
115-
* to retrieve cached and network response, use {@link toChannel} instead.
125+
* Converts an [ApolloCall] to an [Deferred]. This is a convenience method that will only return the first value emitted.
126+
* If the more than one response is required, for an example to retrieve cached and network response, use [toChannel] instead.
116127
*
117128
* @param <T> the value type.
118129
* @return the deferred
@@ -127,31 +138,34 @@ fun <T> ApolloCall<T>.toDeferred(): Deferred<Response<T>> {
127138
}
128139
enqueue(object : ApolloCall.Callback<T>() {
129140
override fun onResponse(response: Response<T>) {
130-
deferred.complete(response)
141+
if (deferred.isActive) {
142+
deferred.complete(response)
143+
}
131144
}
132145

133146
override fun onFailure(e: ApolloException) {
134-
deferred.completeExceptionally(e)
147+
if (deferred.isActive) {
148+
deferred.completeExceptionally(e)
149+
}
135150
}
136151
})
137152

138153
return deferred
139154
}
140155

141156
/**
142-
* Converts an {@link ApolloQueryWatcher} to an {@link kotlinx.coroutines.channels.Channel}.
157+
* Converts an [ApolloQueryWatcher] to an [Channel].
143158
*
144159
* @param <T> the value type.
145-
* @param capacity the {@link Capacity} used for the underlying channel. Only {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
146-
* and {@link kotlinx.coroutines.channels.Channel.CONFLATED} are supported at the moment
147-
* @throws IllegalArgumentException if capacity is not {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
148-
* or {@link kotlinx.coroutines.channels.Channel.CONFLATED}
149-
* @return the converted channel
160+
* @param capacity used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported at the moment
161+
* @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED]
162+
* @return a channel which emits [Responses<T>]
150163
*/
164+
@ExperimentalCoroutinesApi
165+
@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()"))
151166
fun <T> ApolloQueryWatcher<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> {
152167
checkCapacity(capacity)
153168
val channel = Channel<Response<T>>(capacity)
154-
155169
channel.invokeOnClose {
156170
cancel()
157171
}
@@ -161,19 +175,19 @@ fun <T> ApolloQueryWatcher<T>.toChannel(capacity: Int = Channel.UNLIMITED): Chan
161175
}
162176

163177
/**
164-
* Converts an {@link ApolloSubscriptionCall} to an {@link kotlinx.coroutines.channels.Channel}.
178+
* Converts an [ApolloSubscriptionCall] to an [Channel].
165179
*
166180
* @param <T> the value type.
167-
* @param capacity the {@link Capacity} used for the underlying channel. Only {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
168-
* and {@link kotlinx.coroutines.channels.Channel.CONFLATED} are supported at the moment
169-
* @throws IllegalArgumentException if capacity is not {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
170-
* or {@link kotlinx.coroutines.channels.Channel.CONFLATED}
171-
* @return the converted channel
181+
* @param capacity the {@link Capacity} used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported
182+
* at the moment
183+
* @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED]
184+
* @return a channel which emits [Responses<T>]
172185
*/
186+
@ExperimentalCoroutinesApi
187+
@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()"))
173188
fun <T> ApolloSubscriptionCall<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> {
174189
checkCapacity(capacity)
175190
val channel = Channel<Response<T>>(capacity)
176-
177191
channel.invokeOnClose {
178192
cancel()
179193
}
@@ -202,7 +216,40 @@ fun <T> ApolloSubscriptionCall<T>.toChannel(capacity: Int = Channel.UNLIMITED):
202216
}
203217

204218
/**
205-
* Converts an {@link ApolloPrefetch} to an {@link kotlinx.coroutines.Job}.
219+
* Converts an [ApolloSubscriptionCall] to an [Flow].
220+
*
221+
* @param <T> the value type.
222+
* @return a flow which emits [Responses<T>]
223+
*/
224+
@ExperimentalCoroutinesApi
225+
fun <T> ApolloSubscriptionCall<T>.toFlow(): Flow<Response<T>> = callbackFlow {
226+
clone().execute(
227+
object : ApolloSubscriptionCall.Callback<T> {
228+
override fun onConnected() {
229+
}
230+
231+
override fun onResponse(response: Response<T>) {
232+
channel.offer(response)
233+
}
234+
235+
override fun onFailure(e: ApolloException) {
236+
channel.close(e)
237+
}
238+
239+
override fun onCompleted() {
240+
channel.close()
241+
}
242+
243+
override fun onTerminated() {
244+
channel.close()
245+
}
246+
}
247+
)
248+
awaitClose { this@toFlow.cancel() }
249+
}
250+
251+
/**
252+
* Converts an [ApolloPrefetch] to [Job].
206253
*
207254
* @param <T> the value type.
208255
* @return the converted job
@@ -218,11 +265,15 @@ fun ApolloPrefetch.toJob(): Job {
218265

219266
enqueue(object : ApolloPrefetch.Callback() {
220267
override fun onSuccess() {
221-
deferred.complete(Unit)
268+
if (deferred.isActive) {
269+
deferred.complete(Unit)
270+
}
222271
}
223272

224273
override fun onFailure(e: ApolloException) {
225-
deferred.completeExceptionally(e)
274+
if (deferred.isActive) {
275+
deferred.completeExceptionally(e)
276+
}
226277
}
227278
})
228279

apollo-integration/src/test/java/com/apollographql/apollo/CoroutinesApolloTest.kt

+62-28
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ import com.apollographql.apollo.integration.normalizer.EpisodeHeroNameQuery
1818
import com.apollographql.apollo.integration.normalizer.HeroAndFriendsNamesWithIDsQuery
1919
import com.apollographql.apollo.integration.normalizer.type.Episode
2020
import com.google.common.truth.Truth.assertThat
21-
import kotlinx.coroutines.GlobalScope
21+
import kotlinx.coroutines.*
2222
import kotlinx.coroutines.channels.Channel
23-
import kotlinx.coroutines.delay
23+
import kotlinx.coroutines.flow.first
24+
import kotlinx.coroutines.flow.retry
25+
import kotlinx.coroutines.flow.single
2426
import kotlinx.coroutines.flow.toList
25-
import kotlinx.coroutines.launch
26-
import kotlinx.coroutines.runBlocking
2727
import okhttp3.Dispatcher
2828
import okhttp3.OkHttpClient
2929
import okhttp3.mockwebserver.MockResponse
@@ -32,7 +32,6 @@ import org.junit.Before
3232
import org.junit.Rule
3333
import org.junit.Test
3434

35-
3635
class CoroutinesApolloTest {
3736
private lateinit var apolloClient: ApolloClient
3837
@get:Rule
@@ -197,37 +196,72 @@ class CoroutinesApolloTest {
197196
assertThat(channel.isClosedForReceive).isEqualTo(true)
198197
}
199198

200-
@Test
201-
fun flowCanBeRead() {
202-
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
199+
@Test
200+
fun flowCanBeRead() {
201+
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
203202

204-
val flow = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toFlow()
203+
val flow = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toFlow()
205204

206-
runBlocking {
207-
val result = mutableListOf<Response<EpisodeHeroNameQuery.Data>>()
208-
flow.toList(result)
209-
assertThat(result.size).isEqualTo(1)
210-
assertThat(result[0].data()?.hero()?.name()).isEqualTo("R2-D2")
211-
}
205+
runBlocking {
206+
val result = mutableListOf<Response<EpisodeHeroNameQuery.Data>>()
207+
flow.toList(result)
208+
assertThat(result.size).isEqualTo(1)
209+
assertThat(result[0].data()?.hero()?.name()).isEqualTo("R2-D2")
212210
}
211+
}
213212

214-
@Test
215-
fun flowError() {
216-
server.enqueue(MockResponse().setResponseCode(200).setBody("nonsense"))
213+
@Test
214+
fun flowError() {
215+
server.enqueue(MockResponse().setResponseCode(200).setBody("nonsense"))
217216

218-
val flow = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toFlow()
217+
val flow = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toFlow()
219218

220-
runBlocking {
221-
val result = mutableListOf<Response<EpisodeHeroNameQuery.Data>>()
222-
try {
223-
flow.toList(result)
224-
} catch (e: ApolloException) {
225-
return@runBlocking
226-
}
219+
runBlocking {
220+
val result = mutableListOf<Response<EpisodeHeroNameQuery.Data>>()
221+
try {
222+
flow.toList(result)
223+
} catch (e: ApolloException) {
224+
return@runBlocking
225+
}
227226

228-
throw Exception("exception has not been thrown")
229-
}
227+
throw Exception("exception has not been thrown")
230228
}
229+
}
230+
231+
@Test
232+
@ExperimentalCoroutinesApi
233+
fun callFlowRetry() {
234+
server.enqueue(MockResponse().setResponseCode(200).setBody("nonsense"))
235+
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
236+
237+
val response = runBlocking {
238+
apolloClient
239+
.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE)))
240+
.toFlow()
241+
.retry(retries = 1)
242+
.single()
243+
}
244+
245+
assertThat(response.data()!!.hero()!!.name()).isEqualTo("R2-D2")
246+
}
247+
248+
@Test
249+
@ExperimentalCoroutinesApi
250+
fun watcherFlowRetry() {
251+
server.enqueue(MockResponse().setResponseCode(200).setBody("nonsense"))
252+
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
253+
254+
val response = runBlocking {
255+
apolloClient
256+
.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE)))
257+
.watcher()
258+
.toFlow()
259+
.retry(retries = 1)
260+
.first()
261+
}
262+
263+
assertThat(response.data()!!.hero()!!.name()).isEqualTo("R2-D2")
264+
}
231265

232266
companion object {
233267

0 commit comments

Comments
 (0)