Skip to content

Support super stream creation/deletion #448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jobs:
cache: 'maven'
- name: Start broker
run: ci/start-broker.sh
env:
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:super-stream-frames-otp-max-bazel'
- name: Test
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
Expand Down
45 changes: 41 additions & 4 deletions src/docs/asciidoc/super-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,51 @@ When a super stream is in use, the stream Java client queries this information t
From the application code point of view, using a super stream is mostly configuration-based.
Some logic must also be provided to extract routing information from messages.

==== Super Stream Creation
==== Super Stream Creation and Deletion

It is possible to create the topology of a super stream with any AMQP 0.9.1 library or with the https://www.rabbitmq.com/management.html[management plugin], but the `rabbitmq-streams add_super_stream` command is a handy shortcut.
Here is how to create an invoices super stream with 3 partitions:
It is possible to manage super streams with

* the stream Java client, by using `Environment#streamCreator()` and `Environment#deleteSuperStream(String)`
* the `add_super_stream` and `delete_super_stream` commands in `rabbitmq-streams` (CLI)
* any AMQP 0.9.1 client library
* the https://www.rabbitmq.com/management.html[management plugin]

The stream Java client and the dedicated CLI commands are easier to use as they take care of the topology details (exchange, streams, and bindings).

===== With the Client Library

Here is how to create an `invoices` super stream with 5 partitions:

.Creating a super stream by specifying the number of partitions
[source,java,indent=0]
--------
include::{test-examples}/SuperStreamUsage.java[tag=creation-partitions]
--------

The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-5`.
We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to.
This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order.

It is also possible to specify binding keys when creating a super stream:

.Creating a super stream by specifying the binding keys
[source,java,indent=0]
--------
include::{test-examples}/SuperStreamUsage.java[tag=creation-binding-keys]
--------

The super stream partitions will be `invoices-amer`, `invoices-emea` and `invoices-apac` in this case.

Using one type of topology or the other depends on the use cases, especially how messages are processed.
See the next sections on publishing and consuming to find out more.

===== With the CLI

Here is how to create an `invoices` super stream with 5 partitions:

.Creating a super stream from the CLI
----
rabbitmq-streams add_super_stream invoices --partitions 3
rabbitmq-streams add_super_stream invoices --partitions 5
----

Use `rabbitmq-streams add_super_stream --help` to learn more about the command.
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/rabbitmq/stream/Constants.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
Expand Down Expand Up @@ -72,6 +72,8 @@ public final class Constants {
public static final short COMMAND_CONSUMER_UPDATE = 26;
public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27;
public static final short COMMAND_STREAM_STATS = 28;
public static final short COMMAND_CREATE_SUPER_STREAM = 29;
public static final short COMMAND_DELETE_SUPER_STREAM = 30;

public static final short VERSION_1 = 1;
public static final short VERSION_2 = 2;
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/rabbitmq/stream/Environment.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
Expand Down Expand Up @@ -58,9 +58,19 @@ static EnvironmentBuilder builder() {
* Delete a stream
*
* @param stream the stream to delete
* @since 0.15.0
*/
void deleteStream(String stream);

/**
* Delete a super stream.
*
* <p>Requires RabbitMQ 3.13.0 or more.
*
* @param superStream the super stream to delete
*/
void deleteSuperStream(String superStream);

/**
* Query statistics on a stream.
*
Expand Down
60 changes: 58 additions & 2 deletions src/main/java/com/rabbitmq/stream/StreamCreator.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
Expand All @@ -23,13 +23,24 @@ public interface StreamCreator {
ByteCapacity MAX_SEGMENT_SIZE = ByteCapacity.from("3GB");

/**
* The name of the stream
* The name of the stream.
*
* <p>Alias for {@link #name(String)}.
*
* @param stream
* @return this creator instance
*/
StreamCreator stream(String stream);

/**
* The name of the (super) stream.
*
* @param name
* @return this creator instance
* @since 0.15.0
*/
StreamCreator name(String name);

/**
* The maximum size of the stream before it gets truncated.
*
Expand Down Expand Up @@ -80,6 +91,16 @@ public interface StreamCreator {
*/
StreamCreator filterSize(int size);

/**
* Configure the super stream to create.
*
* <p>Requires RabbitMQ 3.13.0 or more.
*
* @return the super stream configuration
* @since 0.15.0
*/
SuperStreamConfiguration superStream();

/**
* Create the stream.
*
Expand Down Expand Up @@ -142,4 +163,39 @@ public String value() {
return this.value;
}
}

/**
* Super stream configuration.
*
* @since 0.15.0
*/
interface SuperStreamConfiguration {

/**
* The number of partitions of the super stream.
*
* <p>Mutually exclusive with {@link #bindingKeys(String...)}. Default is 3.
*
* @param partitions
* @return this super stream configuration instance
*/
SuperStreamConfiguration partitions(int partitions);

/**
* The binding keys to use when declaring the super stream partitions.
*
* <p>Mutually exclusive with {@link #partitions(int)}. Default is null.
*
* @param bindingKeys
* @return this super stream configuration instance
*/
SuperStreamConfiguration bindingKeys(String... bindingKeys);

/**
* Go back to the creator.
*
* @return the stream creator
*/
StreamCreator creator();
}
}
Loading