22
22
import com .rabbitmq .stream .impl .Client .QueryOffsetResponse ;
23
23
import com .rabbitmq .stream .impl .StreamConsumerBuilder .TrackingConfiguration ;
24
24
import com .rabbitmq .stream .impl .StreamEnvironment .TrackingConsumerRegistration ;
25
+ import java .util .Objects ;
25
26
import java .util .concurrent .atomic .AtomicBoolean ;
27
+ import java .util .concurrent .atomic .AtomicLong ;
26
28
import java .util .function .LongConsumer ;
27
29
import org .slf4j .Logger ;
28
30
import org .slf4j .LoggerFactory ;
29
31
30
32
class StreamConsumer implements Consumer {
31
33
34
+ private static final AtomicLong ID_SEQUENCE = new AtomicLong (0 );
35
+
32
36
private static final Logger LOGGER = LoggerFactory .getLogger (StreamConsumer .class );
37
+ private final long id ;
33
38
private final Runnable closingTrackingCallback ;
34
39
private final AtomicBoolean closed = new AtomicBoolean (false );
35
40
private final String name ;
@@ -52,6 +57,7 @@ class StreamConsumer implements Consumer {
52
57
boolean lazyInit ,
53
58
SubscriptionListener subscriptionListener ) {
54
59
60
+ this .id = ID_SEQUENCE .getAndIncrement ();
55
61
try {
56
62
this .name = name ;
57
63
this .stream = stream ;
@@ -227,4 +233,26 @@ enum Status {
227
233
NOT_AVAILABLE ,
228
234
CLOSED
229
235
}
236
+
237
+ @ Override
238
+ public boolean equals (Object o ) {
239
+ if (this == o ) {
240
+ return true ;
241
+ }
242
+ if (o == null || getClass () != o .getClass ()) {
243
+ return false ;
244
+ }
245
+ StreamConsumer that = (StreamConsumer ) o ;
246
+ return id == that .id && stream .equals (that .stream );
247
+ }
248
+
249
+ @ Override
250
+ public int hashCode () {
251
+ return Objects .hash (id , stream );
252
+ }
253
+
254
+ @ Override
255
+ public String toString () {
256
+ return "StreamConsumer{" + "id=" + id + ", stream='" + stream + '\'' + '}' ;
257
+ }
230
258
}
0 commit comments