Skip to content

Commit d0c60e6

Browse files
committed
- Release blocked sender on view change (https://issues.redhat.com/browse/JGRP-2853)
- Unit test: ReliableMulticastBlockTest
1 parent 0f2dc37 commit d0c60e6

File tree

3 files changed

+147
-1
lines changed

3 files changed

+147
-1
lines changed

src/org/jgroups/protocols/NAKACK4.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,23 @@ public void changeCapacity(int new_capacity) {
132132
@Override
133133
protected void adjustReceivers(List<Address> members) {
134134
super.adjustReceivers(members);
135+
long old_min=ack_table.min();
135136
ack_table.adjust(members);
137+
long new_min=ack_table.min();
138+
if(new_min > old_min) {
139+
Buffer<Message> buf=sendBuf();
140+
if(buf == null)
141+
log.warn("%s: local send buffer is null", local_addr);
142+
else
143+
buf.purge(new_min); // unblocks senders waiting for space to become available
144+
}
145+
}
146+
147+
@Override
148+
protected void reset() {
149+
FixedBuffer<Message> buf=(FixedBuffer<Message>)sendBuf();
150+
Util.close(buf);
151+
super.reset();
136152
}
137153

138154
@Override

src/org/jgroups/util/FixedBuffer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public boolean add(long seqno, T element, Predicate<T> remove_filter, Options op
8080
if(dist <= 0)
8181
return false;
8282

83-
if(dist > capacity() && (!block || !block(seqno))) { // seqno too big
83+
if(dist > capacity() && (!block || !block(seqno))) { // no space for message
8484
num_dropped_msgs.increment();
8585
return false;
8686
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package org.jgroups.tests;
2+
3+
import org.jgroups.Global;
4+
import org.jgroups.JChannel;
5+
import org.jgroups.View;
6+
import org.jgroups.protocols.DISCARD;
7+
import org.jgroups.protocols.NAKACK4;
8+
import org.jgroups.protocols.ReliableMulticast;
9+
import org.jgroups.protocols.TP;
10+
import org.jgroups.protocols.pbcast.GMS;
11+
import org.jgroups.stack.ProtocolStack;
12+
import org.jgroups.util.MyReceiver;
13+
import org.jgroups.util.Util;
14+
import org.testng.annotations.AfterMethod;
15+
import org.testng.annotations.BeforeMethod;
16+
import org.testng.annotations.Test;
17+
18+
import java.util.List;
19+
import java.util.stream.Collectors;
20+
import java.util.stream.IntStream;
21+
import java.util.stream.Stream;
22+
23+
/**
24+
* Tests {@link org.jgroups.protocols.NAKACK4} and other subclasses of {@link org.jgroups.protocols.ReliableMulticast}
25+
* for (sender-)blocking operations
26+
* @author Bela Ban
27+
* @since 5.4
28+
*/
29+
@Test(groups=Global.FUNCTIONAL,singleThreaded=true)
30+
public class ReliableMulticastBlockTest {
31+
protected static final int NUM=4;
32+
protected JChannel[] channels=new JChannel[NUM];
33+
protected MyReceiver<Integer>[] receivers=new MyReceiver[NUM];
34+
35+
@BeforeMethod
36+
protected void setup() throws Exception {
37+
for(int i=0; i < channels.length; i++) {
38+
JChannel ch=new JChannel(Util.getTestStackNew());
39+
((ReliableMulticast)ch.stack().findProtocol(NAKACK4.class)).setXmitInterval(500);
40+
String name=String.valueOf((char)('A' + i));
41+
receivers[i]=new MyReceiver<Integer>().name(name);
42+
if(i == 0) {
43+
NAKACK4 nak=ch.stack().findProtocol(ReliableMulticast.class);
44+
nak.capacity(5); // A can send 5 messages before it blocks
45+
}
46+
channels[i]=ch.name(name).connect("ReliableMulticastBlockTest");
47+
ch.receiver(receivers[i]);
48+
}
49+
Util.waitUntilAllChannelsHaveSameView(2000, 100, channels);
50+
}
51+
52+
@AfterMethod
53+
protected void destroy() {
54+
Util.closeReverse(channels);
55+
}
56+
57+
/** Tests A sending and blocking on waiting for ACKs from D, then D leaves -> this should unblock A */
58+
public void testSenderBlockingAndViewChange() throws Exception {
59+
DISCARD discard=new DISCARD().discardAll(true);
60+
channels[NUM-1].stack().insertProtocol(discard, ProtocolStack.Position.ABOVE, TP.class);
61+
Util.shutdown(channels[NUM-1]);
62+
channels[NUM-1]=null;
63+
64+
Thread sender=new Thread(() -> {
65+
System.out.printf("A sending %d messages to all\n", 10);
66+
for(int i=1; i <= 10; i++) {
67+
try {
68+
channels[0].send(null, i);
69+
}
70+
catch(Exception e) {
71+
throw new RuntimeException(e);
72+
}
73+
}
74+
});
75+
sender.start(); // will block
76+
77+
// inject view change excluding D
78+
View view=View.create(channels[0].address(), 10L, channels[0].address(), channels[1].address(), channels[2].address());
79+
System.out.printf("-- installing view %s\n", view);
80+
for(int i=0; i < NUM-1; i++) {
81+
GMS gms=channels[i].stack().findProtocol(GMS.class);
82+
gms.installView(view); // this should unblock the sender thread above
83+
}
84+
sender.join(500);
85+
86+
List<Integer> expected=IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
87+
Util.waitUntil(5000, 100,
88+
() -> Stream.of(receivers[0], receivers[1], receivers[2])
89+
.map(MyReceiver::size).allMatch(n -> n == 10), () -> print(receivers));
90+
assert receivers[NUM-1].size() == 0;
91+
for(int i=0; i < NUM-1; i++) {
92+
List<Integer> actual=receivers[i].list();
93+
assert expected.equals(actual);
94+
}
95+
System.out.printf("received msgs:\n%s\n", print(receivers));
96+
}
97+
98+
/** A sends messages and blocks, then A's channel is closed */
99+
public void testSenderBlockingAndChannelClose() throws Exception {
100+
for(int i=1; i < NUM; i++) {
101+
DISCARD discard=new DISCARD().discardAll(true);
102+
channels[i].stack().insertProtocol(discard, ProtocolStack.Position.ABOVE, TP.class);
103+
Util.shutdown(channels[i]);
104+
}
105+
106+
Thread sender=new Thread(() -> {
107+
System.out.printf("A sending %d messages to all\n", 10);
108+
for(int i=1; i <= 10; i++) {
109+
try {
110+
channels[0].send(null, i);
111+
}
112+
catch(Exception ex) {
113+
System.out.printf("-- exception because channel was closed: %s\n", ex);
114+
break;
115+
}
116+
}
117+
});
118+
sender.start(); // will block
119+
sender.join(1000);
120+
121+
channels[0].disconnect();
122+
Util.waitUntilTrue(2000, 100, () -> !sender.isAlive());
123+
assert !sender.isAlive() : "sender should have been unblocked";
124+
}
125+
126+
@SafeVarargs
127+
protected static String print(MyReceiver<Integer> ... receivers) {
128+
return Stream.of(receivers).map(r -> String.format("%s: %s", r.name(), r.list())).collect(Collectors.joining("\n"));
129+
}
130+
}

0 commit comments

Comments
 (0)