Skip to content

Commit 54b16fb

Browse files
committed
Add lag and entries read to StreamInfo
1 parent 8b5f29e commit 54b16fb

File tree

3 files changed

+28
-0
lines changed

3 files changed

+28
-0
lines changed

src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java

+20
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,26 @@ public Long pendingCount() {
361361
public String lastDeliveredId() {
362362
return getRequired("last-delivered-id", String.class);
363363
}
364+
365+
/**
366+
* The logical "read counter" of the last entry delivered to the group's consumers. Corresponds to {@literal entries-read}.
367+
*
368+
* @return
369+
*/
370+
public Long entriesRead() {
371+
return getRequired("entries-read", Long.class);
372+
}
373+
374+
/**
375+
* The number of entries in the stream that are still waiting to be delivered to the group's consumers,
376+
* or a NULL when that number can't be determined. Corresponds to {@literal lag}.
377+
*
378+
* @return
379+
*/
380+
@Nullable
381+
public Long lag() {
382+
return get("entries-read", Long.class);
383+
}
364384
}
365385

366386
public static class XInfoConsumers implements Streamable<XInfoConsumer> {

src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -4017,6 +4017,8 @@ public void xinfoGroups() {
40174017
assertThat(info.get(0).groupName()).isEqualTo("my-group");
40184018
assertThat(info.get(0).consumerCount()).isEqualTo(1L);
40194019
assertThat(info.get(0).pendingCount()).isEqualTo(2L);
4020+
assertThat(info.get(0).lag()).isEqualTo(0L);
4021+
assertThat(info.get(0).entriesRead()).isEqualTo(2L);
40204022
assertThat(info.get(0).lastDeliveredId()).isEqualTo(lastRecord.getValue());
40214023
}
40224024

@@ -4055,6 +4057,8 @@ public void xinfoGroupsNoConsumer() {
40554057
assertThat(info.get(0).groupName()).isEqualTo("my-group");
40564058
assertThat(info.get(0).consumerCount()).isZero();
40574059
assertThat(info.get(0).pendingCount()).isZero();
4060+
assertThat(info.get(0).lag()).isZero();
4061+
assertThat(info.get(0).entriesRead()).isZero();
40584062
assertThat(info.get(0).lastDeliveredId()).isEqualTo("0-0");
40594063
}
40604064

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,8 @@ void xinfoGroups() {
429429
assertThat(info.groupName()).isEqualTo("my-group");
430430
assertThat(info.consumerCount()).isEqualTo(1L);
431431
assertThat(info.pendingCount()).isEqualTo(2L);
432+
assertThat(info.lag()).isZero();
433+
assertThat(info.entriesRead()).isEqualTo(2L);
432434
assertThat(info.lastDeliveredId()).isEqualTo(lastRecord);
433435
}).verifyComplete();
434436
}
@@ -455,6 +457,8 @@ void xinfoGroupsNoConsumer() {
455457
assertThat(info.groupName()).isEqualTo("my-group");
456458
assertThat(info.consumerCount()).isZero();
457459
assertThat(info.pendingCount()).isZero();
460+
assertThat(info.entriesRead()).isZero();
461+
assertThat(info.lag()).isZero();
458462
assertThat(info.lastDeliveredId()).isEqualTo("0-0");
459463
}).verifyComplete();
460464
}

0 commit comments

Comments
 (0)