Skip to content

Commit 5918f91

Browse files
mp911dechristophstrobl
authored andcommitted
Introduce doCreate… methods without connection proxying.
doCreateMono and doCreateFlux now no longer proxy the connection for commands invoked directly from the ReactiveRedisTemplate implementations as proxying isn't necessary for simple command invocation. Original Pull Request: #2129
1 parent 24b0f61 commit 5918f91

9 files changed

+113
-81
lines changed

Diff for: src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18-
import org.springframework.data.redis.domain.geo.GeoReference;
19-
import org.springframework.data.redis.domain.geo.GeoReference.GeoMemberReference;
2018
import reactor.core.publisher.Flux;
2119
import reactor.core.publisher.Mono;
2220

@@ -28,6 +26,7 @@
2826
import java.util.stream.Collectors;
2927

3028
import org.reactivestreams.Publisher;
29+
3130
import org.springframework.data.geo.Circle;
3231
import org.springframework.data.geo.Distance;
3332
import org.springframework.data.geo.GeoResult;
@@ -37,6 +36,8 @@
3736
import org.springframework.data.redis.connection.RedisGeoCommands;
3837
import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation;
3938
import org.springframework.data.redis.connection.RedisGeoCommands.GeoRadiusCommandArgs;
39+
import org.springframework.data.redis.domain.geo.GeoReference;
40+
import org.springframework.data.redis.domain.geo.GeoReference.GeoMemberReference;
4041
import org.springframework.data.redis.domain.geo.GeoShape;
4142
import org.springframework.data.redis.serializer.RedisSerializationContext;
4243
import org.springframework.util.Assert;
@@ -320,7 +321,7 @@ public final Mono<Long> remove(K key, V... members) {
320321
Assert.notEmpty(members, "Members must not be null or empty!");
321322
Assert.noNullElements(members, "Members must not contain null elements!");
322323

323-
return template.createMono(connection -> Flux.fromArray(members) //
324+
return template.doCreateMono(connection -> Flux.fromArray(members) //
324325
.map(this::rawValue) //
325326
.collectList() //
326327
.flatMap(serialized -> connection.zSetCommands().zRem(rawKey(key), serialized)));
@@ -335,10 +336,10 @@ public Mono<Boolean> delete(K key) {
335336

336337
Assert.notNull(key, "Key must not be null!");
337338

338-
return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
339+
return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
339340
}
340341

341-
/*
342+
/*
342343
* (non-Javadoc)
343344
* @see org.springframework.data.redis.core.ReactiveGeoOperations#search(K, RedisGeoCommands.GeoReference, GeoShape, GeoSearchCommandArgs)
344345
*/
@@ -350,11 +351,11 @@ public Flux<GeoResult<GeoLocation<V>>> search(K key, GeoReference<V> reference,
350351
Assert.notNull(reference, "GeoReference must not be null!");
351352
GeoReference<ByteBuffer> rawReference = getGeoReference(reference);
352353

353-
return template.createFlux(connection -> connection.geoCommands()
354+
return template.doCreateFlux(connection -> connection.geoCommands()
354355
.geoSearch(rawKey(key), rawReference, geoPredicate, args).map(this::readGeoResult));
355356
}
356357

357-
/*
358+
/*
358359
* (non-Javadoc)
359360
* @see org.springframework.data.redis.core.ReactiveGeoOperations#searchAndStore(K, K, RedisGeoCommands.GeoReference, GeoShape, GeoSearchStoreCommandArgs)
360361
*/
@@ -366,22 +367,22 @@ public Mono<Long> searchAndStore(K key, K destKey, GeoReference<V> reference,
366367
Assert.notNull(reference, "GeoReference must not be null!");
367368
GeoReference<ByteBuffer> rawReference = getGeoReference(reference);
368369

369-
return template.createMono(connection -> connection.geoCommands().geoSearchStore(rawKey(destKey), rawKey(key),
370+
return template.doCreateMono(connection -> connection.geoCommands().geoSearchStore(rawKey(destKey), rawKey(key),
370371
rawReference, geoPredicate, args));
371372
}
372373

373374
private <T> Mono<T> createMono(Function<ReactiveGeoCommands, Publisher<T>> function) {
374375

375376
Assert.notNull(function, "Function must not be null!");
376377

377-
return template.createMono(connection -> function.apply(connection.geoCommands()));
378+
return template.doCreateMono(connection -> function.apply(connection.geoCommands()));
378379
}
379380

380381
private <T> Flux<T> createFlux(Function<ReactiveGeoCommands, Publisher<T>> function) {
381382

382383
Assert.notNull(function, "Function must not be null!");
383384

384-
return template.createFlux(connection -> function.apply(connection.geoCommands()));
385+
return template.doCreateFlux(connection -> function.apply(connection.geoCommands()));
385386
}
386387

387388
@SuppressWarnings("unchecked")

Diff for: src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,20 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18-
import org.springframework.data.redis.connection.convert.Converters;
1918
import reactor.core.publisher.Flux;
2019
import reactor.core.publisher.Mono;
2120

2221
import java.nio.ByteBuffer;
2322
import java.util.ArrayList;
2423
import java.util.Collection;
25-
import java.util.Collections;
2624
import java.util.List;
2725
import java.util.Map;
2826
import java.util.function.Function;
2927

3028
import org.reactivestreams.Publisher;
29+
3130
import org.springframework.data.redis.connection.ReactiveHashCommands;
31+
import org.springframework.data.redis.connection.convert.Converters;
3232
import org.springframework.data.redis.serializer.RedisSerializationContext;
3333
import org.springframework.util.Assert;
3434

@@ -125,7 +125,7 @@ public Mono<Long> increment(H key, HK hashKey, long delta) {
125125
Assert.notNull(key, "Key must not be null!");
126126
Assert.notNull(hashKey, "Hash key must not be null!");
127127

128-
return template.createMono(connection -> connection //
128+
return template.doCreateMono(connection -> connection //
129129
.numberCommands() //
130130
.hIncrBy(rawKey(key), rawHashKey(hashKey), delta));
131131
}
@@ -140,7 +140,7 @@ public Mono<Double> increment(H key, HK hashKey, double delta) {
140140
Assert.notNull(key, "Key must not be null!");
141141
Assert.notNull(hashKey, "Hash key must not be null!");
142142

143-
return template.createMono(connection -> connection //
143+
return template.doCreateMono(connection -> connection //
144144
.numberCommands() //
145145
.hIncrBy(rawKey(key), rawHashKey(hashKey), delta));
146146
}
@@ -154,7 +154,7 @@ public Mono<HK> randomKey(H key) {
154154

155155
Assert.notNull(key, "Key must not be null!");
156156

157-
return template.createMono(connection -> connection //
157+
return template.doCreateMono(connection -> connection //
158158
.hashCommands().hRandField(rawKey(key))).map(this::readHashKey);
159159
}
160160

@@ -167,7 +167,7 @@ public Mono<Map.Entry<HK, HV>> randomEntry(H key) {
167167

168168
Assert.notNull(key, "Key must not be null!");
169169

170-
return template.createMono(connection -> connection //
170+
return template.doCreateMono(connection -> connection //
171171
.hashCommands().hRandFieldWithValues(rawKey(key))).map(this::deserializeHashEntry);
172172
}
173173

@@ -180,7 +180,7 @@ public Flux<HK> randomKeys(H key, long count) {
180180

181181
Assert.notNull(key, "Key must not be null!");
182182

183-
return template.createFlux(connection -> connection //
183+
return template.doCreateFlux(connection -> connection //
184184
.hashCommands().hRandField(rawKey(key), count)).map(this::readHashKey);
185185
}
186186

@@ -193,7 +193,7 @@ public Flux<Map.Entry<HK, HV>> randomEntries(H key, long count) {
193193

194194
Assert.notNull(key, "Key must not be null!");
195195

196-
return template.createFlux(connection -> connection //
196+
return template.doCreateFlux(connection -> connection //
197197
.hashCommands().hRandFieldWithValues(rawKey(key), count)).map(this::deserializeHashEntry);
198198
}
199199

@@ -314,21 +314,21 @@ public Mono<Boolean> delete(H key) {
314314

315315
Assert.notNull(key, "Key must not be null!");
316316

317-
return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
317+
return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
318318
}
319319

320320
private <T> Mono<T> createMono(Function<ReactiveHashCommands, Publisher<T>> function) {
321321

322322
Assert.notNull(function, "Function must not be null!");
323323

324-
return template.createMono(connection -> function.apply(connection.hashCommands()));
324+
return template.doCreateMono(connection -> function.apply(connection.hashCommands()));
325325
}
326326

327327
private <T> Flux<T> createFlux(Function<ReactiveHashCommands, Publisher<T>> function) {
328328

329329
Assert.notNull(function, "Function must not be null!");
330330

331-
return template.createFlux(connection -> function.apply(connection.hashCommands()));
331+
return template.doCreateFlux(connection -> function.apply(connection.hashCommands()));
332332
}
333333

334334
private ByteBuffer rawKey(H key) {

Diff for: src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,14 @@ public Mono<Boolean> delete(K key) {
103103

104104
Assert.notNull(key, "Key must not be null!");
105105

106-
return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
106+
return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
107107
}
108108

109109
private <T> Mono<T> createMono(Function<ReactiveHyperLogLogCommands, Publisher<T>> function) {
110110

111111
Assert.notNull(function, "Function must not be null!");
112112

113-
return template.createMono(connection -> function.apply(connection.hyperLogLogCommands()));
113+
return template.doCreateMono(connection -> function.apply(connection.hyperLogLogCommands()));
114114
}
115115

116116
private ByteBuffer rawKey(K key) {

Diff for: src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public Mono<Long> rightPush(K key, V pivot, V value) {
200200
return createMono(connection -> connection.lInsert(rawKey(key), Position.AFTER, rawValue(pivot), rawValue(value)));
201201
}
202202

203-
/*
203+
/*
204204
* (non-Javadoc)
205205
* @see org.springframework.data.redis.core.ReactiveListOperations#move(K, Direction, K, Direction)
206206
*/
@@ -216,7 +216,7 @@ public Mono<V> move(K sourceKey, Direction from, K destinationKey, Direction to)
216216
connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue));
217217
}
218218

219-
/*
219+
/*
220220
* (non-Javadoc)
221221
* @see org.springframework.data.redis.core.ReactiveListOperations#move(K, Direction, K, Direction, Duration)
222222
*/
@@ -378,21 +378,21 @@ public Mono<Boolean> delete(K key) {
378378

379379
Assert.notNull(key, "Key must not be null!");
380380

381-
return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
381+
return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
382382
}
383383

384384
private <T> Mono<T> createMono(Function<ReactiveListCommands, Publisher<T>> function) {
385385

386386
Assert.notNull(function, "Function must not be null!");
387387

388-
return template.createMono(connection -> function.apply(connection.listCommands()));
388+
return template.doCreateMono(connection -> function.apply(connection.listCommands()));
389389
}
390390

391391
private <T> Flux<T> createFlux(Function<ReactiveListCommands, Publisher<T>> function) {
392392

393393
Assert.notNull(function, "Function must not be null!");
394394

395-
return template.createFlux(connection -> function.apply(connection.listCommands()));
395+
return template.doCreateFlux(connection -> function.apply(connection.listCommands()));
396396
}
397397

398398
private boolean isZeroOrGreater1Second(Duration timeout) {

Diff for: src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -508,21 +508,21 @@ public Mono<Boolean> delete(K key) {
508508

509509
Assert.notNull(key, "Key must not be null!");
510510

511-
return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
511+
return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
512512
}
513513

514514
private <T> Mono<T> createMono(Function<ReactiveSetCommands, Publisher<T>> function) {
515515

516516
Assert.notNull(function, "Function must not be null!");
517517

518-
return template.createMono(connection -> function.apply(connection.setCommands()));
518+
return template.doCreateMono(connection -> function.apply(connection.setCommands()));
519519
}
520520

521521
private <T> Flux<T> createFlux(Function<ReactiveSetCommands, Publisher<T>> function) {
522522

523523
Assert.notNull(function, "Function must not be null!");
524524

525-
return template.createFlux(connection -> function.apply(connection.setCommands()));
525+
return template.doCreateFlux(connection -> function.apply(connection.setCommands()));
526526
}
527527

528528
private ByteBuffer rawKey(K key) {

Diff for: src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18-
import org.springframework.data.redis.connection.convert.Converters;
1918
import reactor.core.publisher.Flux;
2019
import reactor.core.publisher.Mono;
2120

@@ -28,10 +27,12 @@
2827
import java.util.function.Function;
2928

3029
import org.reactivestreams.Publisher;
30+
3131
import org.springframework.core.convert.ConversionService;
3232
import org.springframework.data.domain.Range;
3333
import org.springframework.data.redis.connection.ReactiveStreamCommands;
3434
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
35+
import org.springframework.data.redis.connection.convert.Converters;
3536
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3637
import org.springframework.data.redis.connection.stream.Consumer;
3738
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -372,14 +373,14 @@ private <T> Mono<T> createMono(Function<ReactiveStreamCommands, Publisher<T>> fu
372373

373374
Assert.notNull(function, "Function must not be null!");
374375

375-
return template.createMono(connection -> function.apply(connection.streamCommands()));
376+
return template.doCreateMono(connection -> function.apply(connection.streamCommands()));
376377
}
377378

378379
private <T> Flux<T> createFlux(Function<ReactiveStreamCommands, Publisher<T>> function) {
379380

380381
Assert.notNull(function, "Function must not be null!");
381382

382-
return template.createFlux(connection -> function.apply(connection.streamCommands()));
383+
return template.doCreateFlux(connection -> function.apply(connection.streamCommands()));
383384
}
384385

385386
private ByteBuffer rawKey(K key) {

Diff for: src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ public Mono<Long> increment(K key) {
244244

245245
Assert.notNull(key, "Key must not be null!");
246246

247-
return template.createMono(connection -> connection.numberCommands().incr(rawKey(key)));
247+
return template.doCreateMono(connection -> connection.numberCommands().incr(rawKey(key)));
248248
}
249249

250250
/* (non-Javadoc)
@@ -255,7 +255,7 @@ public Mono<Long> increment(K key, long delta) {
255255

256256
Assert.notNull(key, "Key must not be null!");
257257

258-
return template.createMono(connection -> connection.numberCommands().incrBy(rawKey(key), delta));
258+
return template.doCreateMono(connection -> connection.numberCommands().incrBy(rawKey(key), delta));
259259
}
260260

261261
/* (non-Javadoc)
@@ -266,7 +266,7 @@ public Mono<Double> increment(K key, double delta) {
266266

267267
Assert.notNull(key, "Key must not be null!");
268268

269-
return template.createMono(connection -> connection.numberCommands().incrBy(rawKey(key), delta));
269+
return template.doCreateMono(connection -> connection.numberCommands().incrBy(rawKey(key), delta));
270270
}
271271

272272
/* (non-Javadoc)
@@ -277,7 +277,7 @@ public Mono<Long> decrement(K key) {
277277

278278
Assert.notNull(key, "Key must not be null!");
279279

280-
return template.createMono(connection -> connection.numberCommands().decr(rawKey(key)));
280+
return template.doCreateMono(connection -> connection.numberCommands().decr(rawKey(key)));
281281
}
282282

283283
/* (non-Javadoc)
@@ -288,7 +288,7 @@ public Mono<Long> decrement(K key, long delta) {
288288

289289
Assert.notNull(key, "Key must not be null!");
290290

291-
return template.createMono(connection -> connection.numberCommands().decrBy(rawKey(key), delta));
291+
return template.doCreateMono(connection -> connection.numberCommands().decrBy(rawKey(key), delta));
292292
}
293293

294294
/* (non-Javadoc)
@@ -380,14 +380,14 @@ public Mono<Boolean> delete(K key) {
380380

381381
Assert.notNull(key, "Key must not be null!");
382382

383-
return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
383+
return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
384384
}
385385

386386
private <T> Mono<T> createMono(Function<ReactiveStringCommands, Publisher<T>> function) {
387387

388388
Assert.notNull(function, "Function must not be null!");
389389

390-
return template.createMono(connection -> function.apply(connection.stringCommands()));
390+
return template.doCreateMono(connection -> function.apply(connection.stringCommands()));
391391
}
392392

393393
private ByteBuffer rawKey(K key) {

0 commit comments

Comments
 (0)