Skip to content

Commit 0db826f

Browse files
committed
- If the thread pool is full, use a separate thread to send the view (https://issues.redhat.com/browse/JGRP-2880)
- Use a separate thread to send LEAVE request in Leaver
1 parent accf9a3 commit 0db826f

File tree

4 files changed

+39
-9
lines changed

4 files changed

+39
-9
lines changed

src/org/jgroups/protocols/pbcast/GMS.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,11 @@ public void castViewChangeAndSendJoinRsps(View new_view, Digest digest, Collecti
560560
down_prot.down(view_change_msg);
561561
sendJoinResponses(jr, joiners);
562562
};
563-
thread_pool.execute(r);
563+
boolean rc=thread_pool.execute(r);
564+
if(!rc) { // https://issues.redhat.com/browse/JGRP-2880
565+
Thread th=getThreadFactory().newThread(r);
566+
th.start();
567+
}
564568
}
565569

566570
try {

src/org/jgroups/protocols/pbcast/Leaver.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,19 @@ public void reset() {
116116
protected void sendLeaveRequest(Address coord, Address leaving_mbr) {
117117
Message msg=new EmptyMessage(coord).setFlag(Message.Flag.OOB).setFlag(Message.TransientFlag.DONT_BLOCK)
118118
.putHeader(gms.getId(), new GMS.GmsHeader(GMS.GmsHeader.LEAVE_REQ, leaving_mbr));
119-
gms.getDownProtocol().down(msg);
119+
if(gms.thread_pool == null)
120+
gms.getDownProtocol().down(msg);
121+
else {
122+
// If the mcast protocol can block, we need to send a view asynchronously. The views will still
123+
// be delivered in order, see https://issues.redhat.com/browse/JGRP-2875 for details
124+
Runnable r=() -> {
125+
gms.getDownProtocol().down(msg);
126+
};
127+
boolean rc=gms.thread_pool.execute(r);
128+
if(!rc) { // https://issues.redhat.com/browse/JGRP-2880
129+
Thread th=gms.getThreadFactory().newThread(r);
130+
th.start();
131+
}
132+
}
120133
}
121134
}

src/org/jgroups/util/ThreadPool.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ public class ThreadPool implements Lifecycle {
7575
@ManagedAttribute(description="The number of messages dropped because the thread pool was full",type= SCALAR)
7676
protected final LongAdder num_rejected_msgs=new LongAdder();
7777

78+
protected final RejectedExecutionHandler rejected_handler=(r, pool) -> {
79+
num_rejected_msgs.increment();
80+
//https://issues.redhat.com/browse/JGRP-2802
81+
String thread_dump=thread_dumps_enabled? String.format(". Threads:\n%s", Util.dumpThreads()) : "";
82+
thread_pool_full_log.log(warn, "thread-pool-full", thread_pool_full_suppress_time,
83+
address, max_threads, getThreadPoolSize(), thread_dump);
84+
};
85+
7886
public ThreadPool() {
7987
}
8088

@@ -231,17 +239,17 @@ public void doExecute(Runnable task) {
231239

232240
public Executor pool() {return thread_pool;}
233241

234-
public boolean execute(Runnable task) {
242+
public boolean execute(Runnable task, RejectedExecutionHandler h) {
235243
try {
236244
thread_pool.execute(task);
237245
return true;
238246
}
239247
catch(RejectedExecutionException ex) {
240-
num_rejected_msgs.increment();
241-
//https://issues.redhat.com/browse/JGRP-2802
242-
String thread_dump=thread_dumps_enabled? String.format(". Threads:\n%s", Util.dumpThreads()) : "";
243-
thread_pool_full_log.log(warn, "thread-pool-full", thread_pool_full_suppress_time,
244-
address, max_threads, getThreadPoolSize(), thread_dump);
248+
rejected_handler.rejectedExecution(task, null); // we want the default behavior (incr rej. msgs)
249+
if(h != null) {
250+
ThreadPoolExecutor tmp=thread_pool instanceof ThreadPoolExecutor? (ThreadPoolExecutor)thread_pool : null;
251+
h.rejectedExecution(task, tmp);
252+
}
245253
return false;
246254
}
247255
catch(Throwable t) {
@@ -251,6 +259,10 @@ public boolean execute(Runnable task) {
251259
}
252260
}
253261

262+
public boolean execute(Runnable task) {
263+
return execute(task, null);
264+
}
265+
254266
public String toString() {
255267
return thread_pool != null? thread_pool.toString() : "n/a";
256268
}

tests/junit-functional/org/jgroups/tests/ReliableUnicastBlockTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ public void testConnectionCloseThenReopen() throws Exception {
155155
GMS gms_b=b.stack().findProtocol(GMS.class);
156156
gms_b.installView(mv);
157157

158-
Util.waitUntil(2000, 100, () -> rb.size() == 10);
158+
Util.waitUntil(2000, 100, () -> rb.size() == 10,
159+
() -> String.format("B has %d messages: %s", rb.size(), rb.list()));
159160
assert rb.list().equals(EXPECTED);
160161
System.out.printf("-- rb: %s\n", print(rb));
161162
}

0 commit comments

Comments
 (0)