Skip to content

Commit 8fe734d

Browse files
committed
DATAMONGO-2115 - Polishing.
Simplify asTimestampOfType(…) retrieval and move the cast to outer method. Simplify test. Original pull request: #624.
1 parent 9246d33 commit 8fe734d

File tree

2 files changed

+19
-29
lines changed

2 files changed

+19
-29
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java

+9-13
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,15 @@ public Optional<Collation> getCollation() {
8686
* @return {@link Optional#empty()} if not set.
8787
*/
8888
public Optional<Instant> getResumeTimestamp() {
89-
return Optional.ofNullable(resumeTimestamp).map(this::asInstant);
89+
return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, Instant.class));
9090
}
9191

9292
/**
9393
* @return {@link Optional#empty()} if not set.
9494
* @since 2.2
9595
*/
9696
public Optional<BsonTimestamp> getResumeBsonTimestamp() {
97-
return Optional.ofNullable(resumeTimestamp).map(this::asBsonTimestamp);
97+
return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class));
9898
}
9999

100100
/**
@@ -114,30 +114,26 @@ public static ChangeStreamOptionsBuilder builder() {
114114
return new ChangeStreamOptionsBuilder();
115115
}
116116

117-
private Instant asInstant(Object timestamp) {
118-
return asTimestampOfType(timestamp, Instant.class);
117+
private static <T> T asTimestampOfType(Object timestamp, Class<T> targetType) {
118+
return targetType.cast(doGetTimestamp(timestamp, targetType));
119119
}
120120

121-
private BsonTimestamp asBsonTimestamp(Object timestamp) {
122-
return asTimestampOfType(timestamp, BsonTimestamp.class);
123-
}
124-
125-
private <T> T asTimestampOfType(Object timestamp, Class<T> targetType) {
121+
private static <T> Object doGetTimestamp(Object timestamp, Class<T> targetType) {
126122

127123
if (ClassUtils.isAssignableValue(targetType, timestamp)) {
128-
return (T) timestamp;
124+
return timestamp;
129125
}
130126

131127
if (timestamp instanceof Instant) {
132-
return (T) new BsonTimestamp((int) ((Instant) timestamp).getEpochSecond(), 0);
128+
return new BsonTimestamp((int) ((Instant) timestamp).getEpochSecond(), 0);
133129
}
134130

135131
if (timestamp instanceof BsonTimestamp) {
136-
return (T) Instant.ofEpochSecond(((BsonTimestamp) timestamp).getTime());
132+
return Instant.ofEpochSecond(((BsonTimestamp) timestamp).getTime());
137133
}
138134

139135
throw new IllegalArgumentException(
140-
"o_O that should actually not happen. The timestampt should be an Instant or a BsonTimestamp but was "
136+
"o_O that should actually not happen. The timestamp should be an Instant or a BsonTimestamp but was "
141137
+ ObjectUtils.nullSafeClassName(timestamp));
142138
}
143139

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java

+10-16
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import lombok.Data;
2525
import lombok.NoArgsConstructor;
2626
import lombok.experimental.Wither;
27-
import org.bson.BsonTimestamp;
2827
import reactor.core.Disposable;
2928
import reactor.core.publisher.Flux;
3029
import reactor.core.publisher.Mono;
@@ -46,6 +45,7 @@
4645
import org.assertj.core.api.Assertions;
4746
import org.assertj.core.api.Assumptions;
4847
import org.bson.BsonDocument;
48+
import org.bson.BsonTimestamp;
4949
import org.bson.Document;
5050
import org.bson.types.ObjectId;
5151
import org.junit.After;
@@ -1420,27 +1420,21 @@ public void resumesAtBsonTimestampCorrectly() throws InterruptedException {
14201420
StepVerifier.create(template.save(person1).delayElement(Duration.ofSeconds(1))).expectNextCount(1).verifyComplete();
14211421
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
14221422

1423-
Thread.sleep(500); // just give it some time to link receive all events
1424-
1425-
disposable.dispose();
1426-
14271423
documents.take(); // skip first
14281424
BsonTimestamp resumeAt = documents.take().getBsonTimestamp(); // take 2nd
14291425

1426+
disposable.dispose();
1427+
14301428
StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete();
14311429

1432-
BlockingQueue<ChangeStreamEvent<Person>> resumeDocuments = new LinkedBlockingQueue<>(100);
14331430
template.changeStream("person", ChangeStreamOptions.builder().resumeAt(resumeAt).build(), Person.class)
1434-
.doOnNext(resumeDocuments::add).subscribe();
1435-
1436-
Thread.sleep(500); // just give it some time to link receive all events
1437-
1438-
try {
1439-
Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
1440-
.containsExactly(person2, person3);
1441-
} finally {
1442-
disposable.dispose();
1443-
}
1431+
.map(ChangeStreamEvent::getBody) //
1432+
.buffer(2) //
1433+
.as(StepVerifier::create) //
1434+
.consumeNextWith(actual -> {
1435+
assertThat(actual).containsExactly(person2, person3);
1436+
}).thenCancel() //
1437+
.verify();
14441438
}
14451439

14461440
private PersonWithAList createPersonWithAList(String firstname, int age) {

0 commit comments

Comments
 (0)