Skip to content

Commit f1a5500

Browse files
committed
Added PassRegularMessagesUpDirectly policy
1 parent d20dd4c commit f1a5500

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

src/org/jgroups/protocols/TP.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,10 @@ public <T extends Protocol> T setLevel(String level) {
275275
public void setMessageProcessingPolicy(String policy) {
276276
if(policy == null)
277277
return;
278+
278279
msg_processing_policy=policy.startsWith("submit")? new SubmitToThreadPool() :
279-
policy.startsWith("max")? new MaxOneThreadPerSender() : null;
280+
policy.startsWith("max")? new MaxOneThreadPerSender() :
281+
policy.startsWith("direct")? new PassRegularMessagesUpDirectly() : null;
280282
try {
281283
if(msg_processing_policy == null) {
282284
Class<MessageProcessingPolicy> clazz=(Class<MessageProcessingPolicy>)Util.loadClass(policy, getClass());
@@ -290,6 +292,7 @@ public void setMessageProcessingPolicy(String policy) {
290292
}
291293
}
292294

295+
293296
/* --------------------------------------------- JMX ---------------------------------------------- */
294297
@Component(name="msg_stats")
295298
protected final MsgStats msg_stats=new MsgStats();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.jgroups.util;
2+
3+
import org.jgroups.Message;
4+
5+
/**
6+
* {@link org.jgroups.stack.MessageProcessingPolicy} which passes regular messages and message batches up directly
7+
* (on the same thread), but passes OOB messages to the thread pool.
8+
* @author Bela Ban
9+
* @since 5.2.14
10+
*/
11+
public class PassRegularMessagesUpDirectly extends SubmitToThreadPool {
12+
13+
@Override
14+
public boolean loopback(Message msg, boolean oob) {
15+
if(oob)
16+
return super.loopback(msg, oob);
17+
tp.passMessageUp(msg, null, false, msg.getDest() == null, false);
18+
return true;
19+
}
20+
21+
@Override
22+
public boolean process(Message msg, boolean oob) {
23+
if(oob)
24+
return super.process(msg, oob);
25+
SingleMessageHandler smh=new SingleMessageHandler(msg);
26+
smh.run();
27+
return true;
28+
}
29+
30+
@Override
31+
public boolean process(MessageBatch batch, boolean oob) {
32+
if(oob)
33+
return super.process(batch, oob);
34+
BatchHandler bh=new BatchHandler(batch);
35+
bh.run();
36+
return true;
37+
}
38+
}

0 commit comments

Comments
 (0)