@@ -82,12 +82,6 @@ public class DefaultReactiveStreamOperationsIntegrationTests<K, HK, HV> {
82
82
return ReactiveOperationsTestParams .testParams ();
83
83
}
84
84
85
- /**
86
- * @param redisTemplate
87
- * @param keyFactory
88
- * @param valueFactory
89
- * @param label parameterized test label, no further use besides that.
90
- */
91
85
public DefaultReactiveStreamOperationsIntegrationTests (Fixture <K , HV > fixture ) {
92
86
93
87
this .serializer = fixture .getSerializer ();
@@ -208,27 +202,25 @@ void addMaxLenShouldLimitMessagesSize() {
208
202
209
203
RecordId messageId = streamOperations .add (key , Collections .singletonMap (hashKey , newValue ), options ).block ();
210
204
211
- streamOperations .range (key , Range .unbounded ()).as (StepVerifier ::create )
212
- .consumeNextWith (actual -> {
205
+ streamOperations .range (key , Range .unbounded ()).as (StepVerifier ::create ).consumeNextWith (actual -> {
213
206
214
- assertThat (actual .getId ()).isEqualTo (messageId );
215
- assertThat (actual .getStream ()).isEqualTo (key );
216
- assertThat (actual ).hasSize (1 );
207
+ assertThat (actual .getId ()).isEqualTo (messageId );
208
+ assertThat (actual .getStream ()).isEqualTo (key );
209
+ assertThat (actual ).hasSize (1 );
217
210
218
- if (!(key instanceof byte [] || value instanceof byte [])) {
219
- assertThat (actual .getValue ()).containsEntry (hashKey , newValue );
220
- }
211
+ if (!(key instanceof byte [] || value instanceof byte [])) {
212
+ assertThat (actual .getValue ()).containsEntry (hashKey , newValue );
213
+ }
221
214
222
- })
223
- .verifyComplete ();
215
+ }).verifyComplete ();
224
216
}
225
217
226
218
@ ParameterizedRedisTest // GH-2915
227
219
void addMaxLenShouldLimitSimpleMessagesSize () {
228
220
229
221
assumeTrue (!(serializer instanceof Jackson2JsonRedisSerializer )
230
- && !(serializer instanceof GenericJackson2JsonRedisSerializer )
231
- && !(serializer instanceof JdkSerializationRedisSerializer ) && !(serializer instanceof OxmSerializer ));
222
+ && !(serializer instanceof GenericJackson2JsonRedisSerializer )
223
+ && !(serializer instanceof JdkSerializationRedisSerializer ) && !(serializer instanceof OxmSerializer ));
232
224
233
225
K key = keyFactory .instance ();
234
226
HV value = valueFactory .instance ();
@@ -241,31 +233,29 @@ void addMaxLenShouldLimitSimpleMessagesSize() {
241
233
RecordId messageId = streamOperations .add (StreamRecords .objectBacked (newValue ).withStreamKey (key ), options ).block ();
242
234
243
235
streamOperations .range ((Class <HV >) value .getClass (), key , Range .unbounded ()).as (StepVerifier ::create )
244
- .consumeNextWith (actual -> {
236
+ .consumeNextWith (actual -> {
245
237
246
- assertThat (actual .getId ()).isEqualTo (messageId );
247
- assertThat (actual .getStream ()).isEqualTo (key );
248
- assertThat (actual .getValue ()).isEqualTo (newValue );
238
+ assertThat (actual .getId ()).isEqualTo (messageId );
239
+ assertThat (actual .getStream ()).isEqualTo (key );
240
+ assertThat (actual .getValue ()).isEqualTo (newValue );
249
241
250
- })
251
- .expectNextCount (0 )
252
- .verifyComplete ();
242
+ }).expectNextCount (0 ).verifyComplete ();
253
243
}
254
244
255
245
@ ParameterizedRedisTest // GH-2915
256
246
void addMaxLenShouldLimitSimpleMessageWithRawSerializerSize () {
257
247
258
248
assumeTrue (!(serializer instanceof Jackson2JsonRedisSerializer )
259
- && !(serializer instanceof GenericJackson2JsonRedisSerializer ));
249
+ && !(serializer instanceof GenericJackson2JsonRedisSerializer ));
260
250
261
251
SerializationPair <K > keySerializer = redisTemplate .getSerializationContext ().getKeySerializationPair ();
262
252
263
253
RedisSerializationContext <K , String > serializationContext = RedisSerializationContext
264
- .<K , String > newSerializationContext (StringRedisSerializer .UTF_8 ).key (keySerializer )
265
- .hashValue (SerializationPair .raw ()).hashKey (SerializationPair .raw ()).build ();
254
+ .<K , String > newSerializationContext (StringRedisSerializer .UTF_8 ).key (keySerializer )
255
+ .hashValue (SerializationPair .raw ()).hashKey (SerializationPair .raw ()).build ();
266
256
267
257
ReactiveRedisTemplate <K , String > raw = new ReactiveRedisTemplate <>(redisTemplate .getConnectionFactory (),
268
- serializationContext );
258
+ serializationContext );
269
259
270
260
K key = keyFactory .instance ();
271
261
Person value = new PersonObjectFactory ().instance ();
@@ -275,18 +265,17 @@ void addMaxLenShouldLimitSimpleMessageWithRawSerializerSize() {
275
265
Person newValue = new PersonObjectFactory ().instance ();
276
266
XAddOptions options = XAddOptions .maxlen (1 ).approximateTrimming (false );
277
267
278
- RecordId messageId = raw .opsForStream ().add (StreamRecords .objectBacked (newValue ).withStreamKey (key ), options ).block ();
268
+ RecordId messageId = raw .opsForStream ().add (StreamRecords .objectBacked (newValue ).withStreamKey (key ), options )
269
+ .block ();
279
270
280
271
raw .opsForStream ().range ((Class <HV >) value .getClass (), key , Range .unbounded ()).as (StepVerifier ::create )
281
- .consumeNextWith (it -> {
272
+ .consumeNextWith (it -> {
282
273
283
- assertThat (it .getId ()).isEqualTo (messageId );
284
- assertThat (it .getStream ()).isEqualTo (key );
285
- assertThat (it .getValue ()).isEqualTo (newValue );
274
+ assertThat (it .getId ()).isEqualTo (messageId );
275
+ assertThat (it .getStream ()).isEqualTo (key );
276
+ assertThat (it .getValue ()).isEqualTo (newValue );
286
277
287
- })
288
- .expectNextCount (0 )
289
- .verifyComplete ();
278
+ }).expectNextCount (0 ).verifyComplete ();
290
279
}
291
280
292
281
@ ParameterizedRedisTest // GH-2915
@@ -303,17 +292,13 @@ void addMinIdShouldEvictLowerIdMessages() {
303
292
304
293
RecordId messageId2 = streamOperations .add (key , Collections .singletonMap (hashKey , value ), options ).block ();
305
294
306
- streamOperations .range (key , Range .unbounded ()).as (StepVerifier ::create )
307
- .consumeNextWith (actual -> {
308
- assertThat (actual .getId ()).isEqualTo (messageId1 );
309
- assertThat (actual .getStream ()).isEqualTo (key );
310
- })
311
- .consumeNextWith (actual -> {
312
- assertThat (actual .getId ()).isEqualTo (messageId2 );
313
- assertThat (actual .getStream ()).isEqualTo (key );
314
- })
315
- .expectNextCount (0 )
316
- .verifyComplete ();
295
+ streamOperations .range (key , Range .unbounded ()).as (StepVerifier ::create ).consumeNextWith (actual -> {
296
+ assertThat (actual .getId ()).isEqualTo (messageId1 );
297
+ assertThat (actual .getStream ()).isEqualTo (key );
298
+ }).consumeNextWith (actual -> {
299
+ assertThat (actual .getId ()).isEqualTo (messageId2 );
300
+ assertThat (actual .getStream ()).isEqualTo (key );
301
+ }).expectNextCount (0 ).verifyComplete ();
317
302
}
318
303
319
304
@ ParameterizedRedisTest // GH-2915
@@ -327,13 +312,9 @@ void addMakeNoStreamShouldNotCreateStreamWhenNoStreamExists() {
327
312
328
313
streamOperations .add (key , Collections .singletonMap (hashKey , value ), options ).block ();
329
314
330
- streamOperations .size (key ).as (StepVerifier ::create )
331
- .expectNext (0L )
332
- .verifyComplete ();
315
+ streamOperations .size (key ).as (StepVerifier ::create ).expectNext (0L ).verifyComplete ();
333
316
334
- streamOperations .range (key , Range .unbounded ()).as (StepVerifier ::create )
335
- .expectNextCount (0L )
336
- .verifyComplete ();
317
+ streamOperations .range (key , Range .unbounded ()).as (StepVerifier ::create ).expectNextCount (0L ).verifyComplete ();
337
318
}
338
319
339
320
@ ParameterizedRedisTest // GH-2915
@@ -349,13 +330,9 @@ void addMakeNoStreamShouldCreateStreamWhenStreamExists() {
349
330
350
331
streamOperations .add (key , Collections .singletonMap (hashKey , value ), options ).block ();
351
332
352
- streamOperations .size (key ).as (StepVerifier ::create )
353
- .expectNext (2L )
354
- .verifyComplete ();
333
+ streamOperations .size (key ).as (StepVerifier ::create ).expectNext (2L ).verifyComplete ();
355
334
356
- streamOperations .range (key , Range .unbounded ()).as (StepVerifier ::create )
357
- .expectNextCount (2L )
358
- .verifyComplete ();
335
+ streamOperations .range (key , Range .unbounded ()).as (StepVerifier ::create ).expectNextCount (2L ).verifyComplete ();
359
336
}
360
337
361
338
@ ParameterizedRedisTest // DATAREDIS-864
@@ -525,7 +502,6 @@ void pendingShouldReadMessageDetails() {
525
502
assertThat (pending .get (0 ).getConsumerName ()).isEqualTo ("my-consumer" );
526
503
assertThat (pending .get (0 ).getTotalDeliveryCount ()).isOne ();
527
504
}).verifyComplete ();
528
-
529
505
}
530
506
531
507
@ ParameterizedRedisTest // GH-2465
@@ -550,6 +526,5 @@ void claimShouldReadMessageDetails() {
550
526
assertThat (claimed .getValue ()).isEqualTo (content );
551
527
assertThat (claimed .getId ()).isEqualTo (messageId );
552
528
}).verifyComplete ();
553
-
554
529
}
555
530
}
0 commit comments