Skip to content

Commit 34e3e0e

Browse files
cgringsfmbenhassine
authored andcommitted
Add support for bulk inserts in MongoItemWriter
This commit adds support for bulk inserts in the MongoItemWriter. It introduces a new enumeration to allow the user to choose the operation to apply to items (INSERT, UPSERT, DELETE). Resolves #4149
1 parent d84057c commit 34e3e0e

File tree

3 files changed

+205
-10
lines changed

3 files changed

+205
-10
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@
5858
*/
5959
public class MongoItemWriter<T> implements ItemWriter<T>, InitializingBean {
6060

61+
public enum Mode {
62+
INSERT, UPSERT, REMOVE;
63+
}
64+
6165
private static final String ID_KEY = "_id";
6266

6367
private MongoOperations template;
@@ -66,7 +70,7 @@ public class MongoItemWriter<T> implements ItemWriter<T>, InitializingBean {
6670

6771
private String collection;
6872

69-
private boolean delete = false;
73+
private Mode mode = Mode.UPSERT;
7074

7175
public MongoItemWriter() {
7276
super();
@@ -78,9 +82,19 @@ public MongoItemWriter() {
7882
* the data store. If set to false (default), the items will be saved. If set to true,
7983
* the items will be removed.
8084
* @param delete removal indicator
85+
* @deprecated use {@link MongoItemWriter#setMode(Mode)}
8186
*/
87+
@Deprecated
8288
public void setDelete(boolean delete) {
83-
this.delete = delete;
89+
this.mode = (delete) ? Mode.REMOVE : Mode.UPSERT;
90+
}
91+
92+
/**
93+
* Set the operating {@link Mode} to be applied by this writer.
94+
* @param mode the mode to be used.
95+
*/
96+
public void setMode(final Mode mode) {
97+
this.mode = mode;
8498
}
8599

86100
/**
@@ -133,15 +147,31 @@ public void write(Chunk<? extends T> chunk) throws Exception {
133147
*/
134148
protected void doWrite(Chunk<? extends T> chunk) {
135149
if (!CollectionUtils.isEmpty(chunk.getItems())) {
136-
if (this.delete) {
137-
delete(chunk);
138-
}
139-
else {
140-
saveOrUpdate(chunk);
150+
switch (this.mode) {
151+
case INSERT:
152+
save(chunk);
153+
break;
154+
case REMOVE:
155+
delete(chunk);
156+
break;
157+
default:
158+
saveOrUpdate(chunk);
159+
break;
141160
}
142161
}
143162
}
144163

164+
private void save(final Chunk<? extends T> chunk) {
165+
final BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, chunk.getItems().get(0));
166+
final MongoConverter mongoConverter = this.template.getConverter();
167+
for (final Object item : chunk) {
168+
final Document document = new Document();
169+
mongoConverter.write(item, document);
170+
bulkOperations.insert(document);
171+
}
172+
bulkOperations.execute();
173+
}
174+
145175
private void delete(Chunk<? extends T> chunk) {
146176
BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, chunk.getItems().get(0));
147177
MongoConverter mongoConverter = this.template.getConverter();

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoItemWriterBuilder.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.batch.item.data.builder;
1818

1919
import org.springframework.batch.item.data.MongoItemWriter;
20+
import org.springframework.batch.item.data.MongoItemWriter.Mode;
2021
import org.springframework.data.mongodb.core.MongoOperations;
2122
import org.springframework.util.Assert;
2223

@@ -33,7 +34,7 @@ public class MongoItemWriterBuilder<T> {
3334

3435
private String collection;
3536

36-
private boolean delete = false;
37+
private Mode mode = Mode.UPSERT;
3738

3839
/**
3940
* Indicates if the items being passed to the writer are to be saved or removed from
@@ -42,9 +43,23 @@ public class MongoItemWriterBuilder<T> {
4243
* @param delete removal indicator
4344
* @return The current instance of the builder
4445
* @see MongoItemWriter#setDelete(boolean)
46+
* @deprecated use {@link MongoItemWriterBuilder#mode(Mode)}
4547
*/
48+
@Deprecated
4649
public MongoItemWriterBuilder<T> delete(boolean delete) {
47-
this.delete = delete;
50+
this.mode = (delete) ? Mode.REMOVE : Mode.UPSERT;
51+
52+
return this;
53+
}
54+
55+
/**
56+
* Set the operating {@link Mode} to be applied by this writer.
57+
* @param mode the mode to be used.
58+
* @return The current instance of the builder
59+
* @see MongoItemWriter#setMode(Mode)
60+
*/
61+
public MongoItemWriterBuilder<T> mode(final Mode mode) {
62+
this.mode = mode;
4863

4964
return this;
5065
}
@@ -83,7 +98,7 @@ public MongoItemWriter<T> build() {
8398

8499
MongoItemWriter<T> writer = new MongoItemWriter<>();
85100
writer.setTemplate(this.template);
86-
writer.setDelete(this.delete);
101+
writer.setMode(this.mode);
87102
writer.setCollection(this.collection);
88103

89104
return writer;

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoItemWriterTests.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.mockito.quality.Strictness;
2727

2828
import org.springframework.batch.item.Chunk;
29+
import org.springframework.batch.item.data.MongoItemWriter.Mode;
2930
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
3031
import org.springframework.data.mapping.context.MappingContext;
3132
import org.springframework.data.mongodb.core.BulkOperations;
@@ -297,6 +298,155 @@ void testResourceKeyCollision() {
297298
}
298299
}
299300

301+
// BATCH-4149
302+
303+
@Test
304+
void testInsertModeNoTransactionNoCollection() throws Exception {
305+
Chunk<Item> items = Chunk.of(new Item("Foo"), new Item("Bar"));
306+
307+
writer.setMode(Mode.INSERT);
308+
writer.write(items);
309+
310+
verify(template).bulkOps(any(), any(Class.class));
311+
verify(bulkOperations, times(2)).insert(any(Object.class));
312+
}
313+
314+
@Test
315+
void testInsertModeNoTransactionWithCollection() throws Exception {
316+
Chunk<Object> items = Chunk.of(new Item("Foo"), new Item("Bar"));
317+
318+
writer.setMode(Mode.INSERT);
319+
writer.setCollection("collection");
320+
321+
writer.write(items);
322+
323+
verify(template).bulkOps(any(), eq("collection"));
324+
verify(bulkOperations, times(2)).insert(any(Object.class));
325+
}
326+
327+
@Test
328+
void testInsertModeNoTransactionNoItems() throws Exception {
329+
writer.setMode(Mode.INSERT);
330+
writer.write(new Chunk<>());
331+
332+
verifyNoInteractions(template);
333+
verifyNoInteractions(bulkOperations);
334+
}
335+
336+
@Test
337+
void testInsertModeTransactionNoCollection() {
338+
final Chunk<Object> items = Chunk.of(new Item("Foo"), new Item("Bar"));
339+
340+
writer.setMode(Mode.INSERT);
341+
342+
new TransactionTemplate(transactionManager).execute((TransactionCallback<Void>) status -> {
343+
assertDoesNotThrow(() -> writer.write(items));
344+
return null;
345+
});
346+
347+
verify(template).bulkOps(any(), any(Class.class));
348+
verify(bulkOperations, times(2)).insert(any(Object.class));
349+
}
350+
351+
@Test
352+
void testInsertModeTransactionWithCollection() {
353+
final Chunk<Object> items = Chunk.of(new Item("Foo"), new Item("Bar"));
354+
355+
writer.setMode(Mode.INSERT);
356+
writer.setCollection("collection");
357+
358+
new TransactionTemplate(transactionManager).execute((TransactionCallback<Void>) status -> {
359+
assertDoesNotThrow(() -> writer.write(items));
360+
return null;
361+
});
362+
363+
verify(template).bulkOps(any(), eq("collection"));
364+
verify(bulkOperations, times(2)).insert(any(Object.class));
365+
}
366+
367+
@Test
368+
void testInsertModeTransactionFails() {
369+
final Chunk<Object> items = Chunk.of(new Item("Foo"), new Item("Bar"));
370+
371+
writer.setMode(Mode.INSERT);
372+
writer.setCollection("collection");
373+
374+
Exception exception = assertThrows(RuntimeException.class,
375+
() -> new TransactionTemplate(transactionManager).execute((TransactionCallback<Void>) status -> {
376+
assertDoesNotThrow(() -> writer.write(items));
377+
throw new RuntimeException("force rollback");
378+
}));
379+
assertEquals(exception.getMessage(), "force rollback");
380+
381+
verifyNoInteractions(template);
382+
verifyNoInteractions(bulkOperations);
383+
}
384+
385+
@Test
386+
void testInsertModeTransactionReadOnly() {
387+
final Chunk<Object> items = Chunk.of(new Item("Foo"), new Item("Bar"));
388+
389+
writer.setMode(Mode.INSERT);
390+
writer.setCollection("collection");
391+
392+
TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
393+
transactionTemplate.setReadOnly(true);
394+
transactionTemplate.execute((TransactionCallback<Void>) status -> {
395+
assertDoesNotThrow(() -> writer.write(items));
396+
return null;
397+
});
398+
399+
verifyNoInteractions(template);
400+
verifyNoInteractions(bulkOperations);
401+
}
402+
403+
@Test
404+
void testRemoveModeNoObjectIdNoCollection() throws Exception {
405+
writer.setMode(Mode.REMOVE);
406+
Chunk<Object> items = Chunk.of(new Item("Foo"), new Item("Bar"));
407+
408+
writer.write(items);
409+
410+
verify(template).bulkOps(any(), any(Class.class));
411+
verify(bulkOperations, never()).remove(any(Query.class));
412+
}
413+
414+
@Test
415+
void testRemoveModeNoObjectIdWithCollection() throws Exception {
416+
writer.setMode(Mode.REMOVE);
417+
Chunk<Object> items = Chunk.of(new Item("Foo"), new Item("Bar"));
418+
419+
writer.setCollection("collection");
420+
writer.write(items);
421+
422+
verify(template).bulkOps(any(), eq("collection"));
423+
verify(bulkOperations, never()).remove(any(Query.class));
424+
}
425+
426+
@Test
427+
void testRemoveModeNoTransactionNoCollection() throws Exception {
428+
writer.setMode(Mode.REMOVE);
429+
Chunk<Object> items = Chunk.of(new Item(1), new Item(2));
430+
431+
writer.write(items);
432+
433+
verify(template).bulkOps(any(), any(Class.class));
434+
verify(bulkOperations, times(2)).remove(any(Query.class));
435+
}
436+
437+
@Test
438+
void testRemoveModeNoTransactionWithCollection() throws Exception {
439+
writer.setMode(Mode.REMOVE);
440+
Chunk<Object> items = Chunk.of(new Item(1), new Item(2));
441+
442+
writer.setCollection("collection");
443+
444+
writer.write(items);
445+
446+
verify(template).bulkOps(any(), eq("collection"));
447+
verify(bulkOperations, times(2)).remove(any(Query.class));
448+
}
449+
300450
static class Item {
301451

302452
Integer id;

0 commit comments

Comments
 (0)