25
25
import com .google .api .gax .rpc .StreamController ;
26
26
import com .google .cloud .Timestamp ;
27
27
import com .google .cloud .firestore .v1 .FirestoreSettings ;
28
+ import com .google .common .collect .ImmutableMap ;
28
29
import com .google .firestore .v1 .RunAggregationQueryRequest ;
29
30
import com .google .firestore .v1 .RunAggregationQueryResponse ;
30
31
import com .google .firestore .v1 .RunQueryRequest ;
40
41
import java .util .Map ;
41
42
import java .util .Objects ;
42
43
import java .util .Set ;
43
- import java .util .concurrent .atomic .AtomicBoolean ;
44
44
import javax .annotation .Nonnull ;
45
45
import javax .annotation .Nullable ;
46
46
49
49
public class AggregateQuery {
50
50
@ Nonnull private final Query query ;
51
51
52
- @ Nonnull private List <AggregateField > aggregateFieldList ;
52
+ @ Nonnull private final List <AggregateField > aggregateFieldList ;
53
53
54
- @ Nonnull private Map <String , String > aliasMap ;
54
+ @ Nonnull private final Map <String , String > aliasMap ;
55
55
56
56
AggregateQuery (@ Nonnull Query query , @ Nonnull List <AggregateField > aggregateFields ) {
57
57
this .query = query ;
@@ -75,6 +75,26 @@ public ApiFuture<AggregateQuerySnapshot> get() {
75
75
return get (null , null );
76
76
}
77
77
78
+ /**
79
+ * Plans and optionally executes this query. Returns an ApiFuture that will be resolved with the
80
+ * planner information, statistics from the query execution (if any), and the query results (if
81
+ * any).
82
+ *
83
+ * @return An ApiFuture that will be resolved with the planner information, statistics from the
84
+ * query execution (if any), and the query results (if any).
85
+ */
86
+ @ Nonnull
87
+ public ApiFuture <ExplainResults <AggregateQuerySnapshot >> explain (ExplainOptions options ) {
88
+ AggregateQueryExplainResponseDeliverer responseDeliverer =
89
+ new AggregateQueryExplainResponseDeliverer (
90
+ /* transactionId= */ null ,
91
+ /* readTime= */ null ,
92
+ /* startTimeNanos= */ query .rpcContext .getClock ().nanoTime (),
93
+ /* explainOptions= */ options );
94
+ runQuery (responseDeliverer );
95
+ return responseDeliverer .getFuture ();
96
+ }
97
+
78
98
@ Nonnull
79
99
ApiFuture <AggregateQuerySnapshot > get (
80
100
@ Nullable final ByteString transactionId , @ Nullable com .google .protobuf .Timestamp readTime ) {
@@ -85,25 +105,34 @@ ApiFuture<AggregateQuerySnapshot> get(
85
105
return responseDeliverer .getFuture ();
86
106
}
87
107
88
- private void runQuery (AggregateQueryResponseDeliverer responseDeliverer ) {
108
+ private < T > void runQuery (ResponseDeliverer < T > responseDeliverer ) {
89
109
RunAggregationQueryRequest request =
90
- toProto (responseDeliverer .transactionId , responseDeliverer .readTime );
91
- AggregateQueryResponseObserver responseObserver =
92
- new AggregateQueryResponseObserver (responseDeliverer );
110
+ toProto (
111
+ responseDeliverer .getTransactionId (),
112
+ responseDeliverer .getReadTime (),
113
+ responseDeliverer .getExplainOptions ());
114
+ AggregateQueryResponseObserver <T > responseObserver =
115
+ new AggregateQueryResponseObserver <T >(responseDeliverer );
93
116
ServerStreamingCallable <RunAggregationQueryRequest , RunAggregationQueryResponse > callable =
94
117
query .rpcContext .getClient ().runAggregationQueryCallable ();
95
118
query .rpcContext .streamRequest (request , responseObserver , callable );
96
119
}
97
120
98
- private final class AggregateQueryResponseDeliverer {
121
+ @ Nonnull
122
+ private Map <String , Value > convertServerAggregateFieldsMapToClientAggregateFieldsMap (
123
+ @ Nonnull Map <String , Value > data ) {
124
+ ImmutableMap .Builder <String , Value > builder = ImmutableMap .builder ();
125
+ data .forEach ((serverAlias , value ) -> builder .put (aliasMap .get (serverAlias ), value ));
126
+ return builder .build ();
127
+ }
99
128
100
- @ Nullable private final ByteString transactionId ;
101
- @ Nullable private final com .google .protobuf .Timestamp readTime ;
129
+ private abstract static class ResponseDeliverer <T > {
130
+ private final @ Nullable ByteString transactionId ;
131
+ private final @ Nullable com .google .protobuf .Timestamp readTime ;
102
132
private final long startTimeNanos ;
103
- private final SettableApiFuture <AggregateQuerySnapshot > future = SettableApiFuture .create ();
104
- private final AtomicBoolean isFutureCompleted = new AtomicBoolean (false );
133
+ private final SettableApiFuture <T > future = SettableApiFuture .create ();
105
134
106
- AggregateQueryResponseDeliverer (
135
+ ResponseDeliverer (
107
136
@ Nullable ByteString transactionId ,
108
137
@ Nullable com .google .protobuf .Timestamp readTime ,
109
138
long startTimeNanos ) {
@@ -112,52 +141,148 @@ private final class AggregateQueryResponseDeliverer {
112
141
this .startTimeNanos = startTimeNanos ;
113
142
}
114
143
115
- ApiFuture <AggregateQuerySnapshot > getFuture () {
144
+ @ Nullable
145
+ ByteString getTransactionId () {
146
+ return transactionId ;
147
+ }
148
+
149
+ @ Nullable
150
+ com .google .protobuf .Timestamp getReadTime () {
151
+ return readTime ;
152
+ }
153
+
154
+ long getStartTimeNanos () {
155
+ return startTimeNanos ;
156
+ }
157
+
158
+ @ Nullable
159
+ ExplainOptions getExplainOptions () {
160
+ return null ;
161
+ }
162
+
163
+ ApiFuture <T > getFuture () {
116
164
return future ;
117
165
}
118
166
119
- void deliverResult (@ Nonnull Map <String , Value > data , Timestamp readTime ) {
120
- if (isFutureCompleted .compareAndSet (false , true )) {
121
- Map <String , Value > mappedData = new HashMap <>();
122
- data .forEach ((serverAlias , value ) -> mappedData .put (aliasMap .get (serverAlias ), value ));
123
- future .set (new AggregateQuerySnapshot (AggregateQuery .this , readTime , mappedData ));
124
- }
167
+ protected void setFuture (T value ) {
168
+ future .set (value );
125
169
}
126
170
127
171
void deliverError (Throwable throwable ) {
128
- if (isFutureCompleted .compareAndSet (false , true )) {
129
- future .setException (throwable );
172
+ future .setException (throwable );
173
+ }
174
+
175
+ abstract void deliverResult (
176
+ @ Nullable Map <String , Value > serverData ,
177
+ Timestamp readTime ,
178
+ @ Nullable ExplainMetrics metrics );
179
+ }
180
+
181
+ private class AggregateQueryResponseDeliverer extends ResponseDeliverer <AggregateQuerySnapshot > {
182
+ AggregateQueryResponseDeliverer (
183
+ @ Nullable ByteString transactionId ,
184
+ @ Nullable com .google .protobuf .Timestamp readTime ,
185
+ long startTimeNanos ) {
186
+ super (transactionId , readTime , startTimeNanos );
187
+ }
188
+
189
+ @ Override
190
+ void deliverResult (
191
+ @ Nullable Map <String , Value > serverData ,
192
+ Timestamp readTime ,
193
+ @ Nullable ExplainMetrics metrics ) {
194
+ if (serverData == null ) {
195
+ deliverError (new RuntimeException ("Did not receive any aggregate query results." ));
196
+ return ;
130
197
}
198
+ setFuture (
199
+ new AggregateQuerySnapshot (
200
+ AggregateQuery .this ,
201
+ readTime ,
202
+ convertServerAggregateFieldsMapToClientAggregateFieldsMap (serverData )));
131
203
}
132
204
}
133
205
134
- private final class AggregateQueryResponseObserver
135
- implements ResponseObserver <RunAggregationQueryResponse > {
206
+ private final class AggregateQueryExplainResponseDeliverer
207
+ extends ResponseDeliverer <ExplainResults <AggregateQuerySnapshot >> {
208
+ private final @ Nullable ExplainOptions explainOptions ;
136
209
137
- private final AggregateQueryResponseDeliverer responseDeliverer ;
138
- private StreamController streamController ;
210
+ AggregateQueryExplainResponseDeliverer (
211
+ @ Nullable ByteString transactionId ,
212
+ @ Nullable com .google .protobuf .Timestamp readTime ,
213
+ long startTimeNanos ,
214
+ @ Nullable ExplainOptions explainOptions ) {
215
+ super (transactionId , readTime , startTimeNanos );
216
+ this .explainOptions = explainOptions ;
217
+ }
139
218
140
- AggregateQueryResponseObserver (AggregateQueryResponseDeliverer responseDeliverer ) {
141
- this .responseDeliverer = responseDeliverer ;
219
+ @ Override
220
+ @ Nullable
221
+ ExplainOptions getExplainOptions () {
222
+ return explainOptions ;
142
223
}
143
224
144
225
@ Override
145
- public void onStart (StreamController streamController ) {
146
- this .streamController = streamController ;
226
+ void deliverResult (
227
+ @ Nullable Map <String , Value > serverData ,
228
+ Timestamp readTime ,
229
+ @ Nullable ExplainMetrics metrics ) {
230
+ // The server is required to provide ExplainMetrics for explain queries.
231
+ if (metrics == null ) {
232
+ deliverError (new RuntimeException ("Did not receive any metrics for explain query." ));
233
+ return ;
234
+ }
235
+ AggregateQuerySnapshot snapshot =
236
+ serverData == null
237
+ ? null
238
+ : new AggregateQuerySnapshot (
239
+ AggregateQuery .this ,
240
+ readTime ,
241
+ convertServerAggregateFieldsMapToClientAggregateFieldsMap (serverData ));
242
+ setFuture (new ExplainResults <>(metrics , snapshot ));
243
+ }
244
+ }
245
+
246
+ private final class AggregateQueryResponseObserver <T >
247
+ implements ResponseObserver <RunAggregationQueryResponse > {
248
+ private final ResponseDeliverer <T > responseDeliverer ;
249
+ private Timestamp readTime = Timestamp .MAX_VALUE ;
250
+ @ Nullable private Map <String , Value > aggregateFieldsMap = null ;
251
+ @ Nullable private ExplainMetrics metrics = null ;
252
+
253
+ AggregateQueryResponseObserver (ResponseDeliverer <T > responseDeliverer ) {
254
+ this .responseDeliverer = responseDeliverer ;
147
255
}
148
256
257
+ private boolean isExplainQuery () {
258
+ return this .responseDeliverer .getExplainOptions () != null ;
259
+ }
260
+
261
+ @ Override
262
+ public void onStart (StreamController streamController ) {}
263
+
149
264
@ Override
150
265
public void onResponse (RunAggregationQueryResponse response ) {
151
- // Close the stream to avoid it dangling, since we're not expecting any more responses.
152
- streamController .cancel ();
266
+ if (response .hasReadTime ()) {
267
+ readTime = Timestamp .fromProto (response .getReadTime ());
268
+ }
269
+
270
+ if (response .hasResult ()) {
271
+ aggregateFieldsMap = response .getResult ().getAggregateFieldsMap ();
272
+ }
153
273
154
- // Extract the aggregations and read time from the RunAggregationQueryResponse.
155
- Timestamp readTime = Timestamp .fromProto (response .getReadTime ());
274
+ if (response .hasExplainMetrics ()) {
275
+ metrics = new ExplainMetrics (response .getExplainMetrics ());
276
+ }
156
277
157
- // Deliver the result; even though the `RunAggregationQuery` RPC is a "streaming" RPC, meaning
158
- // that `onResponse()` can be called multiple times, it _should_ only be called once. But even
159
- // if it is called more than once, `responseDeliverer` will drop superfluous results.
160
- responseDeliverer .deliverResult (response .getResult ().getAggregateFieldsMap (), readTime );
278
+ if (!isExplainQuery ()) {
279
+ // Deliver the result; even though the `RunAggregationQuery` RPC is a "streaming" RPC,
280
+ // meaning that `onResponse()` can be called multiple times, it _should_ only be called
281
+ // once for non-explain queries. But even if it is called more than once,
282
+ // `responseDeliverer` will drop superfluous results. For explain queries, there will
283
+ // be more than one response, and the last response will contain the metrics.
284
+ onComplete ();
285
+ }
161
286
}
162
287
163
288
@ Override
@@ -170,17 +295,26 @@ public void onError(Throwable throwable) {
170
295
}
171
296
172
297
private boolean shouldRetry (Throwable throwable ) {
298
+ // Do not retry EXPLAIN requests because it'd be executing
299
+ // multiple queries. This means stats would have to be aggregated,
300
+ // and that may not even make sense for many statistics.
301
+ if (isExplainQuery ()) {
302
+ return false ;
303
+ }
304
+
173
305
Set <StatusCode .Code > retryableCodes =
174
306
FirestoreSettings .newBuilder ().runAggregationQuerySettings ().getRetryableCodes ();
175
307
return query .shouldRetryQuery (
176
308
throwable ,
177
- responseDeliverer .transactionId ,
178
- responseDeliverer .startTimeNanos ,
309
+ responseDeliverer .getTransactionId () ,
310
+ responseDeliverer .getStartTimeNanos () ,
179
311
retryableCodes );
180
312
}
181
313
182
314
@ Override
183
- public void onComplete () {}
315
+ public void onComplete () {
316
+ responseDeliverer .deliverResult (aggregateFieldsMap , readTime , metrics );
317
+ }
184
318
}
185
319
186
320
/**
@@ -191,13 +325,14 @@ public void onComplete() {}
191
325
*/
192
326
@ Nonnull
193
327
public RunAggregationQueryRequest toProto () {
194
- return toProto (null , null );
328
+ return toProto (/* transactionId= */ null , /* readTime= */ null , /* explainOptions= */ null );
195
329
}
196
330
197
331
@ Nonnull
198
332
RunAggregationQueryRequest toProto (
199
333
@ Nullable final ByteString transactionId ,
200
- @ Nullable final com .google .protobuf .Timestamp readTime ) {
334
+ @ Nullable final com .google .protobuf .Timestamp readTime ,
335
+ @ Nullable ExplainOptions explainOptions ) {
201
336
RunQueryRequest runQueryRequest = query .toProto ();
202
337
203
338
RunAggregationQueryRequest .Builder request = RunAggregationQueryRequest .newBuilder ();
@@ -209,6 +344,10 @@ RunAggregationQueryRequest toProto(
209
344
request .setReadTime (readTime );
210
345
}
211
346
347
+ if (explainOptions != null ) {
348
+ request .setExplainOptions (explainOptions .toProto ());
349
+ }
350
+
212
351
StructuredAggregationQuery .Builder structuredAggregationQuery =
213
352
request .getStructuredAggregationQueryBuilder ();
214
353
structuredAggregationQuery .setStructuredQuery (runQueryRequest .getStructuredQuery ());
0 commit comments