As of MongoDB 3.6, Change Streams let applications get notified about changes without having to tail the oplog.
Note: Change Stream support is only available for replica sets and sharded clusters.
Change Streams can be consumed with both the imperative and the reactive MongoDB Java driver. The reactive variant is highly recommended because it is less resource-intensive. However, if you cannot use the reactive API, you can still obtain change events by using the messaging concept that is already prevalent in the Spring ecosystem.
You can watch at both the collection level and the database level. The database-level variant publishes
changes from all collections within the database. When subscribing to a database change stream, make sure to use a
suitable type for the event type, as conversion might not apply correctly across different entity types.
If in doubt, use Document.
Listening to a Change Stream by using a Sync Driver creates a long running, blocking task that needs to be delegated to a separate component.
In this case, we need to first create a MessageListenerContainer, which will be the main entry point for running the specific SubscriptionRequest tasks.
Spring Data MongoDB already ships with a default implementation that operates on MongoTemplate and is capable of creating and executing Task instances for a ChangeStreamRequest.
The following example shows how to use Change Streams with MessageListener instances:
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); (1)
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); (3)
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); (4)
// ...
container.stop(); (5)
(1) Starting the container initializes the resources and starts Task instances for already registered SubscriptionRequest instances. Requests added after startup are run immediately.
(2) Define the listener called when a Message is received. The Message#getBody() is converted to the requested domain type. Use Document to receive raw results without conversion.
(3) Set the collection to listen to and provide additional options through ChangeStreamOptions.
(4) Register the request. The returned Subscription can be used to check the current Task state and cancel its execution to free resources.
(5) Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running Task instances within the container.
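To illustrate why a blocking listener has to be delegated to a separate component, the following JDK-only sketch hands a blocking read loop to a worker thread and cancels it later. It is not the Spring Data implementation: BlockingTaskSketch, SimpleContainer, and demo are hypothetical names, and a BlockingQueue stands in for the change stream cursor.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for a listener container; not the Spring Data API. */
final class BlockingTaskSketch {

    /** Runs blocking tasks on worker threads so the caller is never blocked. */
    static final class SimpleContainer {
        private final ExecutorService executor = Executors.newCachedThreadPool();

        /** Starts a task that blocks on the source and passes each event to the listener. */
        Future<?> register(BlockingQueue<String> source, Consumer<String> listener) {
            return executor.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        listener.accept(source.take()); // blocks until an event arrives
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // cancelled: leave the loop
                }
            });
        }

        /** Interrupts every task that is still running. */
        void stop() {
            executor.shutdownNow();
        }
    }

    /** Publishes two events and returns what the listener received. */
    static List<String> demo() {
        BlockingQueue<String> events = new LinkedBlockingQueue<>(); // stands in for the cursor
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(2);

        SimpleContainer container = new SimpleContainer();
        Future<?> subscription = container.register(events, event -> {
            received.add(event);
            latch.countDown();
        });

        events.add("insert");
        events.add("update");
        try {
            latch.await(5, TimeUnit.SECONDS); // wait until both events were delivered
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        subscription.cancel(true); // frees the worker thread
        container.stop();
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The calling thread only registers the listener and publishes events; the blocking take() call runs on the worker thread until the subscription is cancelled or the container is stopped.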
Subscribing to Change Streams with the reactive API is a more natural way to work with streams. Still, the essential building blocks, such as ChangeStreamOptions, remain the same. The following example shows how to use Change Streams emitting ChangeStreamEvent instances:
ChangeStreamOptions options = ChangeStreamOptions.builder()
.filter(newAggregation(User.class, match(where("age").gte(38)))) (1)
.build();
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream("user", options, User.class); (2)
(1) Use an aggregation pipeline to filter events.
(2) Obtain a Flux of change stream events. The ChangeStreamEvent#getBody() is converted to the requested domain type. Use Document to receive raw results without conversion.
Change Streams can be resumed so that they continue emitting events from where you left off. To resume the stream, you need to supply either a resume token or the last known server time (in UTC). Use ChangeStreamOptions to set the value accordingly.
The following example shows how to set the resume offset using server time:
ChangeStreamOptions options = ChangeStreamOptions.builder()
.resumeAt(Instant.now().minusSeconds(1)) (1)
.build();
Flux<ChangeStreamEvent<User>> resumed = template.changeStream("user", options, User.class);
(1) You may obtain the server time of a ChangeStreamEvent through the getTimestamp method or use the resumeToken exposed through getResumeToken.
Tip: In some cases, an Instant might not be a precise enough measure when resuming a Change Stream. Use a MongoDB native BsonTimestamp for that purpose.
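The precision concern in the tip can be illustrated with a small JDK-only sketch. A BSON timestamp combines 32 bits of seconds since the epoch with a 32-bit ordinal that orders operations within the same second. TimestampPrecisionSketch and its methods are hypothetical names used for illustration; real code would use org.bson.BsonTimestamp. The fromInstant conversion, which sets the ordinal to 0, is an assumption about how an Instant maps onto that layout.

```java
import java.time.Instant;

/** Hypothetical model of the BSON timestamp layout; real code would use org.bson.BsonTimestamp. */
final class TimestampPrecisionSketch {

    /** Packs seconds (high 32 bits) and an ordinal (low 32 bits) into one 64-bit value. */
    static long pack(int seconds, int increment) {
        return ((long) seconds << 32) | (increment & 0xFFFFFFFFL);
    }

    /** Extracts the seconds part from a packed value. */
    static int seconds(long packed) {
        return (int) (packed >> 32);
    }

    /** Extracts the ordinal part from a packed value. */
    static int increment(long packed) {
        return (int) packed;
    }

    /** Assumption: an Instant contributes only whole seconds, so the ordinal is set to 0. */
    static long fromInstant(Instant instant) {
        return pack((int) instant.getEpochSecond(), 0);
    }

    public static void main(String[] args) {
        Instant second = Instant.ofEpochSecond(1_700_000_000L);

        // Two operations in the same second stay distinguishable through the ordinal.
        long first = pack((int) second.getEpochSecond(), 1);
        long next = pack((int) second.getEpochSecond(), 2);
        System.out.println(first < next);

        // Two Instants within the same second collapse to the same packed value.
        System.out.println(fromInstant(second.plusMillis(250)) == fromInstant(second.plusMillis(750)));
    }
}
```

Under these assumptions, resuming from an Instant cannot single out one of several operations that share the same second, whereas a timestamp that carries the ordinal can.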