Skip to content

Commit e810ee4

Browse files
committed
Ordering test now passes reliably, as flow control max_bytes was reduced. This would have worked unchanged when switching to NAKACK4 / UNICAST4
1 parent 0c16aa4 commit e810ee4

File tree

3 files changed

+37
-34
lines changed

3 files changed

+37
-34
lines changed

src/org/jgroups/protocols/SHUFFLE.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ public class SHUFFLE extends Protocol {
3434

3535

3636
@Property(description="Reorder up messages and message batches")
37-
protected boolean up=true;
37+
protected volatile boolean up=true;
3838

3939
@Property(description="Reorder down messages and message batches")
40-
protected boolean down;
40+
protected volatile boolean down;
4141

4242
@Property(description="max number of messages before we reorder queued messages and send them up")
4343
protected int max_size=10;
@@ -140,15 +140,12 @@ public void flush(boolean stop_shuffling) {
140140
}
141141
}
142142

143-
144143
protected static void shuffle(MessageBatch batch) {
145144
Message[] msgs=batch.stream().toArray(Message[]::new);
146145
Util.shuffle(msgs, 0, msgs.length);
147146
batch.array().set(msgs);
148147
}
149148

150-
151-
152149
protected synchronized void startTask() {
153150
if(task == null || task.isDone() || task.isCancelled())
154151
task=timer.schedule(() -> {

src/org/jgroups/protocols/pbcast/NAKACK2.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,9 @@ public void setResendLastSeqno(boolean flag) {
310310
public int getNumMessagesReceived() {return num_messages_received;}
311311
public NAKACK2 setNumMessagesReceived(int n) {this.num_messages_received=n; return this;}
312312

313+
public int getMaxBatchSize() {return max_batch_size;}
314+
public NAKACK2 setMaxBatchSize(int s) {max_batch_size=s; return this;}
315+
313316
public boolean isTrace() {return is_trace;}
314317
public NAKACK2 isTrace(boolean i) {this.is_trace=i; return this;}
315318

tests/junit-functional/org/jgroups/tests/OrderingTest.java

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.concurrent.CountDownLatch;
1616
import java.util.stream.Stream;
1717

18+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
19+
1820
/**
1921
* Tests multicast and unicast ordering of <em>regular</em> messages.<br/>
2022
* Regular messages from different senders can be delivered in parallel; messages from the same sender must be
@@ -25,14 +27,12 @@
2527
*/
2628
@Test(groups=Global.TIME_SENSITIVE,singleThreaded=true)
2729
public class OrderingTest {
28-
protected static final int NUM_MSGS=100000;
30+
protected static final int NUM_MSGS=50_000;
2931
protected static final int PRINT=NUM_MSGS / 5;
3032
protected static final int NUM_SENDERS=2;
33+
protected JChannel[] channels=new JChannel[NUM_SENDERS];
3134

32-
protected JChannel[] channels=new JChannel[NUM_SENDERS];
33-
34-
35-
@BeforeMethod void init() throws Exception {
35+
@BeforeMethod protected void init() throws Exception {
3636
for(int i=0; i < channels.length; i++) {
3737
channels[i]=createChannel(i).connect("OrderingTest.testFIFOOrder");
3838
channels[i].setReceiver(new MyReceiver(channels[i].name()));
@@ -44,66 +44,67 @@ public class OrderingTest {
4444
}
4545
}
4646

47-
@AfterMethod void destroy() {
48-
47+
@AfterMethod protected void destroy() {
4948
Stream.of(channels).forEach(ch -> {
5049
SHUFFLE shuffle=ch.getProtocolStack().findProtocol(SHUFFLE.class);
5150
shuffle.setDown(false).setUp(false);
5251
});
5352
Util.close(channels);
5453
}
5554

56-
5755
protected static JChannel createChannel(int index) throws Exception {
5856
return new JChannel(new SHARED_LOOPBACK(),
5957
new SHARED_LOOPBACK_PING(),
60-
new SHUFFLE().setUp(false).setDown(false).setMaxSize(200), // reorders messages
61-
new NAKACK2().useMcastXmit(false).setDiscardDeliveredMsgs(true).setXmitInterval(200),
58+
new SHUFFLE().setUp(false).setDown(false).setMaxSize(100), // reorders messages
59+
new NAKACK2().setDiscardDeliveredMsgs(true).setXmitInterval(200),
6260
new UNICAST3(),
6361
new STABLE().setMaxBytes(50_000).setDesiredAverageGossip(1000),
6462
new GMS().setJoinTimeout(500).printLocalAddress(false),
65-
new UFC().setMaxCredits(2_000_000),
66-
new MFC().setMaxCredits(2_000_000),
67-
new FRAG2())
68-
.name(String.valueOf((char)('A' +index)));
63+
new UFC().setMaxCredits(200_000).setMinCredits(50_000),
64+
new MFC().setMaxCredits(200_000).setMinCredits(50_000),
65+
new FRAG2().setFragSize(40_000))
66+
.name(String.valueOf((char)('A' + index)));
6967
}
7068

7169

7270
// @Test(invocationCount=100)
7371
public void testMulticastFIFOOrdering() throws Exception {
7472
System.out.println("\n-- sending " + NUM_MSGS + " messages");
7573
final CountDownLatch latch=new CountDownLatch(1);
76-
MySender[] senders=new MySender[NUM_SENDERS];
74+
Thread[] senders=new Thread[NUM_SENDERS];
7775
for(int i=0; i < senders.length; i++) {
78-
senders[i]=new MySender(channels[i], null, latch);
76+
senders[i]=new Thread(new MySender(channels[i], null, latch), "sender-" + i);
7977
senders[i].start();
8078
}
8179
latch.countDown();
82-
for(MySender sender: senders)
80+
long start=System.nanoTime();
81+
for(Thread sender: senders)
8382
sender.join();
84-
8583
System.out.println("-- senders done");
86-
8784
checkOrder(NUM_MSGS * NUM_SENDERS);
85+
long time=System.nanoTime()-start;
86+
System.out.printf("-- took %s to send and receive %,d msgs\n", Util.printTime(time, NANOSECONDS), NUM_MSGS*NUM_SENDERS);
8887
}
8988

9089
public void testUnicastFIFOOrdering() throws Exception {
9190
System.out.printf("\n-- sending %d unicast messages\n", NUM_MSGS);
9291
final CountDownLatch latch=new CountDownLatch(1);
93-
MySender[] senders=new MySender[NUM_SENDERS];
92+
Thread[] senders=new Thread[NUM_SENDERS];
9493
for(int i=0; i < senders.length; i++) {
9594
Address dest=channels[(i+1) % channels.length].getAddress();
96-
senders[i]=new MySender(channels[i], dest, latch);
95+
senders[i]=new Thread(new MySender(channels[i], dest, latch), "sender=" + i);
9796
System.out.printf("-- %s sends to %s\n", channels[i].getAddress(), dest);
9897
senders[i].start();
9998
}
10099
latch.countDown();
101-
for(MySender sender: senders)
100+
long start=System.nanoTime();
101+
for(Thread sender: senders)
102102
sender.join();
103103

104104
System.out.println("-- senders done");
105-
106105
checkOrder(NUM_MSGS);
106+
long time=System.nanoTime()-start;
107+
System.out.printf("-- took %s to send, reshuffle and receive %,d msgs\n", Util.printTime(time, NANOSECONDS), NUM_MSGS);
107108
}
108109

109110
protected void checkOrder(int expected_msgs) {
@@ -114,16 +115,16 @@ protected void checkOrder(int expected_msgs) {
114115
}
115116

116117
System.out.println("\n-- waiting for message reception by all receivers:");
117-
Util.waitUntilTrue(10000, 500,
118+
Util.waitUntilTrue(500000, 500,
118119
() -> Stream.of(channels).map(JChannel::getReceiver)
119120
.allMatch(r -> ((MyReceiver)r).getReceived() == expected_msgs));
120121

121122
Stream.of(channels).forEach(ch -> System.out.printf("%s: %d\n", ch.getAddress(),
122123
((MyReceiver)ch.getReceiver()).getReceived()));
123124
for(JChannel ch: channels) {
124-
MyReceiver receiver=(MyReceiver)ch.getReceiver();
125-
assert receiver.getReceived() == expected_msgs :
126-
String.format("%s had %d messages (expected=%d)", receiver.name, receiver.getReceived(), expected_msgs);
125+
MyReceiver r=(MyReceiver)ch.getReceiver();
126+
assert r.getReceived() == expected_msgs :
127+
String.format("%s received %d messages (expected=%d)", r.name, r.getReceived(), expected_msgs);
127128
}
128129

129130
System.out.println("\n-- checking message order");
@@ -138,7 +139,7 @@ protected void checkOrder(int expected_msgs) {
138139

139140

140141

141-
protected static class MySender extends Thread {
142+
protected static class MySender implements Runnable {
142143
protected final JChannel ch;
143144
protected final Address dest;
144145
protected final CountDownLatch latch;
@@ -158,7 +159,7 @@ public void run() {
158159
}
159160
for(int i=1; i <= NUM_MSGS; i++) {
160161
try {
161-
Message msg=new BytesMessage(dest, i);
162+
Message msg=new ObjectMessage(dest, i);
162163
ch.send(msg);
163164
if(i % PRINT == 0)
164165
System.out.println(ch.getAddress() + ": " + i + " sent");
@@ -202,7 +203,9 @@ public synchronized void receive(Message msg) {
202203
if(++received % PRINT == 0)
203204
System.out.printf("%s: received %d\n", name, received);
204205
}
206+
205207
}
206208

207209

210+
208211
}

0 commit comments

Comments
 (0)