Skip to content

Latest commit

 

History

History
1305 lines (1037 loc) · 56.1 KB

File metadata and controls

1305 lines (1037 loc) · 56.1 KB

RabbitMQ Stream Java API

Overview

This section describes the API to connect to the RabbitMQ Stream Plugin, publish messages, and consume messages. There are 3 main interfaces:

  • com.rabbitmq.stream.Environment for connecting to a node and optionally managing streams.

  • com.rabbitmq.stream.Producer to publish messages.

  • com.rabbitmq.stream.Consumer to consume messages.

Environment

Creating the Environment

The environment is the main entry point to a node or a cluster of nodes. Producer and Consumer instances are created from an Environment instance. Here is the simplest way to create an Environment instance:

Creating an environment with all the defaults
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Create an environment that will connect to localhost:5552

  2. Close the environment after usage

Note the environment must be closed to release resources when it is no longer needed.

Consider the environment like a long-lived object. An application will usually create one Environment instance when it starts up and close it when it exits.

It is possible to use a URI to specify all the necessary information to connect to a node:

Creating an environment with a URI
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Use the uri method to specify the URI to connect to

The previous snippet uses a URI that specifies the following information: host, port, username, password, and virtual host (/, which is encoded as %2f). The URI follows the same rules as the AMQP 0.9.1 URI, except the protocol must be rabbitmq-stream. TLS is enabled by using the rabbitmq-stream+tls scheme in the URI.

When using one URI, the corresponding node will be the main entry point to connect to. The Environment will then use the stream protocol to find out more about streams topology (leaders and replicas) when asked to create Producer and Consumer instances. The Environment may become blind if this node goes down though, so it may be more appropriate to specify several other URIs to try in case of failure of a node:

Creating an environment with several URIs
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Use the uris method to specify several URIs

By specifying several URIs, the environment will try to connect to the first one, and will pick a new URI randomly in case of disconnection.

Understanding Connection Logic

Creating the environment to connect to a cluster node works usually seamlessly. Creating publishers and consumers can cause problems as the client uses hints from the cluster to find the nodes where stream leaders and replicas are located to connect to the appropriate nodes.

These connection hints can be accurate or less appropriate depending on the infrastructure. If you hit some connection problems at some point – like hostnames impossible to resolve for client applications - this blog post should help you understand what is going on and fix the issues.

To make the local development experience simple, the client library can choose to always use localhost for producers and consumers. This happens if the following conditions are met: the initial host to connect to is localhost, the user is guest, and no custom address resolver has been provided. Provide a pass-through AddressResolver to EnvironmentBuilder#addressResolver(AddressResolver) to avoid this behavior. It is unlikely this behavior applies for any real-world deployment, where localhost and/or the default guest user should not be used.

Enabling TLS

TLS can be enabled by using the rabbitmq-stream+tls scheme in the URI. The default TLS port is 5551.

Use the EnvironmentBuilder#tls method to configure TLS. The most important setting is a io.netty.handler.ssl.SslContext instance, which is created and configured with the io.netty.handler.ssl.SslContext#forClient method. Note hostname verification is enabled by default.

The following snippet shows a common configuration, whereby the client is instructed to trust servers with certificates signed by the configured certificate authority (CA).

Creating an environment that uses TLS
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Load certificate authority (CA) certificate from PEM file

  2. Configure Netty SslContext to trust CA certificate

  3. Use TLS scheme in environment URI

  4. Set SslContext in environment configuration

It is sometimes handy to trust any server certificates in development environments. EnvironmentBuilder#tls provides the trustEverything method to do so. This should not be used in a production environment.

Creating a TLS environment that trusts all server certificates for development
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Trust all server certificates

Configuring the Environment

The following table sums up the main settings to create an Environment:

Parameter Name Description Default

uri

The URI of the node to connect to (single node).

rabbitmq-stream://guest:guest@localhost:5552/%2f

uris

The URI of the nodes to try to connect to (cluster).

rabbitmq-stream://guest:guest@localhost:5552/%2f singleton list

host

Host to connect to.

localhost

port

Port to use.

5552

username

Username to use to connect.

guest

password

Password to use to connect.

guest

virtualHost

Virtual host to connect to.

/

rpcTimeout

Timeout for RPC calls.

Duration.ofSeconds(10)

recoveryBackOffDelayPolicy

Delay policy to use for backoff on connection recovery.

Fixed delay of 5 seconds

topologyUpdateBackOffDelayPolicy

Delay policy to use for backoff on topology update, e.g. when a stream replica moves and a consumer needs to connect to another node.

Initial delay of 5 seconds then delay of 1 second.

scheduledExecutorService

Executor used to schedule infrastructure tasks like background publishing, producers and consumers migration after disconnection or topology update. If a custom executor is provided, it is the developer’s responsibility to close it once it is no longer necessary.

Executors
  .newScheduledThreadPool(
    Runtime
      .getRuntime()
      .availableProcessors()
);

maxProducersByConnection

The maximum number of Producer instances a single connection can maintain before a new connection is open. The value must be between 1 and 256. The limit may not be strictly enforced in case of too many concurrent creations.

256

maxTrackingConsumersByConnection

The maximum number of Consumer instances that store their offset a single connection can maintain before a new connection is open. The value must be between 1 and 256. The limit may not be strictly enforced in case of too many concurrent creations.

50

maxConsumersByConnection

The maximum number of Consumer instances a single connection can maintain before a new connection is open. The value must be between 1 and 256. The limit may not be strictly enforced in case of too many concurrent creations.

256

lazyInitialization

To delay the connection opening until necessary.

false

requestedHeartbeat

Heartbeat requested by the client.

60 seconds

forceReplicaForConsumers

Retry connecting until a replica is available for consumers. The client retries 5 times before falling back to the stream leader node. Set to true only for clustered environments, not for 1-node environments, where only the stream leader is available.

false

forceLeaderForProducers

Force connecting to a stream leader for producers. Set to false if it acceptable to stay connected to a stream replica when a load balancer is in use.

true

id

Informational ID for the environment instance. Used as a prefix for connection names.

rabbitmq-stream

addressResolver

Contract to change resolved node address to connect to.

Pass-through (no-op)

locatorConnectionCount

Number of locator connections to maintain (for metadata search)

The smaller of the number of URIs and 3.

tls

Configuration helper for TLS.

TLS is enabled if a rabbitmq-stream+tls URI is provided.

tls#sslContext

Set the io.netty.handler.ssl.SslContext used for the TLS connection. Use io.netty.handler.ssl.SslContextBuilder#forClient to configure it. The server certificate chain, the client private key, and hostname verification are the usual elements that need to be configured.

The JDK trust manager and no client private key.

tls#trustEverything

Helper to configure a SslContext that trusts all server certificates and does not use a client private key. Only for development.

Disabled by default.

netty

Configuration helper for Netty.

netty#eventLoopGroup

Netty’s event dispatcher. It is the developer’s responsibility to close the EventLoopGroup they provide.

NioEventLoopGroup instance closed automatically with the Environment instance.

netty#ByteBufAllocator

ByteBuf allocator.

ByteBufAllocator.DEFAULT

netty#channelCustomizer

Extension point to customize Netty’s Channel instances used for connections.

None

netty#bootstrapCustomizer

Extension point to customize Netty’s Bootstrap instances used to configure connections.

None

When a Load Balancer is in Use

A load balancer can misguide the client when it tries to connect to nodes that host stream leaders and replicas. The "Connecting to Streams" blog post covers why client applications must connect to the appropriate nodes in a cluster and how a load balancer can make things complicated for them.

The EnvironmentBuilder#addressResolver(AddressResolver) method allows intercepting the node resolution after metadata hints and before connection. Applications can use this hook to ignore metadata hints and always use the load balancer, as illustrated in the following snippet:

Using a custom address resolver to always use a load balancer
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Set the load balancer address

  2. Use load balancer address for initial connection

  3. Ignore metadata hints, always use load balancer

  4. Set the number of locator connections to maintain

Note the example above sets the number of locator connections the environment maintains. Locator connections are used to perform infrastructure-related operations (e.g. looking up the topology of a stream to find an appropriate node to connect to). The environment uses the number of passed-in URIs to choose an appropriate default number and will pick 1 in this case, which may be too low for a cluster deployment. This is why it is recommended to set the value explicitly, 3 being a good default.

Managing Streams

Streams are usually long-lived, centrally-managed entities, that is, applications are not supposed to create and delete them. It is nevertheless possible to create and delete stream with the Environment. This comes in handy for development and testing purposes.

Streams are created with the Environment#streamCreator() method:

Creating a stream
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Create the my-stream stream

StreamCreator#create is idempotent: trying to re-create a stream with the same name and same properties (e.g. maximum size, see below) will not throw an exception. In other words, you can be sure the stream has been created once StreamCreator#create returns. Note it is not possible to create a stream with the same name as an existing stream but with different properties. Such a request will result in an exception.

Streams can be deleted with the Environment#delete(String) method:

Deleting a stream
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Delete the my-stream stream

Note you should avoid stream churn (creating and deleting streams repetitively) as their creation and deletion imply some significant housekeeping on the server side (interactions with the file system, communication between nodes of the cluster).

It is also possible to limit the size of a stream when creating it. A stream is an append-only data structure and reading from it does not remove data. This means a stream can grow indefinitely. RabbitMQ Stream supports a size-based and time-based retention policies: once the stream reaches a given size or a given age, it is truncated (starting from the beginning).

Important
Limit the size of streams if appropriate!

Make sure to set up a retention policy on potentially large streams if you don’t want to saturate the storage devices of your servers. Keep in mind that this means some data will be erased!

It is possible to set up the retention policy when creating the stream:

Setting the retention policy when creating a stream
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Set the maximum size to 10 GB

  2. Set the segment size to 500 MB

The previous snippet mentions a segment size. RabbitMQ Stream does not store a stream in a big, single file, it uses segment files for technical reasons. A stream is truncated by deleting whole segment files (and not part of them)so the maximum size of a stream is usually significantly higher than the size of segment files. 500 MB is a reasonable segment file size to begin with.

Note
When does the broker enforce the retention policy?

The broker enforces the retention policy when the segments of a stream roll over, that is when the current segment has reached its maximum size and is closed in favor of a new one. This means the maximum segment size is a critical setting in the retention mechanism.

RabbitMQ Stream also supports a time-based retention policy: segments get truncated when they reach a certain age. The following snippet illustrates how to set the time-based retention policy:

Setting a time-based retention policy when creating a stream
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Set the maximum age to 6 hours

  2. Set the segment size to 500 MB

Producer

Creating a Producer

A Producer instance is created from the Environment. The only mandatory setting to specify is the stream to publish to:

Creating a producer from the environment
link:../../test/java/com/rabbitmq/stream/docs/ProducerUsage.java[role=include]
  1. Use Environment#producerBuilder() to define the producer

  2. Specify the stream to publish to

  3. Create the producer instance with build()

  4. Close the producer after usage

Consider a Producer instance like a long-lived object, do not create one to send just one message.

Note
Producer thread safety

Producer instances are thread-safe. Deduplication imposes restrictions on the usage of threads though.

Internally, the Environment will query the broker to find out about the topology of the stream and will create or re-use a connection to publish to the leader node of the stream.

The following table sums up the main settings to create a Producer:

Parameter Name Description Default

stream

The stream to publish to.

No default, mandatory setting.

name

The logical name of the producer. Specify a name to enable message deduplication.

null (no deduplication)

batchSize

The maximum number of messages to accumulate before sending them to the broker.

100

subEntrySize

The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency and potential duplicated messages even when deduplication is enabled. See the dedicated section for more information.

1 (meaning no use of sub-entry batching)

compression

Compression algorithm to use when sub-entry batching is in use. See the dedicated section for more information.

Compression.NONE

maxUnconfirmedMessages

The maximum number of unconfirmed outbound messages. Producer#send will start blocking when the limit is reached.

10,000

batchPublishingDelay

Period to send a batch of messages.

100 ms

dynamicBatch

Adapt batch size depending on ingress rate.

true

confirmTimeout

Time before the client calls the confirm callback to signal outstanding unconfirmed messages timed out.

30 seconds

enqueueTimeout

Time before enqueueing of a message fail when the maximum number of unconfirmed is reached. The callback of the message will be called with a negative status. Set the value to Duration.ZERO if there should be no timeout.

10 seconds

retryOnRecovery

Whether to republish unconfirmed messages after recovery. Set to false to not republish unconfirmed messages and get a negative ConfirmationStatus for unconfirmed messages.

true

Sending Messages

Once a Producer has been created, it is possible to send a message with the Producer#send(Message, ConfirmationHandler) method. The following snippet shows how to publish a message with a byte array payload:

Sending a message
link:../../test/java/com/rabbitmq/stream/docs/ProducerUsage.java[role=include]
  1. The payload of a message is an array of bytes

  2. Create the message with Producer#messageBuilder()

  3. Define the behavior on publish confirmation

Messages are not only made of a byte[] payload, we will see in the next section they can also carry pre-defined and application properties.

Note
Use a MessageBuilder instance only once

A MessageBuilder instance is meant to create only one message. You need to create a new instance of MessageBuilder for every message you want to create.

The ConfirmationHandler defines an asynchronous callback invoked when the client received from the broker the confirmation the message has been taken into account. The ConfirmationHandler is the place for any logic on publishing confirmation, including re-publishing the message if it is negatively acknowledged.

Warning
Keep the confirmation callback as short as possible

The confirmation callback should be kept as short as possible to avoid blocking the connection thread. Not doing so can make the Environment, Producer, Consumer instances sluggish or even block them. Any long processing should be done in a separate thread (e.g. with an asynchronous ExecutorService).

Working with Complex Messages

The publishing example above showed that messages are made of a byte array payload, but it did not go much further. Messages in RabbitMQ Stream can actually be more sophisticated, as they comply to the AMQP 1.0 message format.

In a nutshell, a message in RabbitMQ Stream has the following structure:

  • properties: a defined set of standard properties of the message (e.g. message ID, correlation ID, content type, etc).

  • application properties: a set of arbitrary key/value pairs.

  • body: typically an array of bytes.

  • message annotations: a set of key/value pairs (aimed at the infrastructure).

The RabbitMQ Stream Java client uses the Message interface to abstract a message and the recommended way to create Message instances is to use the Producer#messageBuilder() method. To publish a Message, use the Producer#send(Message,ConfirmationHandler):

Creating a message with properties
link:../../test/java/com/rabbitmq/stream/docs/ProducerUsage.java[role=include]
  1. Get the message builder from the producer

  2. Get the properties builder and set some properties

  3. Go back to message builder

  4. Set byte array payload

  5. Build the message instance

  6. Publish the message

Note
Is RabbitMQ Stream based on AMQP 1.0?

AMQP 1.0 is a standard that defines an efficient binary peer-to-peer protocol for transporting messages between two processes over a network. It also defines an abstract message format, with concrete standard encoding. This is only the latter part that RabbitMQ Stream uses. The AMQP 1.0 protocol is not used, only AMQP 1.0 encoded messages are wrapped into the RabbitMQ Stream binary protocol.

The actual AMQP 1.0 message encoding and decoding happen on the client side, the RabbitMQ Stream plugin stores only bytes, it has no idea that AMQP 1.0 message format is used.

AMQP 1.0 message format was chosen because of its flexibility and its advanced type system. It provides good interoperability, which allows streams to be accessed as AMQP 0-9-1 queues, without data loss.

Message Deduplication

RabbitMQ Stream provides publisher confirms to avoid losing messages: once the broker has persisted a message it sends a confirmation for this message. But this can lead to duplicate messages: imagine the connection closes because of a network glitch after the message has been persisted but before the confirmation reaches the producer. Once reconnected, the producer will retry to send the same message, as it never received the confirmation. So the message will be persisted twice.

Luckily RabbitMQ Stream can detect and filter out duplicated messages, based on 2 client-side elements: the producer name and the message publishing ID.

Warning
Only one publisher instance with a given name and no multithreading to guarantee deduplication

We’ll see below that deduplication works using a strictly increasing sequence for messages. This means messages must be published in order, so there must be only one publisher instance with a given name and this instance must publish messages within a single thread.

With several publisher instances with the same name, one instance can be "ahead" of the others for the sequence ID: if it publishes a message with sequence ID 100, any message from any instance with a smaller lower sequence ID will be filtered out.

If there is only one publisher instance with a given name, it should publish messages in a single thread. Even if messages are created in order, with the proper sequence ID, they can get out of order if they are published in several threads, e.g. message 5 can be published before message 2. The deduplication mechanism will then filter out message 2 in this case.

You have to be very careful about the way your applications publish messages when deduplication is in use: make sure publisher instances do not share the same name and use only a single thread. If you worry about performance, note it is possible to publish hundreds of thousands of messages in a single thread with RabbitMQ Stream.

Warning
Deduplication is not guaranteed when using sub-entries batching

It is not possible to guarantee deduplication when sub-entry batching is in use. Sub-entry batching is disabled by default and it does not prevent from batching messages in a single publish frame, which can already provide very high throughput.

Setting the Name of a Producer

The producer name is set when creating the producer instance, which automatically enables deduplication:

Naming a producer to enable message deduplication
link:../../test/java/com/rabbitmq/stream/docs/ProducerUsage.java[role=include]
  1. Set a name for the producer

  2. Disable confirm timeout check

Thanks to the name, the broker will be able to track the messages it has persisted on a given stream for this producer. If the producer connection unexpectedly closes, it will automatically recover and retry outstanding messages. The broker will then filter out messages it has already received and persisted. No more duplicates!

Important
Why setting confirmTimeout to 0 when using deduplication?

The point of deduplication is to avoid duplicates when retrying unconfirmed messages. But why retrying in the first place? To avoid losing messages, that is enforcing at-least-once semantics. If the client does not stubbornly retry messages and gives up at some point, messages can be lost, which maps to at-most-once semantics. This is why the deduplication examples set the confirmTimeout setting to Duration.ZERO: to disable the background task that calls the confirmation callback for outstanding messages that time out. This way the client will do its best to retry messages until they are confirmed.

A producer name must be stable and clear to a human reader. It must not be a random sequence that changes when the producer application is restarted. Names like online-shop-order or online-shop-invoice are better names than 3d235e79-047a-46a6-8c80-9d159d3e1b05. There should be only one living instance of a producer with a given name on a given stream at the same time.

Understanding Publishing ID

The producer name is only one part of the deduplication mechanism, the other part is the message publishing ID. If the producer has a name, the client automatically assigns a publishing ID to each outbound message for the producer. The publishing ID is a strictly increasing sequence, starting at 0 and incremented for each message. The default publishing sequence is good enough for deduplication, but it is possible to assign a publishing ID to each message:

Using an explicit publishing ID
link:../../test/java/com/rabbitmq/stream/docs/ProducerUsage.java[role=include]
  1. Set a publishing ID on a message

There are a few rules to follow when using a custom publishing ID sequence:

  • the sequence must be strictly increasing

  • there can be gaps in the sequence (e.g. 0, 1, 2, 3, 6, 7, 9, 10, etc)

  • the sequence does not have to start at 0, as long as it is increasing

A custom publishing ID sequence has usually a meaning: it can be the line number of a file or the primary key in a database.

Note the publishing ID is not part of the message: it is not stored with the message and so is not available when consuming the message. It is still possible to store the value in the AMQP 1.0 message application properties or in an appropriate properties (e.g. messageId).

Important
Do not mix client-assigned and custom publishing ID

As soon as a producer name is set, message deduplication is enabled. It is then possible to let the producer assign a publishing ID to each message or assign custom publishing IDs. Do one or the other, not both!

Restarting a Producer Where It Left Off

Using a custom publishing sequence is even more useful to restart a producer where it left off. Imagine a scenario whereby the producer is sending a message for each line in a file and the application uses the line number as the publishing ID. If the application restarts because of some necessary maintenance or even a crash, the producer can restart from the beginning of the file: there would no duplicate messages because the producer has a name and the application sets publishing IDs appropriately. Nevertheless, this is far from ideal, it would be much better to restart just after the last line the broker successfully confirmed. Fortunately this is possible thanks to the Producer#getLastPublishing() method, which returns the last publishing ID for a given producer. As the publishing ID in this case is the line number, the application can easily scroll to the next line and restart publishing from there.

The next snippet illustrates the use of Producer#getLastPublishing():

Setting a producer where it left off
link:../../test/java/com/rabbitmq/stream/docs/ProducerUsage.java[role=include]
  1. Set a name for the producer

  2. Disable confirm timeout check

  3. Query last publishing ID for this producer and increment it

  4. Scroll to the content for the next publishing ID

  5. Set the message publishing

Sub-Entry Batching and Compression

RabbitMQ Stream provides a special mode to publish, store, and dispatch messages: sub-entry batching. This mode increases throughput at the cost of increased latency and potential duplicated messages even when deduplication is enabled. It also allows using compression to reduce bandwidth and storage if messages are reasonably similar, at the cost of increasing CPU usage on the client side.

Sub-entry batching consists in squeezing several messages – a batch – in the slot that is usually used for one message. This means outbound messages are not only batched in publishing frames, but in sub-entries as well.

You can enable sub-entry batching by setting the ProducerBuilder#subEntrySize parameter to a value greater than 1, like in the following snippet:

Enabling sub-entry batching
link:../../test/java/com/rabbitmq/stream/docs/ProducerUsage.java[role=include]
  1. Set batch size to 100 (the default)

  2. Set sub-entry size to 10

Reasonable values for the sub-entry size usually go from 10 to a few dozens.

A sub-entry batch will go directly to disc after it reached the broker, so the publishing client has complete control over it. This is the occasion to take advantage of the similarity of messages and compress them. There is no compression by default but you can choose among several algorithms with the ProducerBuilder#compression(Compression) method:

Enabling compression of sub-entry messages
link:../../test/java/com/rabbitmq/stream/docs/ProducerUsage.java[role=include]
  1. Set batch size to 100 (the default)

  2. Set sub-entry size to 10

  3. Use the Zstandard compression algorithm

Note the messages in a sub-entry are compressed altogether to benefit from their potential similarity, not one by one.

The following table lists the supported algorithms, general information about them, and the respective implementations used by default.

Algorithm Overview Implementation used

gzip

Has a high compression ratio but is slow compared to other algorithms.

JDK implementation

Snappy

Aims for reasonable compression ratio and very high speeds.

Xerial Snappy (framed)

LZ4

Aims for good trade-off between speed and compression ratio.

LZ4 Java (framed)

zstd (Zstandard)

Aims for high compression ratio and high speed, especially for decompression.

zstd-jni

You are encouraged to test and evaluate the compression algorithms depending on your needs.

The compression libraries are pluggable thanks to the EnvironmentBuilder#compressionCodecFactory(CompressionCodecFactory) method.

Note
Consumers, sub-entry batching, and compression

There is no configuration required for consumers with regard to sub-entry batching and compression. The broker dispatches messages to client libraries: they are supposed to figure out the format of messages, extract them from their sub-entry, and decompress them if necessary. So when you set up sub-entry batching and compression in your publishers, the consuming applications must use client libraries that support this mode, which is the case for the stream Java client.

Consumer

Consumer is the API to consume messages from a stream.

Creating a Consumer

A Consumer instance is created with Environment#consumerBuilder(). The main settings are the stream to consume from, the place in the stream to start consuming from (the offset), and a callback when a message is received (the MessageHandler). The next snippet shows how to create a Consumer:

Creating a consumer
link:../../test/java/com/rabbitmq/stream/docs/ConsumerUsage.java[role=include]
  1. Use Environment#consumerBuilder() to define the consumer

  2. Specify the stream to consume from

  3. Specify where to start consuming from

  4. Define behavior on message consumption

  5. Build the consumer

  6. Close consumer after usage

The broker start sending messages as soon as the Consumer instance is created.

Warning
The message processing callback can take its time, but not too much

The message processing callback should not take too long or it could impact other consumers sharing the same connection. The EnvironmentBuilder#maxConsumersByConnection(int) method allows isolating consumers from each other, at the cost of creating and maintaining more connections. Consider using a separate thread for long processing (e.g. with an asynchronous ExecutorService). Note message processing callbacks run in a dedicated thread, they do not impact other network frames, which run in their own thread.

The following table sums up the main settings to create a Consumer:

Parameter Name Description Default

stream

The stream to consume from.

No default, mandatory setting.

offset

The offset to start consuming from.

OffsetSpecification#next()

messageHandler

The callback for inbound messages.

No default, mandatory setting.

name

The consumer name (for offset tracking.)

null (no offset tracking)

AutoTrackingStrategy

Enable and configure the auto-tracking strategy.

This is the default tracking strategy if a consumer name is provided.

AutoTrackingStrategy#messageCountBeforeStorage

Number of messages before storing.

10,000

AutoTrackingStrategy#flushInterval

Interval to check and store the last received offset in case of inactivity.

Duration.ofSeconds(5)

ManualTrackingStrategy

Enable and configure the manual tracking strategy.

Disabled by default.

ManualTrackingStrategy#checkInterval

Interval to check if the last requested stored offset has been actually stored.

Duration.ofSeconds(5)

noTrackingStrategy

Disable server-side offset tracking even if a name is provided. Useful when single active consumer is enabled and an external store is used for offset tracking.

false

subscriptionListener

A callback before the subscription is created. Useful when using an external store for offset tracking.

null

flow

Configuration helper for flow control.

flow#initialCredits

Number of credits when the subscription is created. Increase for higher throughput at the expense of memory usage.

10

flow#strategy

The ConsumerFlowStrategy to use.

ConsumerFlowStrategy#creditOnChunkArrival(10)

Note
Why is my consumer not consuming?

A consumer starts consuming at the very end of a stream by default (next offset). This means the consumer will receive messages as soon as a producer publishes to the stream. This also means that if no producers are currently publishing to the stream, the consumer will stay idle, waiting for new messages to come in. Use the ConsumerBuilder#offset(OffsetSpecification) to change the default behavior and see the offset section to find out more about the different types of offset specification.

Specifying an Offset

The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following:

  • OffsetSpecification.first(): starting from the first available offset. If the stream has not been truncated, this means the beginning of the stream (offset 0).

  • OffsetSpecification.last(): starting from the end of the stream and returning the last chunk of messages immediately (if the stream is not empty).

  • OffsetSpecification.next(): starting from the next offset to be written. Contrary to OffsetSpecification.last(), consuming with OffsetSpecification.next() will not return anything if no-one is publishing to the stream. The broker will start sending messages to the consumer when messages are published to the stream.

  • OffsetSpecification.offset(offset): starting from the specified offset. 0 means consuming from the beginning of the stream (first messages). The client can also specify any number, for example the offset where it left off in a previous incarnation of the application.

  • OffsetSpecification.timestamp(timestamp): starting from the messages stored after the specified timestamp. Note consumers can receive messages published a bit before the specified timestamp. Application code can filter out those messages if necessary.

Note
What is a chunk of messages?

A chunk is simply a batch of messages. This is the storage and transportation unit used in RabbitMQ Stream, that is messages are stored contiguously in a chunk and they are delivered as part of a chunk. A chunk can be made of one to several thousands of messages, depending on the ingress.

The following figure shows the different offset specifications in a stream made of 2 chunks:

Offset specifications in a stream made of 2 chunks
   +------------------------------------------+ +-------------------------+
   |  +-----+ +-----+ +-----+ +-----+ +-----+ | | +-----+ +-----+ +-----+ |
   |  |  0  | |  1  | |  2  | |  3  | |  4  | | | |  5  | |  6  | |  7  | |
   |  +-----+ +-----+ +-----+ +-----+ +-----+ | | +-----+ +-----+ +-----+ |
   +------------------------------------------+ +-------------------------+
         ^            Chunk 1    ^                   ^    Chunk 2            ^
         |                       |                   |                       |
       FIRST                  OFFSET 3              LAST                    NEXT

Each chunk contains a timestamp of its creation time. This is this timestamp the broker uses to find the appropriate chunk to start from when using a timestamp specification. The broker chooses the closest chunk before the specified timestamp, that is why consumers may see messages published a bit before what they specified.

Tracking the Offset for a Consumer

RabbitMQ Stream provides server-side offset tracking. This means a consumer can track the offset it has reached in a stream. It allows a new incarnation of the consumer to restart consuming where it left off. All of this without an extra datastore, as the broker stores the offset tracking information.

Offset tracking works in 2 steps:

  • the consumer must have a name. The name is set with ConsumerBuilder#name(String). The name can be any value (under 256 characters) and is expected to be unique (from the application point of view). Note neither the client library, nor the broker enforces uniqueness of the name: if 2 Consumer Java instances share the same name, their offset tracking will likely be interleaved, which applications usually do not expect.

  • the consumer must periodically store the offset it has reached so far. The way offsets are stored depends on the tracking strategy: automatic or manual.

Whatever tracking strategy you use, a consumer must have a name to be able to store offsets.

Automatic Offset Tracking

The following snippet shows how to enable automatic tracking with the defaults:

Using automatic tracking strategy with the defaults
link:../../test/java/com/rabbitmq/stream/docs/ConsumerUsage.java[role=include]
  1. Set the consumer name (mandatory for offset tracking)

  2. Use automatic tracking strategy with defaults

The automatic tracking strategy has the following available settings:

  • message count before storage: the client will store the offset after the specified number of messages, right after the execution of the message handler. The default is every 10,000 messages.

  • flush interval: the client will make sure to store the last received offset at the specified interval. This avoids having pending, not stored offsets in case of inactivity. The default is 5 seconds.

Those settings are configurable, as shown in the following snippet:

Configuring the automatic tracking strategy
link:../../test/java/com/rabbitmq/stream/docs/ConsumerUsage.java[role=include]
  1. Set the consumer name (mandatory for offset tracking)

  2. Use automatic tracking strategy

  3. Store every 50,000 messages

  4. Make sure to store offset at least every 10 seconds

Note the automatic tracking is the default tracking strategy, so if you are fine with its defaults, it is enabled as soon as you specify a name for the consumer:

Setting only the consumer name to enable automatic tracking
link:../../test/java/com/rabbitmq/stream/docs/ConsumerUsage.java[role=include]
  1. Set only the consumer name to enable automatic tracking with defaults

Automatic tracking is simple and provides good guarantees. It is nevertheless possible to have more fine-grained control over offset tracking by using manual tracking.

Manual Offset Tracking

The manual tracking strategy lets the developer in charge of storing offsets whenever they want, not only after a given number of messages has been received and supposedly processed, like automatic tracking does.

The following snippet shows how to enable manual tracking and how to store the offset at some point:

Using manual tracking with defaults
link:../../test/java/com/rabbitmq/stream/docs/ConsumerUsage.java[role=include]
  1. Set the consumer name (mandatory for offset tracking)

  2. Use manual tracking with defaults

  3. Store the current offset on some condition

Manual tracking has only one setting: the check interval. The client checks that the last requested stored offset has been actually stored at the specified interval. The default check interval is 5 seconds.

The following snippet shows the configuration of manual tracking:

Configuring manual tracking strategy
link:../../test/java/com/rabbitmq/stream/docs/ConsumerUsage.java[role=include]
  1. Set the consumer name (mandatory for offset tracking)

  2. Use manual tracking with defaults

  3. Check last requested offset every 10 seconds

  4. Store the current offset on some condition

The snippet above uses MessageHandler.Context#storeOffset() to store at the offset of the current message, but it is possible to store anywhere in the stream with MessageHandler.Context#consumer()#store(long) or simply Consumer#store(long).

Considerations On Offset Tracking

When to store offsets? Avoid storing offsets too often or, worse, for each message. Even though offset tracking is a small and fast operation, it will make the stream grow unnecessarily, as the broker persists offset tracking entries in the stream itself.

A good rule of thumb is to store the offset every few thousands of messages. Of course, when the consumer will restart consuming in a new incarnation, the last tracked offset may be a little behind the very last message the previous incarnation actually processed, so the consumer may see some messages that have been already processed.

A solution to this problem is to make sure processing is idempotent or filter out the last duplicated messages.


Is the offset a reliable absolute value? Message offsets may not be contiguous. This implies that the message at offset 500 in a stream may not be the 501 message in the stream (offsets start at 0). There can be different types of entries in a stream storage, a message is just one of them. For example, storing an offset creates an offset tracking entry, which has its own offset.

This means one must be careful when basing some decision on offset values, like a modulo to perform an operation every X messages. As the message offsets have no guarantee to be contiguous, the operation may not happen exactly every X messages.

Subscription Listener

The client provides a SubscriptionListener interface callback to add behavior before a subscription is created. This callback can be used to customize the offset the client library computed for the subscription. The callback is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection or a topology change).

Warning
This API is experimental, it is subject to change.

It is possible to use the callback to get the last processed offset from an external store, that is not using the server-side offset tracking feature RabbitMQ Stream provides. The following code snippet shows how this can be done (note the interaction with the external store is not detailed):

Using an external store for offset tracking with a subscription listener
link:../../test/java/com/rabbitmq/stream/docs/ConsumerUsage.java[role=include]
  1. Set subscription listener

  2. Get offset from external store

  3. Set offset to use for the subscription

  4. Store the offset in the external store after processing

When using an external store for offset tracking, it is no longer necessary to set a name and an offset strategy, as these only apply when server-side offset tracking is in use.

Using a subscription listener can also be useful to have more accurate offset tracking on re-subscription, at the cost of making the application code slightly more complex. This requires a good understanding on how and when subscription occurs in the client, and so when the subscription listener is called:

  • for a consumer with no name (server-side offset tracking disabled)

    • on the first subscription (when the consumer is created): the offset specification is the one specified with ConsumerBuilder#offset(OffsetSpecification), the default being OffsetSpecification#next()

    • on re-subscription (after a disconnection or topology change): the offset specification is the offset of the last dispatched message

  • for a consumer with a name (server-side offset tracking enabled)

    • on the first subscription (when the consumer is created): the server-side stored offset (if any) overrides the value specified with ConsumerBuilder#offset(OffsetSpecification)

    • on re-subscription (after a disconnection or topology change): the server-side stored offset is used

The subscription listener comes in handy on re-subscription. The application can track the last processed offset in-memory, with an AtomicLong for example. The application knows exactly when a message is processed and updates its in-memory tracking accordingly, whereas the value computed by the client may not be perfectly appropriate on re-subscription.

Let’s take the example of a named consumer with an offset tracking strategy that is lagging because of bad timing and a long flush interval. When a glitch happens and triggers the re-subscription, the server-side stored offset can be quite behind what the application actually processed. Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate. A custom SubscriptionListener lets the application developer uses what’s best for the application if the computed value is not optimal.

Flow Control

This section covers how a consumer can tell the broker when to send more messages.

By default, the broker keeps sending messages as long as messages are processed and the MessageHandler#handle(Context, Message) method returns. This strategy works fine if message processing is fast enough. If message processing takes longer, one can be tempted to process messages in parallel with an ExecutorService. This will make the handle method return immediately and the broker will keep sending messages, potentially overflowing the consumer.

What we miss in the parallel processing case is a way to tell the library we are done processing a message and that we are ready at some point to handle more messages. This is the goal of the MessageHandler.Context#processed() method.

This method is by default a no-op because the default flow control strategy keeps asking for more messages as soon as message processing is done. This method gets some real behavior to control the flow of messages when an appropriate ConsumerFlowStrategy is set ConsumerBuilder#flow(). The following code snippet shows how to set a handy consumer flow strategy:

Setting a consumer flow control strategy
link:../../test/java/com/rabbitmq/stream/docs/ConsumerUsage.java[role=include]
  1. Set the flow control strategy

  2. Make sure to call Context#processed()

In the example we set up the creditWhenHalfMessagesProcessed strategy which asks for more messages once half of the current messages have been marked as processed. The broker does not send messages one by one, it sends chunks of messages. A chunk of messages can contain 1 to several thousands of messages. So with the strategy set above, once processed() has been called for half of the messages of the current chunk, the library will ask the broker for another one (it will provide a credit for the subscription). By doing this, the next chunk should arrive by the time we are done with the other half of the current chunk. This way the consumer is neither overwhelmed nor idle.

The ConsumerFlowStrategy interface provides some static helpers to configure the appropriate strategy.

Additional notes on consumer flow control:

  • Make sure to call the processed() method once you set up a ConsumerFlowStrategy. The method is a no-op by default, but it is essential to call it with count-based strategies like creditWhenHalfMessagesProcessed or creditOnProcessedMessageCount. No calling it will stop the dispatching of messages.

  • Make sure to call processed() only once. Whether the method is idempotent depends on the flow strategy implementation. Apart from the default one, the implementations the library provides does not make processed() idempotent.

Single Active Consumer

Warning
Single Active Consumer requires RabbitMQ 3.11 or more.

When the single active consumer feature is enabled for several consumer instances sharing the same stream and name, only one of these instances will be active at a time and so will receive messages. The other instances will be idle.

The single active consumer feature provides 2 benefits:

  • Messages are processed in order: there is only one consumer at a time.

  • Consumption continuity is maintained: a consumer from the group will take over if the active one stops or crashes.

A typical sequence of events would be the following:

  • Several instances of the same consuming application start up.

  • Each application instance registers a single active consumer. The consumer instances share the same name.

  • The broker makes the first registered consumer the active one.

  • The active consumer receives and processes messages, the other consumer instances remain idle.

  • The active consumer stops or crashes.

  • The broker chooses the consumer next in line to become the new active one.

  • The new active consumer starts receiving messages.

The next figures illustrates this mechanism. There can be only one active consumer:

The first registered consumer is active, the next ones are inactive
                    +----------+
             +------+ consumer + Active
             |      +----------+
             |
+--------+   |      +=---------+
+ stream +---+------+ consumer + Inactive
+--------+   |      +----------+
             |
             |      +=---------+
             +------+ consumer + Inactive
                    +----------+

The broker rolls over to another consumer when the active one stops or crashes:

When the active consumer stops, the next in line becomes active
                    +=---------+
                    | consumer + Closed
                    +----------+

+--------+          +----------+
+ stream +---+------+ consumer + Active
+--------+   |      +----------+
             |
             |      +=---------+
             +------+ consumer + Inactive
                    +----------+

Note there can be several groups of single active consumers on the same stream. What makes them different from each other is the name used by the consumers. The broker deals with them independently. Let’s use an example. Imagine 2 different app-1 and app-2 applications consuming from the same stream, with 3 identical instances each. Each instance registers 1 single active consumer with the name of the application. We end up with 3 app-1 consumers and 3 app-2 consumers, 1 active consumer in each group, so overall 6 consumers and 2 active ones, all of this on the same stream.

Let’s see now the API for single active consumer.

Enabling Single Active Consumer

Use the ConsumerBuilder#singleActiveConsumer() method to enable the feature:

Enabling single active consumer
link:../../test/java/com/rabbitmq/stream/docs/ConsumerUsage.java[role=include]
  1. Set the consumer name (mandatory to enable single active consumer)

  2. Enable single active consumer

With the configuration above, the consumer will take part in the application-1 group on the my-stream stream. If the consumer instance is the first in a group, it will get messages as soon as there are some available. If it is not the first in the group, it will remain idle until it is its turn to be active (likely when all the instances registered before it are gone).

Offset Tracking

Single active consumer and offset tracking work together: when the active consumer goes away, another consumer takes over and resumes when the former active left off. Well, this is how things should work and luckily this is what happens when using server-side offset tracking. So as long as you use automatic offset tracking or manual offset tracking, the handoff between a former active consumer and the new one will go well.

The story is different is you are using an external store for offset tracking. In this case you need to tell the client library where to resume from and you can do this by implementing the ConsumerUpdateListener API.

Reacting to Consumer State Change

The broker notifies a consumer that becomes active before dispatching messages to it. The broker expects a response from the consumer and this response contains the offset the dispatching should start from. So this is the consumer’s responsibility to compute the appropriate offset, not the broker’s. The default behavior is to look up the last stored offset for the consumer on the stream. This works when server-side offset tracking is in use, but it does not when the application chose to use an external store for offset tracking. In this case, it is possible to use the ConsumerBuilder#consumerUpdateListener(ConsumerUpdateListener) method like demonstrated in the following snippet:

Fetching the last stored offset from an external store in the consumer update listener callback
link:../../test/java/com/rabbitmq/stream/docs/ConsumerUsage.java[role=include]
  1. Set the consumer name (mandatory to enable single active consumer)

  2. Enable single active consumer

  3. Disable server-side offset tracking

  4. Set the consumer update listener

  5. Fetch last offset from external store

  6. Return the offset to resume consuming from to the broker