Skip to content

Switch mongoItemReader to use a cursor instead of paging #3824

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,29 @@
package org.springframework.batch.item.data;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.bson.Document;
import org.bson.codecs.DecoderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec;
import org.springframework.data.mongodb.util.json.ParameterBindingJsonReader;
import org.springframework.data.util.CloseableIterator;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

import org.bson.Document;
import org.bson.codecs.DecoderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* <p>
* Restartable {@link ItemReader} that reads documents from MongoDB
Expand All @@ -51,17 +50,11 @@
* If you set JSON String query {@link #setQuery(String)} then
* it executes the JSON to retrieve the requested documents.
* </p>
*
*
* <p>
* If you set Query object {@link #setQuery(Query)} then
* it executes the Query to retrieve the requested documents.
* </p>
*
* <p>
* The query is executed using paged requests specified in the
* {@link #setPageSize(int)}. Additional pages are requested as needed to
* provide data when the {@link #read()} method is called.
* </p>
*
* <p>
* The JSON String query provided supports parameter substitution via ?&lt;index&gt;
Expand All @@ -81,10 +74,10 @@
* @author Mahmoud Ben Hassine
* @author Parikshit Dutta
*/
public class MongoItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {
public class MongoItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {

private static final Logger log = LoggerFactory.getLogger(MongoItemReader.class);

private MongoOperations template;
private Query query;
private String queryString;
Expand All @@ -94,12 +87,13 @@ public class MongoItemReader<T> extends AbstractPaginatedDataItemReader<T> imple
private String fields;
private String collection;
private List<Object> parameterValues = new ArrayList<>();
private CloseableIterator<? extends T> cursor = null;

public MongoItemReader() {
super();
setName(ClassUtils.getShortName(MongoItemReader.class));
}

/**
* A Mongo Query to be used.
*
Expand Down Expand Up @@ -187,47 +181,6 @@ public void setHint(String hint) {
this.hint = hint;
}

@Override
@SuppressWarnings("unchecked")
protected Iterator<T> doPageRead() {
if (queryString != null) {
Pageable pageRequest = PageRequest.of(page, pageSize, sort);

String populatedQuery = replacePlaceholders(queryString, parameterValues);

Query mongoQuery;

if(StringUtils.hasText(fields)) {
mongoQuery = new BasicQuery(populatedQuery, fields);
}
else {
mongoQuery = new BasicQuery(populatedQuery);
}

mongoQuery.with(pageRequest);

if(StringUtils.hasText(hint)) {
mongoQuery.withHint(hint);
}

if(StringUtils.hasText(collection)) {
return (Iterator<T>) template.find(mongoQuery, type, collection).iterator();
} else {
return (Iterator<T>) template.find(mongoQuery, type).iterator();
}

} else {
Pageable pageRequest = PageRequest.of(page, pageSize);
query.with(pageRequest);

if(StringUtils.hasText(collection)) {
return (Iterator<T>) template.find(query, type, collection).iterator();
} else {
return (Iterator<T>) template.find(query, type).iterator();
}
}
}

/**
* Checks mandatory properties
*
Expand All @@ -238,7 +191,7 @@ public void afterPropertiesSet() throws Exception {
Assert.state(template != null, "An implementation of MongoOperations is required.");
Assert.state(type != null, "A type to convert the input into is required.");
Assert.state(queryString != null || query != null, "A query is required.");

if (queryString != null) {
Assert.state(sort != null, "A sort is required.");
}
Expand All @@ -260,4 +213,50 @@ private Sort convertToSort(Map<String, Sort.Direction> sorts) {

return Sort.by(sortValues);
}

@Override
protected T doRead() throws Exception {
return cursor.hasNext() ? cursor.next() : null;
}

@Override
protected void doOpen() throws Exception {
if (queryString != null) {
String populatedQuery = replacePlaceholders(queryString, parameterValues);

Query mongoQuery;

if (StringUtils.hasText(fields)) {
mongoQuery = new BasicQuery(populatedQuery, fields);
}
else {
mongoQuery = new BasicQuery(populatedQuery);
}

if (StringUtils.hasText(hint)) {
mongoQuery.withHint(hint);
}
mongoQuery.with(sort);
if (StringUtils.hasText(collection)) {
cursor = template.stream(mongoQuery, type, collection);
}
else {
cursor = template.stream(mongoQuery, type);
}

}
else {
if (StringUtils.hasText(collection)) {
cursor = template.stream(query, type, collection);
}
else {
cursor = template.stream(query, type);
}
}
}

@Override
protected void doClose() throws Exception {
cursor.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ public class MongoItemReaderBuilder<T> {

private List<Object> parameterValues = new ArrayList<>();

protected int pageSize = 10;

private boolean saveState = true;

private String name;
Expand Down Expand Up @@ -245,20 +243,7 @@ public MongoItemReaderBuilder<T> hint(String hint) {
}

/**
* The number of items to be read with each page.
*
* @param pageSize the number of items
* @return this instance for method chaining
* @see MongoItemReader#setPageSize(int)
*/
public MongoItemReaderBuilder<T> pageSize(int pageSize) {
this.pageSize = pageSize;

return this;
}

/**
* Provide a Spring Data Mongo {@link Query}. This will take precedence over a JSON
* Provide a Spring Data Mongo {@link Query}. This will take precedence over a JSON
* configured query.
*
* @param query Query to execute
Expand Down Expand Up @@ -299,7 +284,6 @@ public MongoItemReader<T> build() {
reader.setParameterValues(this.parameterValues);
reader.setQuery(this.query);

reader.setPageSize(this.pageSize);
reader.setName(this.name);
reader.setSaveState(this.saveState);
reader.setCurrentItemCount(this.currentItemCount);
Expand Down
Loading