Skip to content

Commit b564bee

Browse files
authored
SDK support for logical termination in Firestore BatchGetDocuments (#3622)
* SDK support for logical termination in Firestore BatchGetDocuments
1 parent ab5697b commit b564bee

File tree

2 files changed

+48
-33
lines changed

2 files changed

+48
-33
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/Datastore.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@
1414

1515
package com.google.firebase.firestore.remote;
1616

17+
import static com.google.firebase.firestore.util.Util.exceptionFromStatus;
18+
1719
import android.content.Context;
1820
import android.os.Build;
1921
import androidx.annotation.Nullable;
2022
import com.google.android.gms.tasks.Task;
23+
import com.google.android.gms.tasks.TaskCompletionSource;
2124
import com.google.firebase.firestore.FirebaseFirestoreException;
2225
import com.google.firebase.firestore.auth.CredentialsProvider;
2326
import com.google.firebase.firestore.auth.User;
@@ -36,6 +39,7 @@
3639
import io.grpc.Status;
3740
import java.util.ArrayList;
3841
import java.util.Arrays;
42+
import java.util.Collections;
3943
import java.util.HashMap;
4044
import java.util.HashSet;
4145
import java.util.List;
@@ -170,21 +174,18 @@ public Task<List<MutableDocument>> lookup(List<DocumentKey> keys) {
170174
for (DocumentKey key : keys) {
171175
builder.addDocuments(serializer.encodeKey(key));
172176
}
173-
return channel
174-
.runStreamingResponseRpc(FirestoreGrpc.getBatchGetDocumentsMethod(), builder.build())
175-
.continueWith(
176-
workerQueue.getExecutor(),
177-
task -> {
178-
if (!task.isSuccessful()) {
179-
if (task.getException() instanceof FirebaseFirestoreException
180-
&& ((FirebaseFirestoreException) task.getException()).getCode()
181-
== FirebaseFirestoreException.Code.UNAUTHENTICATED) {
182-
channel.invalidateToken();
183-
}
184-
}
177+
List<BatchGetDocumentsResponse> responses = new ArrayList<>();
178+
TaskCompletionSource<List<MutableDocument>> completionSource = new TaskCompletionSource<>();
185179

180+
channel.runStreamingResponseRpc(
181+
FirestoreGrpc.getBatchGetDocumentsMethod(),
182+
builder.build(),
183+
new FirestoreChannel.StreamingListener<BatchGetDocumentsResponse>() {
184+
@Override
185+
public void onMessage(BatchGetDocumentsResponse message) {
186+
responses.add(message);
187+
if (responses.size() == keys.size()) {
186188
Map<DocumentKey, MutableDocument> resultMap = new HashMap<>();
187-
List<BatchGetDocumentsResponse> responses = task.getResult();
188189
for (BatchGetDocumentsResponse response : responses) {
189190
MutableDocument doc = serializer.decodeMaybeDocument(response);
190191
resultMap.put(doc.getKey(), doc);
@@ -193,8 +194,25 @@ public Task<List<MutableDocument>> lookup(List<DocumentKey> keys) {
193194
for (DocumentKey key : keys) {
194195
results.add(resultMap.get(key));
195196
}
196-
return results;
197-
});
197+
completionSource.trySetResult(results);
198+
}
199+
}
200+
201+
@Override
202+
public void onClose(Status status) {
203+
if (status.isOk()) {
204+
completionSource.trySetResult(Collections.emptyList());
205+
} else {
206+
FirebaseFirestoreException exception = exceptionFromStatus(status);
207+
if (exception.getCode() == FirebaseFirestoreException.Code.UNAUTHENTICATED) {
208+
channel.invalidateToken();
209+
}
210+
completionSource.trySetException(exception);
211+
}
212+
}
213+
});
214+
215+
return completionSource.getTask();
198216
}
199217

200218
/**

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
import io.grpc.Metadata;
3434
import io.grpc.MethodDescriptor;
3535
import io.grpc.Status;
36-
import java.util.ArrayList;
37-
import java.util.List;
3836

3937
/**
4038
* Wrapper class around io.grpc.Channel that adds headers, exception handling and simplifies
@@ -44,6 +42,15 @@
4442
*/
4543
public class FirestoreChannel {
4644

45+
/** Listen to changes inside runStreamingResponseRpc */
46+
public abstract static class StreamingListener<T> {
47+
public StreamingListener() {}
48+
49+
public void onMessage(T message) {}
50+
51+
public void onClose(Status status) {}
52+
}
53+
4754
private static final Metadata.Key<String> X_GOOG_API_CLIENT_HEADER =
4855
Metadata.Key.of("x-goog-api-client", Metadata.ASCII_STRING_MARSHALLER);
4956

@@ -184,36 +191,28 @@ public void halfClose() {
184191
}
185192

186193
/** Creates and starts a streaming response RPC. */
187-
<ReqT, RespT> Task<List<RespT>> runStreamingResponseRpc(
188-
MethodDescriptor<ReqT, RespT> method, ReqT request) {
189-
TaskCompletionSource<List<RespT>> tcs = new TaskCompletionSource<>();
190-
194+
<ReqT, RespT> void runStreamingResponseRpc(
195+
MethodDescriptor<ReqT, RespT> method,
196+
ReqT request,
197+
FirestoreChannel.StreamingListener<RespT> callback) {
191198
callProvider
192199
.createClientCall(method)
193200
.addOnCompleteListener(
194201
asyncQueue.getExecutor(),
195202
result -> {
196203
ClientCall<ReqT, RespT> call = result.getResult();
197-
198-
List<RespT> results = new ArrayList<>();
199-
200204
call.start(
201205
new ClientCall.Listener<RespT>() {
202206
@Override
203207
public void onMessage(RespT message) {
204-
results.add(message);
205-
208+
callback.onMessage(message);
206209
// Make sure next message can be delivered
207210
call.request(1);
208211
}
209212

210213
@Override
211214
public void onClose(Status status, Metadata trailers) {
212-
if (status.isOk()) {
213-
tcs.setResult(results);
214-
} else {
215-
tcs.setException(exceptionFromStatus(status));
216-
}
215+
callback.onClose(status);
217216
}
218217
},
219218
requestHeaders());
@@ -224,8 +223,6 @@ public void onClose(Status status, Metadata trailers) {
224223
call.sendMessage(request);
225224
call.halfClose();
226225
});
227-
228-
return tcs.getTask();
229226
}
230227

231228
/** Creates and starts a single response RPC. */

0 commit comments

Comments
 (0)