Skip to content

Commit eeec44f

Browse files
DATAMONGO-2089 - Add fluent change stream API to ReactiveMongoTemplate.
We now offer a fluent API for more intuitive change stream interaction. Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream() .watchCollection("persons") .filter(where("age").gte(38)) .as(User.class) .listen();
1 parent ac0f106 commit eeec44f

File tree

7 files changed

+791
-14
lines changed

7 files changed

+791
-14
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import reactor.core.publisher.Flux;
19+
20+
import java.time.Instant;
21+
import java.util.function.Consumer;
22+
23+
import org.bson.BsonTimestamp;
24+
import org.bson.BsonValue;
25+
import org.bson.Document;
26+
import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder;
27+
import org.springframework.data.mongodb.core.ReactiveFindOperation.ReactiveFind;
28+
import org.springframework.data.mongodb.core.aggregation.Aggregation;
29+
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
30+
31+
/**
32+
* {@link ReactiveChangeStreamOperation} allows creation and execution of reactive MongoDB
33+
* <a href="https://docs.mongodb.com/manual/changeStreams/">Change Stream</a> operations in a fluent API * style. <br />
34+
* The starting {@literal domainType} is used for mapping a potentially given
35+
* {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} used for filtering. By default, the
36+
* originating {@literal domainType} is also used for mapping back the result from the {@link org.bson.Document}.
37+
* However, it is possible to define an different {@literal returnType} via {@code as}.<br />
38+
* The collection to operate on is optional in which case call collection with the actual database are watched, use
39+
* {@literal watchCollection} to define a fixed collection.
40+
*
41+
* <pre>
42+
* <code>
43+
* changeStream(Human.class)
44+
* .watchCollection("star-wars")
45+
* .filter(where("operationType").is("insert"))
46+
* .as(Jedi.class)
47+
* .resumeAt(Instant.now())
48+
* .listen();
49+
* </code>
50+
* </pre>
51+
*
52+
* @author Christoph Strobl
53+
* @since 2.2
54+
*/
55+
public interface ReactiveChangeStreamOperation {
56+
57+
/**
58+
* Start creating a change stream operation.watching all collections within the database. <br />
59+
* Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or
60+
* {@link ChangeStreamWithFilter#filter(CriteriaDefinition) filter}.
61+
*
62+
* @return new instance of {@link ReactiveFind}. Never {@literal null}.
63+
*/
64+
ReactiveChangeStream<Document> changeStream();
65+
66+
/**
67+
* Start creating a change stream operation for the given {@literal domainType} watching all collections within the
68+
* database. <br />
69+
* Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or
70+
* {@link ChangeStreamWithFilter#filter(CriteriaDefinition) filter}.
71+
*
72+
* @param domainType must not be {@literal null}.
73+
* @return new instance of {@link ReactiveChangeStream}. Never {@literal null}.
74+
* @throws IllegalArgumentException if domainType is {@literal null}.
75+
*/
76+
<T> ReactiveChangeStream<T> changeStream(Class<T> domainType);
77+
78+
/**
79+
* Compose change stream execution by calling one of the terminating methods.
80+
*/
81+
interface TerminatingChangeStream<T> {
82+
83+
/**
84+
* Start listening to changes. The stream will not be completed unless the {@link org.reactivestreams.Subscription}
85+
* is {@link org.reactivestreams.Subscription#cancel() canceled}.
86+
* <p />
87+
* However, the stream may become dead, or invalid, if all watched collections, databases are dropped.
88+
*/
89+
Flux<ChangeStreamEvent<T>> listen();
90+
}
91+
92+
/**
93+
* Collection override (optional).
94+
*/
95+
interface ChangeStreamWithCollection<T> {
96+
97+
/**
98+
* Explicitly set the name of the collection to watch.<br />
99+
* Skip this step to watch all collections within the database.
100+
*
101+
* @param collection must not be {@literal null} nor {@literal empty}.
102+
* @return new instance of {@link ChangeStreamWithCollection}.
103+
* @throws IllegalArgumentException if collection is {@literal null}.
104+
*/
105+
ChangeStreamWithProjection<T> watchCollection(String collection);
106+
}
107+
108+
/**
109+
* Result type override (optional).
110+
*/
111+
interface ChangeStreamWithProjection<T> extends ChangeStreamWithFilter<T> {
112+
113+
/**
114+
* Define the target type fields should be mapped to.
115+
*
116+
* @param resultType must not be {@literal null}.
117+
* @param <R> result type.
118+
* @return new instance of {@link ChangeStreamWithProjection}.
119+
* @throws IllegalArgumentException if resultType is {@literal null}.
120+
*/
121+
<R> ChangeStreamWithFilter<R> as(Class<R> resultType);
122+
}
123+
124+
/**
125+
* Provide a filter for limiting results (optional).
126+
*/
127+
interface ChangeStreamWithFilter<T> extends ResumingChangeStream<T>, TerminatingChangeStream<T> {
128+
129+
/**
130+
* Use an {@link Aggregation} to filter matching events.
131+
*
132+
* @param by must not be {@literal null}.
133+
* @return new instance of {@link ChangeStreamWithProjection}.
134+
* @throws IllegalArgumentException if the given {@link Aggregation} is {@literal null}.
135+
*/
136+
ChangeStreamWithProjection<T> filter(Aggregation by);
137+
138+
/**
139+
* Use a {@link CriteriaDefinition critera} to filter matching events via an
140+
* {@link org.springframework.data.mongodb.core.aggregation.MatchOperation}.
141+
*
142+
* @param by must not be {@literal null}.
143+
* @return new instance of {@link ChangeStreamWithProjection}.
144+
* @throws IllegalArgumentException if the given {@link CriteriaDefinition} is {@literal null}.
145+
*/
146+
ChangeStreamWithProjection<T> filter(CriteriaDefinition by);
147+
}
148+
149+
/**
150+
* Resume a change stream. (optional).
151+
*/
152+
interface ResumingChangeStream<T> extends TerminatingChangeStream<T> {
153+
154+
/**
155+
* Resume the change stream at a given point.
156+
*
157+
* @param beacon an {@link Instant} or {@link BsonTimestamp}
158+
* @return new instance of {@link TerminatingChangeStream}.
159+
* @see ChangeStreamOptionsBuilder#resumeAt(Instant)
160+
* @see ChangeStreamOptionsBuilder#resumeAt(BsonTimestamp)
161+
* @throws IllegalArgumentException if the given beacon is neither {@link Instant} nor {@link BsonTimestamp}.
162+
*/
163+
TerminatingChangeStream<T> resumeAt(Object beacon);
164+
165+
/**
166+
* Resume the change stream after a given point.
167+
*
168+
* @param beacon an {@link Instant} or {@link BsonTimestamp}
169+
* @return new instance of {@link TerminatingChangeStream}.
170+
* @see ChangeStreamOptionsBuilder#resumeAfter(BsonValue)
171+
* @see ChangeStreamOptionsBuilder#resumeToken(BsonValue)
172+
* @throws IllegalArgumentException if the given beacon not a {@link BsonValue}.
173+
*/
174+
TerminatingChangeStream<T> resumeAfter(Object beacon);
175+
176+
/**
177+
* Start the change stream after a given point.
178+
*
179+
* @param beacon an {@link Instant} or {@link BsonTimestamp}
180+
* @return new instance of {@link TerminatingChangeStream}.
181+
* @see ChangeStreamOptionsBuilder#startAfter(BsonValue) (BsonValue)
182+
* @throws IllegalArgumentException if the given beacon not a {@link BsonValue}.
183+
*/
184+
TerminatingChangeStream<T> startAfter(Object beacon);
185+
}
186+
187+
/**
188+
* Provide some options.
189+
*/
190+
interface ChangeStreamWithOptions<T> {
191+
192+
/**
193+
* Provide some options via the callback by modifying the given {@link ChangeStreamOptionsBuilder}. Previously
194+
* defined options like a {@link ResumingChangeStream#resumeAfter(Object) resumeToken} are carried over to the
195+
* builder and can be overwritten via eg. {@link ChangeStreamOptionsBuilder#resumeToken(BsonValue)}.
196+
*
197+
* @param optionsConsumer never {@literal null}.
198+
* @return new instance of {@link ReactiveChangeStream}.
199+
*/
200+
ReactiveChangeStream<T> withOptions(Consumer<ChangeStreamOptionsBuilder> optionsConsumer);
201+
}
202+
203+
/**
204+
* {@link ReactiveChangeStream} provides methods for constructing change stream operations in a fluent way.
205+
*/
206+
interface ReactiveChangeStream<T> extends ChangeStreamWithOptions<T>, ChangeStreamWithCollection<T>,
207+
TerminatingChangeStream<T>, ResumingChangeStream<T>, ChangeStreamWithFilter<T> {}
208+
}

0 commit comments

Comments
 (0)