27
27
import java .util .*;
28
28
import java .util .Map .Entry ;
29
29
import java .util .concurrent .TimeUnit ;
30
- import java .util .stream .Collectors ;
31
30
32
31
import org .bson .Document ;
33
32
import org .bson .conversions .Bson ;
@@ -1934,7 +1933,8 @@ protected <O> AggregationResults<O> aggregate(Aggregation aggregation, String co
1934
1933
Assert .notNull (aggregation , "Aggregation pipeline must not be null!" );
1935
1934
Assert .notNull (outputType , "Output type must not be null!" );
1936
1935
1937
- Document commandResult = new BatchAggregationLoader (this , readPreference , Integer .MAX_VALUE )
1936
+ Document commandResult = new BatchAggregationLoader (this , queryMapper , mappingContext , readPreference ,
1937
+ Integer .MAX_VALUE )
1938
1938
.aggregate (collectionName , aggregation , context );
1939
1939
1940
1940
return new AggregationResults <>(returnPotentiallyMappedResults (outputType , commandResult , collectionName ),
@@ -1957,9 +1957,9 @@ private <O> List<O> returnPotentiallyMappedResults(Class<O> outputType, Document
1957
1957
return Collections .emptyList ();
1958
1958
}
1959
1959
1960
- DocumentCallback <O > callback = new UnwrapAndReadDocumentCallback <O >(mongoConverter , outputType , collectionName );
1960
+ DocumentCallback <O > callback = new UnwrapAndReadDocumentCallback <>(mongoConverter , outputType , collectionName );
1961
1961
1962
- List <O > mappedResults = new ArrayList <O >();
1962
+ List <O > mappedResults = new ArrayList <>();
1963
1963
for (Document document : resultSet ) {
1964
1964
mappedResults .add (callback .doWith (document ));
1965
1965
}
@@ -1974,17 +1974,18 @@ protected <O> CloseableIterator<O> aggregateStream(Aggregation aggregation, Stri
1974
1974
Assert .notNull (aggregation , "Aggregation pipeline must not be null!" );
1975
1975
Assert .notNull (outputType , "Output type must not be null!" );
1976
1976
1977
- AggregationOperationContext rootContext = context == null ? Aggregation .DEFAULT_CONTEXT : context ;
1977
+ AggregationUtil aggregationUtil = new AggregationUtil (queryMapper , mappingContext );
1978
+ AggregationOperationContext rootContext = aggregationUtil .prepareAggregationContext (aggregation , context );
1978
1979
1979
- Document command = aggregation . toDocument (collectionName , rootContext );
1980
+ Document command = aggregationUtil . createCommand (collectionName , aggregation , rootContext );
1980
1981
1981
1982
assertNotExplain (command );
1982
1983
1983
1984
if (LOGGER .isDebugEnabled ()) {
1984
1985
LOGGER .debug ("Streaming aggregation: {}" , serializeToJsonSafely (command ));
1985
1986
}
1986
1987
1987
- ReadDocumentCallback <O > readCallback = new ReadDocumentCallback <O >(mongoConverter , outputType , collectionName );
1988
+ ReadDocumentCallback <O > readCallback = new ReadDocumentCallback <>(mongoConverter , outputType , collectionName );
1988
1989
1989
1990
return execute (collectionName , new CollectionCallback <CloseableIterator <O >>() {
1990
1991
@@ -2008,7 +2009,7 @@ public CloseableIterator<O> doInCollection(MongoCollection<Document> collection)
2008
2009
cursor = cursor .collation (options .getCollation ().map (Collation ::toMongoCollation ).get ());
2009
2010
}
2010
2011
2011
- return new CloseableIterableCursorAdapter <O >(cursor .iterator (), exceptionTranslator , readCallback );
2012
+ return new CloseableIterableCursorAdapter <>(cursor .iterator (), exceptionTranslator , readCallback );
2012
2013
}
2013
2014
});
2014
2015
}
@@ -2577,72 +2578,6 @@ private Document addFieldsForProjection(Document fields, Class<?> domainType, Cl
2577
2578
return fields ;
2578
2579
}
2579
2580
2580
- /**
2581
- * Prepare the {@link AggregationOperationContext} for a given aggregation by either returning the context itself it
2582
- * is not {@literal null}, create a {@link TypeBasedAggregationOperationContext} if the aggregation contains type
2583
- * information (is a {@link TypedAggregation}) or use the {@link Aggregation#DEFAULT_CONTEXT}.
2584
- *
2585
- * @param aggregation must not be {@literal null}.
2586
- * @param context can be {@literal null}.
2587
- * @return the root {@link AggregationOperationContext} to use.
2588
- */
2589
- private AggregationOperationContext prepareAggregationContext (Aggregation aggregation ,
2590
- @ Nullable AggregationOperationContext context ) {
2591
-
2592
- if (context != null ) {
2593
- return context ;
2594
- }
2595
-
2596
- if (aggregation instanceof TypedAggregation ) {
2597
- return new TypeBasedAggregationOperationContext (((TypedAggregation ) aggregation ).getInputType (), mappingContext ,
2598
- queryMapper );
2599
- }
2600
-
2601
- return Aggregation .DEFAULT_CONTEXT ;
2602
- }
2603
-
2604
- /**
2605
- * Extract and map the aggregation pipeline.
2606
- *
2607
- * @param aggregation
2608
- * @param context
2609
- * @return
2610
- */
2611
- private Document aggregationToPipeline (String inputCollectionName , Aggregation aggregation , AggregationOperationContext context ) {
2612
-
2613
- if (!ObjectUtils .nullSafeEquals (context , Aggregation .DEFAULT_CONTEXT )) {
2614
- return aggregation .toDocument (inputCollectionName , context );
2615
- }
2616
-
2617
- return queryMapper .getMappedObject (aggregation .toDocument (inputCollectionName , context ), Optional .empty ());
2618
- }
2619
-
2620
- /**
2621
- * Extract the command and map the aggregation pipeline.
2622
- *
2623
- * @param aggregation
2624
- * @param context
2625
- * @return
2626
- */
2627
- private Document aggregationToCommand (String collection , Aggregation aggregation ,
2628
- AggregationOperationContext context ) {
2629
-
2630
- Document command = aggregation .toDocument (collection , context );
2631
-
2632
- if (!ObjectUtils .nullSafeEquals (context , Aggregation .DEFAULT_CONTEXT )) {
2633
- return command ;
2634
- }
2635
-
2636
- command .put ("pipeline" , mapAggregationPipeline (command .get ("pipeline" , List .class )));
2637
-
2638
- return command ;
2639
- }
2640
-
2641
- private List <Document > mapAggregationPipeline (List <Document > pipeline ) {
2642
-
2643
- return pipeline .stream ().map (val -> queryMapper .getMappedObject (val , Optional .empty ()))
2644
- .collect (Collectors .toList ());
2645
- }
2646
2581
2647
2582
/**
2648
2583
* Tries to convert the given {@link RuntimeException} into a {@link DataAccessException} but returns the original
@@ -3157,12 +3092,18 @@ static class BatchAggregationLoader {
3157
3092
private static final String OK = "ok" ;
3158
3093
3159
3094
private final MongoTemplate template ;
3095
+ private final QueryMapper queryMapper ;
3096
+ private final MappingContext <? extends MongoPersistentEntity <?>, MongoPersistentProperty > mappingContext ;
3160
3097
private final ReadPreference readPreference ;
3161
3098
private final int batchSize ;
3162
3099
3163
- BatchAggregationLoader (MongoTemplate template , ReadPreference readPreference , int batchSize ) {
3100
+ BatchAggregationLoader (MongoTemplate template , QueryMapper queryMapper ,
3101
+ MappingContext <? extends MongoPersistentEntity <?>, MongoPersistentProperty > mappingContext ,
3102
+ ReadPreference readPreference , int batchSize ) {
3164
3103
3165
3104
this .template = template ;
3105
+ this .queryMapper = queryMapper ;
3106
+ this .mappingContext = mappingContext ;
3166
3107
this .readPreference = readPreference ;
3167
3108
this .batchSize = batchSize ;
3168
3109
}
@@ -3185,11 +3126,13 @@ Document aggregate(String collectionName, Aggregation aggregation, AggregationOp
3185
3126
* Pre process the aggregation command sent to the server by adding {@code cursor} options to match execution on
3186
3127
* different server versions.
3187
3128
*/
3188
- private static Document prepareAggregationCommand (String collectionName , Aggregation aggregation ,
3129
+ private Document prepareAggregationCommand (String collectionName , Aggregation aggregation ,
3189
3130
@ Nullable AggregationOperationContext context , int batchSize ) {
3190
3131
3191
- AggregationOperationContext rootContext = context == null ? Aggregation .DEFAULT_CONTEXT : context ;
3192
- Document command = aggregation .toDocument (collectionName , rootContext );
3132
+ AggregationUtil aggregationUtil = new AggregationUtil (queryMapper , mappingContext );
3133
+
3134
+ AggregationOperationContext rootContext = aggregationUtil .prepareAggregationContext (aggregation , context );
3135
+ Document command = aggregationUtil .createCommand (collectionName , aggregation , rootContext );
3193
3136
3194
3137
if (!aggregation .getOptions ().isExplain ()) {
3195
3138
command .put (CURSOR_FIELD , new Document (BATCH_SIZE_FIELD , batchSize ));
0 commit comments