Skip to content

Commit c1cd80a

Browse files
committed
Added ASYNC
1 parent 122d1ac commit c1cd80a

File tree

1 file changed

+51
-0
lines changed

1 file changed

+51
-0
lines changed

src/org/jgroups/protocols/ASYNC.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package org.jgroups.protocols;
2+
3+
import org.jgroups.annotations.MBean;
4+
import org.jgroups.annotations.Property;
5+
import org.jgroups.stack.Protocol;
6+
import org.jgroups.util.MessageBatch;
7+
import org.jgroups.util.ThreadPool;
8+
9+
/**
10+
* Delivers message batches in separate threads. Note that this can destroy ordering for
11+
* regular message batches. For OOB batches, this is OK.
12+
* @author Bela Ban
13+
* @since 5.4.3
14+
*/
15+
@MBean(description="Delivers (OOB) message batches in a seperate thread, so that")
16+
public class ASYNC extends Protocol {
17+
protected ThreadPool thread_pool;
18+
19+
@Property(description="Handle regular messages and batches (destroys ordering)")
20+
protected boolean handle_reg_msgs;
21+
22+
@Property(description="Process (= async dispatch) only batches >= max_batch_size. Smaller batches are dispatched " +
23+
"on the same thread. 0 disables this")
24+
protected int max_batch_size;
25+
26+
@Override
27+
public void init() throws Exception {
28+
thread_pool=getTransport().getThreadPool();
29+
}
30+
31+
@Override
32+
public void up(MessageBatch batch) {
33+
MessageBatch.Mode mode=batch.mode();
34+
if((mode == MessageBatch.Mode.OOB || handle_reg_msgs) && batch.size() >= max_batch_size)
35+
thread_pool.execute(new AsyncBatchDispatcher(batch));
36+
up_prot.up(batch);
37+
}
38+
39+
protected class AsyncBatchDispatcher implements Runnable {
40+
protected final MessageBatch batch;
41+
42+
protected AsyncBatchDispatcher(MessageBatch batch) {
43+
this.batch=batch;
44+
}
45+
46+
@Override
47+
public void run() {
48+
up_prot.up(batch);
49+
}
50+
}
51+
}

0 commit comments

Comments
 (0)