Skip to content

Commit ec1c2ea

Browse files
committed
Expose stream name in SubscriptionListener.SubscriptionContext
Fixes #540
1 parent e7769b4 commit ec1c2ea

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

src/main/java/com/rabbitmq/stream/SubscriptionListener.java

+7
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,12 @@ interface SubscriptionContext {
6464
* @param offsetSpecification the offset specification to use
6565
*/
6666
void offsetSpecification(OffsetSpecification offsetSpecification);
67+
68+
/**
69+
* The stream involved.
70+
*
71+
* @return the stream
72+
*/
73+
String stream();
6774
}
6875
}

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -1021,7 +1021,7 @@ synchronized void add(
10211021
// TODO consider using/emulating ConsumerUpdateListener, to have only one API, not 2
10221022
// even when the consumer is not a SAC.
10231023
SubscriptionContext subscriptionContext =
1024-
new DefaultSubscriptionContext(offsetSpecification);
1024+
new DefaultSubscriptionContext(offsetSpecification, subscriptionTracker.stream);
10251025
subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext);
10261026
LOGGER.info(
10271027
"Computed offset specification {}, offset specification used after subscription listener {}",
@@ -1217,9 +1217,12 @@ public int hashCode() {
12171217
private static final class DefaultSubscriptionContext implements SubscriptionContext {
12181218

12191219
private volatile OffsetSpecification offsetSpecification;
1220+
private final String name;
12201221

1221-
private DefaultSubscriptionContext(OffsetSpecification computedOffsetSpecification) {
1222+
private DefaultSubscriptionContext(
1223+
OffsetSpecification computedOffsetSpecification, String name) {
12221224
this.offsetSpecification = computedOffsetSpecification;
1225+
this.name = name;
12231226
}
12241227

12251228
@Override
@@ -1232,6 +1235,11 @@ public void offsetSpecification(OffsetSpecification offsetSpecification) {
12321235
this.offsetSpecification = offsetSpecification;
12331236
}
12341237

1238+
@Override
1239+
public String stream() {
1240+
return this.name;
1241+
}
1242+
12351243
@Override
12361244
public String toString() {
12371245
return "SubscriptionContext{" + "offsetSpecification=" + offsetSpecification + '}';

0 commit comments

Comments
 (0)