Skip to content

Commit 4a17048

Browse files
christophstroblmp911de
authored andcommitted
DATAMONGO-2089 - Add fluent change stream API to ReactiveMongoTemplate.
We now offer a fluent API for more intuitive change stream interaction. Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) .watchCollection("people") .filter(where("age").gte(38)) .listen(); Original pull request: #751.
1 parent 06018fa commit 4a17048

File tree

9 files changed

+874
-13
lines changed

9 files changed

+874
-13
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import reactor.core.publisher.Flux;
19+
20+
import java.time.Instant;
21+
import java.util.function.Consumer;
22+
23+
import org.bson.BsonTimestamp;
24+
import org.bson.BsonValue;
25+
import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder;
26+
import org.springframework.data.mongodb.core.aggregation.Aggregation;
27+
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
28+
29+
/**
30+
* {@link ReactiveChangeStreamOperation} allows creation and execution of reactive MongoDB
31+
* <a href="https://docs.mongodb.com/manual/changeStreams/">Change Stream</a> operations in a fluent API * style. <br />
32+
* The starting {@literal domainType} is used for mapping a potentially given
33+
* {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} used for filtering. By default, the
34+
* originating {@literal domainType} is also used for mapping back the result from the {@link org.bson.Document}.
35+
* However, it is possible to define an different {@literal returnType} via {@code as}.<br />
36+
* The collection to operate on is optional in which case call collection with the actual database are watched, use
37+
* {@literal watchCollection} to define a fixed collection.
38+
*
39+
* <pre>
40+
* <code>
41+
* changeStream(Human.class)
42+
* .watchCollection("star-wars")
43+
* .filter(where("operationType").is("insert"))
44+
* .as(Jedi.class)
45+
* .resumeAt(Instant.now())
46+
* .listen();
47+
* </code>
48+
* </pre>
49+
*
50+
* @author Christoph Strobl
51+
* @since 2.2
52+
*/
53+
public interface ReactiveChangeStreamOperation {
54+
55+
/**
56+
* Start creating a change stream operation for the given {@literal domainType} watching all collections within the
57+
* database. <br />
58+
* Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or
59+
* {@link ChangeStreamWithFilterAndProjection#filter(CriteriaDefinition) filter}.
60+
*
61+
* @param domainType must not be {@literal null}. Use {@link org.bson.Document} to obtain raw elements.
62+
* @return new instance of {@link ReactiveChangeStream}. Never {@literal null}.
63+
* @throws IllegalArgumentException if domainType is {@literal null}.
64+
*/
65+
<T> ReactiveChangeStream<T> changeStream(Class<T> domainType);
66+
67+
/**
68+
* Compose change stream execution by calling one of the terminating methods.
69+
*/
70+
interface TerminatingChangeStream<T> {
71+
72+
/**
73+
* Start listening to changes. The stream will not be completed unless the {@link org.reactivestreams.Subscription}
74+
* is {@link org.reactivestreams.Subscription#cancel() canceled}.
75+
* <p />
76+
* However, the stream may become dead, or invalid, if all watched collections, databases are dropped.
77+
*/
78+
Flux<ChangeStreamEvent<T>> listen();
79+
}
80+
81+
/**
82+
* Collection override (optional).
83+
*/
84+
interface ChangeStreamWithCollection<T> {
85+
86+
/**
87+
* Explicitly set the name of the collection to watch.<br />
88+
* Skip this step to watch all collections within the database.
89+
*
90+
* @param collection must not be {@literal null} nor {@literal empty}.
91+
* @return new instance of {@link ChangeStreamWithCollection}.
92+
* @throws IllegalArgumentException if collection is {@literal null}.
93+
*/
94+
ChangeStreamWithFilterAndProjection<T> watchCollection(String collection);
95+
}
96+
97+
/**
98+
* Provide a filter for limiting results (optional).
99+
*/
100+
interface ChangeStreamWithFilterAndProjection<T> extends ResumingChangeStream<T>, TerminatingChangeStream<T> {
101+
102+
/**
103+
* Use an {@link Aggregation} to filter matching events.
104+
*
105+
* @param by must not be {@literal null}.
106+
* @return new instance of {@link ChangeStreamWithFilterAndProjection}.
107+
* @throws IllegalArgumentException if the given {@link Aggregation} is {@literal null}.
108+
*/
109+
ChangeStreamWithFilterAndProjection<T> filter(Aggregation by);
110+
111+
/**
112+
* Use a {@link CriteriaDefinition critera} to filter matching events via an
113+
* {@link org.springframework.data.mongodb.core.aggregation.MatchOperation}.
114+
*
115+
* @param by must not be {@literal null}.
116+
* @return new instance of {@link ChangeStreamWithFilterAndProjection}.
117+
* @throws IllegalArgumentException if the given {@link CriteriaDefinition} is {@literal null}.
118+
*/
119+
ChangeStreamWithFilterAndProjection<T> filter(CriteriaDefinition by);
120+
121+
/**
122+
* Define the target type fields should be mapped to.
123+
*
124+
* @param resultType must not be {@literal null}.
125+
* @param <R> result type.
126+
* @return new instance of {@link ChangeStreamWithFilterAndProjection}.
127+
* @throws IllegalArgumentException if resultType is {@literal null}.
128+
*/
129+
<R> ChangeStreamWithFilterAndProjection<R> as(Class<R> resultType);
130+
}
131+
132+
/**
133+
* Resume a change stream. (optional).
134+
*/
135+
interface ResumingChangeStream<T> extends TerminatingChangeStream<T> {
136+
137+
/**
138+
* Resume the change stream at a given point.
139+
*
140+
* @param token an {@link Instant} or {@link BsonTimestamp}
141+
* @return new instance of {@link TerminatingChangeStream}.
142+
* @see ChangeStreamOptionsBuilder#resumeAt(Instant)
143+
* @see ChangeStreamOptionsBuilder#resumeAt(BsonTimestamp)
144+
* @throws IllegalArgumentException if the given beacon is neither {@link Instant} nor {@link BsonTimestamp}.
145+
*/
146+
TerminatingChangeStream<T> resumeAt(Object token);
147+
148+
/**
149+
* Resume the change stream after a given point.
150+
*
151+
* @param token an {@link Instant} or {@link BsonTimestamp}
152+
* @return new instance of {@link TerminatingChangeStream}.
153+
* @see ChangeStreamOptionsBuilder#resumeAfter(BsonValue)
154+
* @see ChangeStreamOptionsBuilder#resumeToken(BsonValue)
155+
* @throws IllegalArgumentException if the given beacon not a {@link BsonValue}.
156+
*/
157+
TerminatingChangeStream<T> resumeAfter(Object token);
158+
159+
/**
160+
* Start the change stream after a given point.
161+
*
162+
* @param token an {@link Instant} or {@link BsonTimestamp}
163+
* @return new instance of {@link TerminatingChangeStream}.
164+
* @see ChangeStreamOptionsBuilder#startAfter(BsonValue) (BsonValue)
165+
* @throws IllegalArgumentException if the given beacon not a {@link BsonValue}.
166+
*/
167+
TerminatingChangeStream<T> startAfter(Object token);
168+
}
169+
170+
/**
171+
* Provide some options.
172+
*/
173+
interface ChangeStreamWithOptions<T> {
174+
175+
/**
176+
* Provide some options via the callback by modifying the given {@link ChangeStreamOptionsBuilder}. Previously
177+
* defined options like a {@link ResumingChangeStream#resumeAfter(Object) resumeToken} are carried over to the
178+
* builder and can be overwritten via eg. {@link ChangeStreamOptionsBuilder#resumeToken(BsonValue)}.
179+
*
180+
* @param optionsConsumer never {@literal null}.
181+
* @return new instance of {@link ReactiveChangeStream}.
182+
*/
183+
ReactiveChangeStream<T> withOptions(Consumer<ChangeStreamOptionsBuilder> optionsConsumer);
184+
}
185+
186+
/**
187+
* {@link ReactiveChangeStream} provides methods for constructing change stream operations in a fluent way.
188+
*/
189+
interface ReactiveChangeStream<T> extends ChangeStreamWithOptions<T>, ChangeStreamWithCollection<T>,
190+
TerminatingChangeStream<T>, ResumingChangeStream<T>, ChangeStreamWithFilterAndProjection<T> {}
191+
}

0 commit comments

Comments
 (0)