Skip to content

Commit f7dc255

Browse files
committed
Added xmit_interval=0 to NAKACK2, disabling retransmission (https://issues.redhat.com/browse/JGRP-2675)
Added rejection_policy field to ThreadPool
1 parent b43fdf1 commit f7dc255

File tree

7 files changed

+296
-39
lines changed

7 files changed

+296
-39
lines changed

conf/tcp-low-latency.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
bind_port="${jgroups.bind_port:7800}"
1919

2020
buffered_input_stream_size="8192"
21-
buffered_output_stream_size="0"
21+
buffered_output_stream_size="8192"
2222

2323
thread_pool.enabled="false"
2424

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
2+
NAKACK2 without retransmission over TCP
3+
=======================================
4+
5+
Author: Bela Ban
6+
Date: March 2023
7+
JIRA: https://issues.redhat.com/browse/JGRP-2675
8+
Unit test: NAKACK2_NoRetransmissionTest
9+
10+
11+
Goal
12+
----
13+
When run over TCP (and TCP_NIO2, see below) as transport, NAKACK2 can disable retransmission, as this is already
14+
performed by TCP.
15+
16+
Contrary to [1], which completely removes NAKACK2, this design uses the retransmit table *for the sole purpose of
17+
establishing ordering*, but not for retransmission.
18+
19+
20+
Implementation
21+
--------------
22+
* Seqnos are still added to messages sent down the stack
23+
* On the receiver, messages (or message batches) are added to the retransmit table and delivered in order of seqnos
24+
* Digests received by STABLE purge messages (from the retransmit table) that have been see by everyone
25+
* xmit_interval <= 0 disables retransmission
26+
* No retransmit task is running
27+
* stable() will not trigger a retransmission
28+
* The last-seqno-sender task is also disabled
29+
30+
31+
become_server_queue
32+
-------------------
33+
* A joiner queues messages received before JChannel.connect() returns
34+
* When getting a BECOME_SERVER event, the queued messages are sent up the stack
35+
* The become_server_queue is bounded (default: 50), so the following can happen:
36+
* C joins a cluster {A,B}. B's highest seqno is 100, sent as a digest with the view {A,B,C} to C
37+
* Before C can become a server, B sends 70 messages
38+
* C queues 50 of those (101-150), but drops 151-170 as the queue is bounded
39+
** Correction: BoundedList drops the oldest N messages when the capacity has been exceeded, so 101-120
40+
However, this doesn't change anything on a concetual level
41+
* C now creates the retransmit window for B at 50 (next seqno to be expected: 51)
42+
* When C receives the BECOME_SERVER, it'll drain the 50 messages from the queue, so B's window is at 150
43+
--> Since there is not retransmission, B's messages 151-170 will not get retransmitted, so further messages sent
44+
by B will be added to B's window at C, but will not get delivered as there is a gap
45+
--> Eventually, C's window for B might cause an OOME, as it only grows and never shrinks
46+
==> Solution: when xmit_interval is <= 0, make become_server_queue *unbounded*. The number of messages received in
47+
that queue should be relatively small, as the time window between sending a join request and becoming server
48+
is short
49+
==> xmit_interval should be able to be changed dynamically
50+
* Start / stop the retransmission task
51+
* Make become_server_queue bounded / unbounded
52+
53+
54+
Thread pool rejection policy
55+
----------------------------
56+
* The transport's thread pool must not drop messages, as this would create a gap in the retransmit table. This is
57+
required, as there is no retransmission at the NAKACK2 level.
58+
59+
60+
Summary
61+
-------
62+
This design requires that a message received over TCP *cannot be dropped before it reaches NAKACK2*. So tests such as
63+
LastMessageDroppedTest, which inserts DISCARD just below NAKACK2 to drop selected messages, won't pass with
64+
retransmission disabled. As TCP/IP considers the message delivered, if it is nevertheless dropped later on, it will
65+
never be retransmitted (not by TCP/IP and not by NAKACK2).
66+
67+
68+
69+
70+
71+
72+
73+
74+
75+
76+
77+
78+
TCP_NIO2
79+
--------
80+
* When TCP_NIO2 is used as transport, when sending a message, the gathering-write buffer 'Buffers' cannot drop the
81+
message, e.g. because the max capacity has been exceeded. Investigate making Buffers unbounded.
82+
83+
84+
Links
85+
-----
86+
[1] https://issues.redhat.com/browse/JGRP-2566

src/org/jgroups/protocols/pbcast/NAKACK2.java

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@
1010
import org.jgroups.util.*;
1111

1212
import java.util.*;
13-
import java.util.concurrent.ConcurrentHashMap;
14-
import java.util.concurrent.ConcurrentMap;
15-
import java.util.concurrent.Future;
16-
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.*;
1714
import java.util.concurrent.atomic.AtomicBoolean;
1815
import java.util.concurrent.atomic.AtomicInteger;
1916
import java.util.concurrent.atomic.AtomicLong;
@@ -60,20 +57,18 @@ public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler
6057
@Property(description="Use a multicast to request retransmission of missing messages")
6158
protected boolean use_mcast_xmit_req;
6259

63-
6460
/**
6561
* Ask a random member for retransmission of a missing message. If set to
6662
* true, discard_delivered_msgs will be set to false
6763
*/
6864
@Property(description="Ask a random member for retransmission of a missing message. Default is false")
6965
protected boolean xmit_from_random_member;
7066

71-
7267
/**
7368
* Messages that have been received in order are sent up the stack (= delivered to the application).
74-
* Delivered messages are removed from the retransmission buffer, so they can get GC'ed by the JVM. When this
75-
* property is true, everyone (except the sender of a message) removes the message from their retransission
76-
* buffers as soon as it has been delivered to the application
69+
* Delivered messages are removed from the retransmit table, so they can get GC'ed by the JVM. When this
70+
* property is true, everyone (except the sender of a message) removes the message from their retransmit
71+
* table as soon as it has been delivered to the application
7772
*/
7873
@Property(description="Should messages delivered to application be discarded")
7974
protected boolean discard_delivered_msgs=true;
@@ -110,7 +105,7 @@ public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler
110105
@Property(description="Size of the queue to hold messages received after creating the channel, but before being " +
111106
"connected (is_server=false). After becoming the server, the messages in the queue are fed into up() and the " +
112107
"queue is cleared. The motivation is to avoid retransmissions (see https://issues.redhat.com/browse/JGRP-1509 " +
113-
"for details). 0 disables the queue.")
108+
"for details).")
114109
protected int become_server_queue_size=50;
115110

116111
@Property(description="Time during which identical warnings about messages from a non member will be suppressed. " +
@@ -257,7 +252,7 @@ public void setResendLastSeqno(boolean flag) {
257252
/** Keeps a bounded list of the last N digest sets */
258253
protected final BoundedList<String> digest_history=new BoundedList<>(10);
259254

260-
protected BoundedList<Message> become_server_queue;
255+
protected Queue<Message> become_server_queue;
261256

262257
/** Log to suppress identical warnings for messages from non-members */
263258
protected SuppressLog<Address> suppress_log_non_member;
@@ -338,7 +333,7 @@ public <T extends Protocol> T setLevel(String level) {
338333

339334
@ManagedAttribute(description="Actual size of the become_server_queue",type=AttributeType.SCALAR)
340335
public int getBecomeServerQueueSizeActual() {
341-
return become_server_queue != null? become_server_queue.size() : -1;
336+
return become_server_queue.size();
342337
}
343338

344339
/** Returns the receive window for sender; only used for testing. Do not use ! */
@@ -499,8 +494,23 @@ public void init() throws Exception {
499494
}
500495
}
501496

502-
if(become_server_queue_size > 0)
503-
become_server_queue=new BoundedList<>(become_server_queue_size);
497+
if(xmit_interval <= 0) {
498+
// https://issues.redhat.com/browse/JGRP-2675
499+
become_server_queue=new ConcurrentLinkedQueue<>();
500+
RejectedExecutionHandler handler=transport.getThreadPool().getRejectedExecutionHandler();
501+
if(!(handler instanceof ThreadPoolExecutor.CallerRunsPolicy)) {
502+
log.warn("%s: xmit_interval of %d requires a CallerRunsPolicy in the thread pool; replacing %s",
503+
local_addr, xmit_interval, handler.getClass().getSimpleName());
504+
transport.getThreadPool().setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
505+
}
506+
}
507+
else {
508+
if(become_server_queue_size <= 0) {
509+
log.warn("%s: %s.become_server_queue_size is <= 0; setting it to 10", local_addr, NAKACK2.class.getSimpleName());
510+
become_server_queue_size=10;
511+
}
512+
become_server_queue=new ArrayBlockingQueue<>(become_server_queue_size);
513+
}
504514

505515
if(suppress_time_non_member_warnings > 0)
506516
suppress_log_non_member=new SuppressLog<>(log, "MsgDroppedNak", "SuppressMsg");
@@ -539,8 +549,7 @@ public void start() throws Exception {
539549
public void stop() {
540550
running=false;
541551
is_server=false;
542-
if(become_server_queue != null)
543-
become_server_queue.clear();
552+
become_server_queue.clear();
544553
stopRetransmitTask();
545554
xmit_task_map.clear();
546555
stable_xmit_map.clear();
@@ -785,12 +794,10 @@ public String[] supportedKeys() {
785794
/* --------------------------------- Private Methods --------------------------------------- */
786795

787796
protected void queueMessage(Message msg, long seqno) {
788-
if(become_server_queue != null) {
789-
become_server_queue.add(msg);
790-
log.trace("%s: message %s#%d was added to queue (not yet server)", local_addr, msg.getSrc(), seqno);
791-
}
797+
if(become_server_queue.offer(msg)) // discards item if queue is full
798+
log.trace("%s: message %s#%d was queued (not yet server)", local_addr, msg.getSrc(), seqno);
792799
else
793-
log.trace("%s: message %s#%d was discarded (not yet server)", local_addr, msg.getSrc(), seqno);
800+
log.trace("%s: message %s#%d was discarded (not yet server, queue full)", local_addr, msg.getSrc(), seqno);
794801
}
795802

796803
protected void unknownMember(Address sender, Object message) {
@@ -1028,19 +1035,14 @@ protected void deliverBatch(MessageBatch batch) {
10281035
* {@link GMS#installView(org.jgroups.View,org.jgroups.util.Digest)} method (called when a view is installed).
10291036
*/
10301037
protected void flushBecomeServerQueue() {
1031-
if(become_server_queue != null && !become_server_queue.isEmpty()) {
1038+
if(!become_server_queue.isEmpty()) {
10321039
log.trace("%s: flushing become_server_queue (%d elements)", local_addr, become_server_queue.size());
1033-
10341040
TP transport=getTransport();
1035-
for(final Message msg: become_server_queue) {
1036-
transport.getThreadPool().execute(() -> {
1037-
try {
1038-
up(msg);
1039-
}
1040-
finally {
1041-
become_server_queue.remove(msg);
1042-
}
1043-
});
1041+
for(;;) {
1042+
final Message msg=become_server_queue.poll();
1043+
if(msg == null)
1044+
break;
1045+
transport.getThreadPool().execute(() -> up(msg));
10441046
}
10451047
}
10461048
}
@@ -1436,7 +1438,7 @@ protected void stable(Digest digest) {
14361438
// check whether the last seqno received for a sender P in the stability digest is > last seqno
14371439
// received for P in my digest. if yes, request retransmission (see "Last Message Dropped" topic in DESIGN)
14381440
Table<Message> buf=xmit_table.get(member);
1439-
if(buf != null) {
1441+
if(buf != null && xmit_interval > 0) {
14401442
long my_hr=buf.getHighestReceived();
14411443
Long prev_hr=stable_xmit_map.get(member);
14421444
if(prev_hr != null && prev_hr > my_hr) {
@@ -1501,7 +1503,7 @@ protected static long sizeOfAllMessages(Table<Message> buf, boolean include_head
15011503
}
15021504

15031505
protected void startRetransmitTask() {
1504-
if(xmit_task == null || xmit_task.isDone())
1506+
if(xmit_interval > 0 && (xmit_task == null || xmit_task.isDone()))
15051507
xmit_task=timer.scheduleWithFixedDelay(new RetransmitTask(), 0, xmit_interval, TimeUnit.MILLISECONDS, sends_can_block);
15061508
}
15071509

src/org/jgroups/util/ThreadPool.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ public class ThreadPool implements Lifecycle {
4242
@Property(description="Timeout (ms) to remove idle threads from the pool", type=AttributeType.TIME)
4343
protected long keep_alive_time=30000;
4444

45+
@Property(description="The rejection policy to be used in the thread pool (abort, discard, run, custom etc. " +
46+
"See Util.parseRejectionPolicy() for details")
47+
protected String rejection_policy="abort";
48+
4549
@Property(description="The number of times a thread pool needs to be full before a thread dump is logged")
4650
protected int thread_dumps_threshold=1;
4751

@@ -111,6 +115,23 @@ public ThreadPool setKeepAliveTime(long time) {
111115
return this;
112116
}
113117

118+
public ThreadPool setRejectionPolicy(String policy) {
119+
RejectedExecutionHandler p=Util.parseRejectionPolicy(policy);
120+
this.rejection_policy=policy;
121+
if(thread_pool instanceof ThreadPoolExecutor)
122+
((ThreadPoolExecutor)thread_pool).setRejectedExecutionHandler(p);
123+
return this;
124+
}
125+
126+
public RejectedExecutionHandler getRejectedExecutionHandler() {
127+
Executor t=thread_pool;
128+
return t instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)t).getRejectedExecutionHandler() : null;
129+
}
130+
131+
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
132+
if(thread_pool instanceof ThreadPoolExecutor)
133+
((ThreadPoolExecutor)thread_pool).setRejectedExecutionHandler(handler);
134+
}
114135

115136
public int getThreadDumpsThreshold() {
116137
return thread_dumps_threshold;
@@ -160,7 +181,7 @@ public int getLargestSize() {
160181
public void init() throws Exception {
161182
if(enabled) {
162183
thread_pool=ThreadCreator.createThreadPool(min_threads, max_threads, keep_alive_time,
163-
"abort", new SynchronousQueue<>(), tp.getThreadFactory(), tp.useVirtualThreads(), tp.getLog());
184+
rejection_policy, new SynchronousQueue<>(), tp.getThreadFactory(), tp.useVirtualThreads(), tp.getLog());
164185
}
165186
else // otherwise use the caller's thread to unmarshal the byte buffer into a message
166187
thread_pool=new DirectExecutor();

tests/byteman/org/jgroups/tests/byteman/BecomeServerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void receive(Message msg) {
6767
}
6868

6969

70-
protected void sendMessage(JChannel ch, String message) {
70+
protected static void sendMessage(JChannel ch, String message) {
7171
try {
7272
ch.send(new ObjectMessage(null, message).setFlag(Message.Flag.OOB));
7373
}

0 commit comments

Comments
 (0)