17
17
package org .springframework .batch .item .data ;
18
18
19
19
import java .util .ArrayList ;
20
- import java .util .Iterator ;
21
20
import java .util .List ;
22
21
import java .util .Map ;
23
22
24
- import org .bson .Document ;
25
- import org .bson .codecs .DecoderContext ;
26
- import org .slf4j .Logger ;
27
- import org .slf4j .LoggerFactory ;
28
-
29
23
import org .springframework .batch .item .ExecutionContext ;
30
24
import org .springframework .batch .item .ItemReader ;
25
+ import org .springframework .batch .item .support .AbstractItemCountingItemStreamItemReader ;
31
26
import org .springframework .beans .factory .InitializingBean ;
32
- import org .springframework .data .domain .PageRequest ;
33
- import org .springframework .data .domain .Pageable ;
34
27
import org .springframework .data .domain .Sort ;
35
28
import org .springframework .data .mongodb .core .MongoOperations ;
36
29
import org .springframework .data .mongodb .core .query .BasicQuery ;
37
30
import org .springframework .data .mongodb .core .query .Query ;
38
31
import org .springframework .data .mongodb .util .json .ParameterBindingDocumentCodec ;
39
32
import org .springframework .data .mongodb .util .json .ParameterBindingJsonReader ;
33
+ import org .springframework .data .util .CloseableIterator ;
40
34
import org .springframework .util .Assert ;
41
35
import org .springframework .util .ClassUtils ;
42
36
import org .springframework .util .StringUtils ;
43
37
38
+ import org .bson .Document ;
39
+ import org .bson .codecs .DecoderContext ;
40
+ import org .slf4j .Logger ;
41
+ import org .slf4j .LoggerFactory ;
42
+
44
43
/**
45
44
* <p>
46
45
* Restartable {@link ItemReader} that reads documents from MongoDB
51
50
* If you set JSON String query {@link #setQuery(String)} then
52
51
* it executes the JSON to retrieve the requested documents.
53
52
* </p>
54
- *
53
+ *
55
54
* <p>
56
55
* If you set Query object {@link #setQuery(Query)} then
57
56
* it executes the Query to retrieve the requested documents.
58
57
* </p>
59
- *
60
- * <p>
61
- * The query is executed using paged requests specified in the
62
- * {@link #setPageSize(int)}. Additional pages are requested as needed to
63
- * provide data when the {@link #read()} method is called.
64
- * </p>
65
58
*
66
59
* <p>
67
60
* The JSON String query provided supports parameter substitution via ?<index>
81
74
* @author Mahmoud Ben Hassine
82
75
* @author Parikshit Dutta
83
76
*/
84
- public class MongoItemReader <T > extends AbstractPaginatedDataItemReader <T > implements InitializingBean {
85
-
77
+ public class MongoItemReader <T > extends AbstractItemCountingItemStreamItemReader <T > implements InitializingBean {
78
+
86
79
private static final Logger log = LoggerFactory .getLogger (MongoItemReader .class );
87
-
80
+
88
81
private MongoOperations template ;
89
82
private Query query ;
90
83
private String queryString ;
@@ -94,12 +87,13 @@ public class MongoItemReader<T> extends AbstractPaginatedDataItemReader<T> imple
94
87
private String fields ;
95
88
private String collection ;
96
89
private List <Object > parameterValues = new ArrayList <>();
90
+ private CloseableIterator <? extends T > cursor = null ;
97
91
98
92
public MongoItemReader () {
99
93
super ();
100
94
setName (ClassUtils .getShortName (MongoItemReader .class ));
101
95
}
102
-
96
+
103
97
/**
104
98
* A Mongo Query to be used.
105
99
*
@@ -187,47 +181,6 @@ public void setHint(String hint) {
187
181
this .hint = hint ;
188
182
}
189
183
190
- @ Override
191
- @ SuppressWarnings ("unchecked" )
192
- protected Iterator <T > doPageRead () {
193
- if (queryString != null ) {
194
- Pageable pageRequest = PageRequest .of (page , pageSize , sort );
195
-
196
- String populatedQuery = replacePlaceholders (queryString , parameterValues );
197
-
198
- Query mongoQuery ;
199
-
200
- if (StringUtils .hasText (fields )) {
201
- mongoQuery = new BasicQuery (populatedQuery , fields );
202
- }
203
- else {
204
- mongoQuery = new BasicQuery (populatedQuery );
205
- }
206
-
207
- mongoQuery .with (pageRequest );
208
-
209
- if (StringUtils .hasText (hint )) {
210
- mongoQuery .withHint (hint );
211
- }
212
-
213
- if (StringUtils .hasText (collection )) {
214
- return (Iterator <T >) template .find (mongoQuery , type , collection ).iterator ();
215
- } else {
216
- return (Iterator <T >) template .find (mongoQuery , type ).iterator ();
217
- }
218
-
219
- } else {
220
- Pageable pageRequest = PageRequest .of (page , pageSize );
221
- query .with (pageRequest );
222
-
223
- if (StringUtils .hasText (collection )) {
224
- return (Iterator <T >) template .find (query , type , collection ).iterator ();
225
- } else {
226
- return (Iterator <T >) template .find (query , type ).iterator ();
227
- }
228
- }
229
- }
230
-
231
184
/**
232
185
* Checks mandatory properties
233
186
*
@@ -238,7 +191,7 @@ public void afterPropertiesSet() throws Exception {
238
191
Assert .state (template != null , "An implementation of MongoOperations is required." );
239
192
Assert .state (type != null , "A type to convert the input into is required." );
240
193
Assert .state (queryString != null || query != null , "A query is required." );
241
-
194
+
242
195
if (queryString != null ) {
243
196
Assert .state (sort != null , "A sort is required." );
244
197
}
@@ -260,4 +213,50 @@ private Sort convertToSort(Map<String, Sort.Direction> sorts) {
260
213
261
214
return Sort .by (sortValues );
262
215
}
216
+
217
+ @ Override
218
+ protected T doRead () throws Exception {
219
+ return cursor .hasNext () ? cursor .next () : null ;
220
+ }
221
+
222
+ @ Override
223
+ protected void doOpen () throws Exception {
224
+ if (queryString != null ) {
225
+ String populatedQuery = replacePlaceholders (queryString , parameterValues );
226
+
227
+ Query mongoQuery ;
228
+
229
+ if (StringUtils .hasText (fields )) {
230
+ mongoQuery = new BasicQuery (populatedQuery , fields );
231
+ }
232
+ else {
233
+ mongoQuery = new BasicQuery (populatedQuery );
234
+ }
235
+
236
+ if (StringUtils .hasText (hint )) {
237
+ mongoQuery .withHint (hint );
238
+ }
239
+ mongoQuery .with (sort );
240
+ if (StringUtils .hasText (collection )) {
241
+ cursor = template .stream (mongoQuery , type , collection );
242
+ }
243
+ else {
244
+ cursor = template .stream (mongoQuery , type );
245
+ }
246
+
247
+ }
248
+ else {
249
+ if (StringUtils .hasText (collection )) {
250
+ cursor = template .stream (query , type , collection );
251
+ }
252
+ else {
253
+ cursor = template .stream (query , type );
254
+ }
255
+ }
256
+ }
257
+
258
+ @ Override
259
+ protected void doClose () throws Exception {
260
+ cursor .close ();
261
+ }
263
262
}
0 commit comments