35
35
import org .springframework .data .redis .connection .jedis .extension .JedisConnectionFactoryExtension ;
36
36
import org .springframework .data .redis .connection .lettuce .LettuceConnectionFactory ;
37
37
import org .springframework .data .redis .connection .lettuce .extension .LettuceConnectionFactoryExtension ;
38
- import org .springframework .data .redis .connection .stream .*;
38
+ import org .springframework .data .redis .connection .stream .Consumer ;
39
+ import org .springframework .data .redis .connection .stream .MapRecord ;
40
+ import org .springframework .data .redis .connection .stream .ObjectRecord ;
41
+ import org .springframework .data .redis .connection .stream .PendingMessages ;
42
+ import org .springframework .data .redis .connection .stream .PendingMessagesSummary ;
43
+ import org .springframework .data .redis .connection .stream .ReadOffset ;
44
+ import org .springframework .data .redis .connection .stream .RecordId ;
45
+ import org .springframework .data .redis .connection .stream .StreamOffset ;
46
+ import org .springframework .data .redis .connection .stream .StreamReadOptions ;
47
+ import org .springframework .data .redis .connection .stream .StreamRecords ;
39
48
import org .springframework .data .redis .test .condition .EnabledOnCommand ;
40
49
import org .springframework .data .redis .test .condition .EnabledOnRedisDriver ;
41
50
import org .springframework .data .redis .test .condition .EnabledOnRedisVersion ;
@@ -65,7 +74,7 @@ public class DefaultStreamOperationsIntegrationTests<K, HK, HV> {
65
74
private final StreamOperations <K , HK , HV > streamOps ;
66
75
67
76
public DefaultStreamOperationsIntegrationTests (RedisTemplate <K , ?> redisTemplate , ObjectFactory <K > keyFactory ,
68
- ObjectFactory <?> objectFactory ) {
77
+ ObjectFactory <?> objectFactory ) {
69
78
70
79
this .redisTemplate = redisTemplate ;
71
80
this .connectionFactory = redisTemplate .getRequiredConnectionFactory ();
@@ -81,15 +90,15 @@ public static Collection<Object[]> testParams() {
81
90
params .addAll (AbstractOperationsTestParams
82
91
.testParams (JedisConnectionFactoryExtension .getConnectionFactory (RedisStanalone .class )));
83
92
84
- if (RedisDetector .isClusterAvailable ()) {
93
+ if (RedisDetector .isClusterAvailable ()) {
85
94
params .addAll (AbstractOperationsTestParams
86
95
.testParams (JedisConnectionFactoryExtension .getConnectionFactory (RedisCluster .class )));
87
96
}
88
97
89
98
params .addAll (AbstractOperationsTestParams
90
99
.testParams (LettuceConnectionFactoryExtension .getConnectionFactory (RedisStanalone .class )));
91
100
92
- if (RedisDetector .isClusterAvailable ()) {
101
+ if (RedisDetector .isClusterAvailable ()) {
93
102
params .addAll (AbstractOperationsTestParams
94
103
.testParams (LettuceConnectionFactoryExtension .getConnectionFactory (RedisCluster .class )));
95
104
}
@@ -305,7 +314,8 @@ void readShouldReadSimpleMessage() {
305
314
RecordId messageId1 = streamOps .add (StreamRecords .objectBacked (value ).withStreamKey (key ));
306
315
streamOps .add (StreamRecords .objectBacked (value ).withStreamKey (key ));
307
316
308
- List <ObjectRecord <K , HV >> messages = streamOps .read ((Class <HV >) value .getClass (), StreamOffset .create (key , ReadOffset .from ("0-0" )));
317
+ List <ObjectRecord <K , HV >> messages = streamOps .read ((Class <HV >) value .getClass (),
318
+ StreamOffset .create (key , ReadOffset .from ("0-0" )));
309
319
310
320
assertThat (messages ).hasSize (2 );
311
321
@@ -384,8 +394,7 @@ void pendingShouldReadMessageSummary() {
384
394
RecordId messageId = streamOps .add (key , Collections .singletonMap (hashKey , value ));
385
395
streamOps .createGroup (key , ReadOffset .from ("0-0" ), "my-group" );
386
396
387
- streamOps .read (Consumer .from ("my-group" , "my-consumer" ),
388
- StreamOffset .create (key , ReadOffset .lastConsumed ()));
397
+ streamOps .read (Consumer .from ("my-group" , "my-consumer" ), StreamOffset .create (key , ReadOffset .lastConsumed ()));
389
398
390
399
PendingMessagesSummary pending = streamOps .pending (key , "my-group" );
391
400
@@ -403,8 +412,7 @@ void pendingShouldReadMessageDetails() {
403
412
RecordId messageId = streamOps .add (key , Collections .singletonMap (hashKey , value ));
404
413
streamOps .createGroup (key , ReadOffset .from ("0-0" ), "my-group" );
405
414
406
- streamOps .read (Consumer .from ("my-group" , "my-consumer" ),
407
- StreamOffset .create (key , ReadOffset .lastConsumed ()));
415
+ streamOps .read (Consumer .from ("my-group" , "my-consumer" ), StreamOffset .create (key , ReadOffset .lastConsumed ()));
408
416
409
417
PendingMessages pending = streamOps .pending (key , "my-group" , Range .unbounded (), 10L );
410
418
0 commit comments