24
24
import java .util .concurrent .CopyOnWriteArrayList ;
25
25
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
26
26
import java .util .function .BiFunction ;
27
- import java .util .stream .Collectors ;
28
27
29
28
import org .springframework .data .redis .connection .Message ;
30
29
import org .springframework .data .redis .connection .MessageListener ;
34
33
35
34
/**
36
35
* Synchronizing {@link MessageListener} and {@link SubscriptionListener} that allows notifying a {@link Runnable}
37
- * (through {@link SubscriptionSynchronizion }) upon completing subscriptions to channels or patterns.
36
+ * (through {@link SubscriptionSynchronization }) upon completing subscriptions to channels or patterns.
38
37
*
39
38
* @author Mark Paluch
40
39
* @since 3.0
@@ -43,19 +42,19 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe
43
42
44
43
private final MessageListener messageListener ;
45
44
private final SubscriptionListener subscriptionListener ;
46
- private final List <SubscriptionSynchronizion > synchronizations = new CopyOnWriteArrayList <>();
45
+ private final List <SubscriptionSynchronization > synchronizations = new CopyOnWriteArrayList <>();
47
46
48
47
public SynchronizingMessageListener (MessageListener messageListener , SubscriptionListener subscriptionListener ) {
49
48
this .messageListener = messageListener ;
50
49
this .subscriptionListener = subscriptionListener ;
51
50
}
52
51
53
52
/**
54
- * Register a {@link SubscriptionSynchronizion }.
53
+ * Register a {@link SubscriptionSynchronization }.
55
54
*
56
55
* @param synchronization must not be {@literal null}.
57
56
*/
58
- public void addSynchronization (SubscriptionSynchronizion synchronization ) {
57
+ public void addSynchronization (SubscriptionSynchronization synchronization ) {
59
58
this .synchronizations .add (synchronization );
60
59
}
61
60
@@ -68,7 +67,7 @@ public void onMessage(Message message, @Nullable byte[] pattern) {
68
67
public void onChannelSubscribed (byte [] channel , long count ) {
69
68
70
69
subscriptionListener .onChannelSubscribed (channel , count );
71
- handleSubscription (channel , SubscriptionSynchronizion ::onChannelSubscribed );
70
+ handleSubscription (channel , SubscriptionSynchronization ::onChannelSubscribed );
72
71
}
73
72
74
73
@ Override
@@ -80,7 +79,7 @@ public void onChannelUnsubscribed(byte[] channel, long count) {
80
79
public void onPatternSubscribed (byte [] pattern , long count ) {
81
80
82
81
subscriptionListener .onPatternSubscribed (pattern , count );
83
- handleSubscription (pattern , SubscriptionSynchronizion ::onPatternSubscribed );
82
+ handleSubscription (pattern , SubscriptionSynchronization ::onPatternSubscribed );
84
83
}
85
84
86
85
@ Override
@@ -89,16 +88,16 @@ public void onPatternUnsubscribed(byte[] pattern, long count) {
89
88
}
90
89
91
90
void handleSubscription (byte [] topic ,
92
- BiFunction <SubscriptionSynchronizion , ByteArrayWrapper , Boolean > synchronizerCallback ) {
91
+ BiFunction <SubscriptionSynchronization , ByteArrayWrapper , Boolean > synchronizerCallback ) {
93
92
94
93
if (synchronizations .isEmpty ()) {
95
94
return ;
96
95
}
97
96
98
97
ByteArrayWrapper binaryChannel = new ByteArrayWrapper (topic );
99
- List <SubscriptionSynchronizion > finalized = new ArrayList <>(synchronizations .size ());
98
+ List <SubscriptionSynchronization > finalized = new ArrayList <>(synchronizations .size ());
100
99
101
- for (SubscriptionSynchronizion synchronizer : synchronizations ) {
100
+ for (SubscriptionSynchronization synchronizer : synchronizations ) {
102
101
103
102
if (synchronizerCallback .apply (synchronizer , binaryChannel )) {
104
103
finalized .add (synchronizer );
@@ -111,37 +110,38 @@ void handleSubscription(byte[] topic,
111
110
/**
112
111
* Synchronization to await subscriptions for channels and patterns.
113
112
*/
114
- static class SubscriptionSynchronizion {
113
+ static class SubscriptionSynchronization {
115
114
116
- private static final AtomicIntegerFieldUpdater <SubscriptionSynchronizion > DONE = AtomicIntegerFieldUpdater
117
- .newUpdater (SubscriptionSynchronizion .class , "done" );
115
+ private static final AtomicIntegerFieldUpdater <SubscriptionSynchronization > DONE = AtomicIntegerFieldUpdater
116
+ .newUpdater (SubscriptionSynchronization .class , "done" );
118
117
119
118
private static final int NOT_DONE = 0 ;
120
119
private static final int DONE_DONE = 0 ;
121
120
122
121
private volatile int done = NOT_DONE ;
123
- private final Set <ByteArrayWrapper > remainingPatterns ;
124
- private final Set <ByteArrayWrapper > remainingChannels ;
125
122
126
123
private final Runnable doneCallback ;
127
124
128
- public SubscriptionSynchronizion (Collection <byte []> remainingPatterns , Collection <byte []> remainingChannels ,
125
+ private final Set <ByteArrayWrapper > remainingPatterns ;
126
+ private final Set <ByteArrayWrapper > remainingChannels ;
127
+
128
+ public SubscriptionSynchronization (Collection <byte []> remainingPatterns , Collection <byte []> remainingChannels ,
129
129
Runnable doneCallback ) {
130
130
131
131
if (remainingPatterns .isEmpty ()) {
132
132
this .remainingPatterns = Collections .emptySet ();
133
133
} else {
134
134
this .remainingPatterns = ConcurrentHashMap .newKeySet (remainingPatterns .size ());
135
135
this .remainingPatterns
136
- .addAll (remainingPatterns .stream ().map (ByteArrayWrapper ::new ).collect ( Collectors . toList () ));
136
+ .addAll (remainingPatterns .stream ().map (ByteArrayWrapper ::new ).toList ());
137
137
}
138
138
139
139
if (remainingChannels .isEmpty ()) {
140
140
this .remainingChannels = Collections .emptySet ();
141
141
} else {
142
142
this .remainingChannels = ConcurrentHashMap .newKeySet (remainingChannels .size ());
143
143
this .remainingChannels
144
- .addAll (remainingChannels .stream ().map (ByteArrayWrapper ::new ).collect ( Collectors . toList () ));
144
+ .addAll (remainingChannels .stream ().map (ByteArrayWrapper ::new ).toList ());
145
145
}
146
146
147
147
this .doneCallback = doneCallback ;
0 commit comments