This section covers the basics of the RabbitMQ Stream Java API by building a small publish/consume application. This is a good way to get an overview of the API. If you want a more comprehensive introduction, you can go to the reference documentation section.
The sample application publishes some messages and then registers a consumer to make some computations out of them. The source code is available on GitHub.
The sample class starts with a few imports:
link:../../test/java/com/rabbitmq/stream/docs/SampleApplication.java[role=include]
The next step is to create the Environment
. It is a management object
used to manage streams and create producers as well as consumers. The
next snippet shows how to create an Environment
instance and
create the stream used in the application:
link:../../test/java/com/rabbitmq/stream/docs/SampleApplication.java[role=include]
-
Use
Environment#builder
to create the environment -
Create the stream
Then comes the publishing part. The next snippet shows how to create
a Producer
, send messages, and handle publishing confirmations, to
make sure the broker has taken outbound messages into account.
The application uses a count down latch to move on once the messages
have been confirmed.
link:../../test/java/com/rabbitmq/stream/docs/SampleApplication.java[role=include]
-
Create the
Producer
withEnvironment#producerBuilder
-
Send messages with
Producer#send(Message, ConfirmationHandler)
-
Create a message with
Producer#messageBuilder
-
Count down on message publishing confirmation
-
Wait for all publishing confirmations to have arrived
-
Close the producer
It is now time to consume the messages. The Environment
lets us create a Consumer
and provide some logic on each incoming message by implementing a MessageHandler
.
The next snippet does this to calculate a sum and output it once all the messages
have been received:
link:../../test/java/com/rabbitmq/stream/docs/SampleApplication.java[role=include]
-
Create the
Consumer
withEnvironment#consumerBuilder
-
Start consuming from the beginning of the stream
-
Set up the logic to handle messages
-
Add the value in the message body to the sum
-
Count down on each message
-
Wait for all messages to have arrived
-
Output the sum
-
Close the consumer
The application has some cleaning to do before terminating, that is deleting the stream and closing the environment:
link:../../test/java/com/rabbitmq/stream/docs/SampleApplication.java[role=include]
-
Delete the stream
-
Close the environment
You can run the sample application from the root of the project (you need a running local RabbitMQ node with the stream plugin enabled):
$ ./mvnw -q test-compile exec:java -Dexec.classpathScope="test" \ -Dexec.mainClass="com.rabbitmq.stream.docs.SampleApplication" Starting publishing... Published 10000 messages Starting consuming... Sum: 49995000
You can remove the -q
flag if you want more insight on the execution of the build.