Skip to content

Commit ac13ac6

Browse files
committed
merge bug24881 into default
2 parents 2b9e9f4 + 20c499d commit ac13ac6

File tree

1 file changed

+38
-30
lines changed

1 file changed

+38
-30
lines changed

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public static void main(String[] args) {
7676
int consumerTxSize = intArg(cmd, 'n', 0);
7777
long confirm = intArg(cmd, 'c', -1);
7878
boolean autoAck = cmd.hasOption('a');
79+
int multiAckEvery = intArg(cmd, 'A', 0);
7980
int prefetchCount = intArg(cmd, 'q', 0);
8081
int minMsgSize = intArg(cmd, 's', 0);
8182
int timeLimit = intArg(cmd, 'z', 0);
@@ -120,7 +121,7 @@ public static void main(String[] args) {
120121
Thread t =
121122
new Thread(new Consumer(channel, id, qName,
122123
consumerTxSize, autoAck,
123-
stats, timeLimit));
124+
multiAckEvery, stats, timeLimit));
124125
consumerThreads[i] = t;
125126
}
126127
Thread[] producerThreads = new Thread[producerCount];
@@ -183,27 +184,28 @@ private static void usage(Options options) {
183184

184185
private static Options getOptions() {
185186
Options options = new Options();
186-
options.addOption(new Option("?", "help", false,"show usage"));
187-
options.addOption(new Option("h", "uri", true, "AMQP URI"));
188-
options.addOption(new Option("t", "type", true, "exchange type"));
189-
options.addOption(new Option("e", "exchange", true, "exchange name"));
190-
options.addOption(new Option("u", "queue", true, "queue name"));
191-
options.addOption(new Option("i", "interval", true, "sampling interval"));
192-
options.addOption(new Option("r", "rate", true, "rate limit"));
193-
options.addOption(new Option("x", "producers", true, "producer count"));
194-
options.addOption(new Option("y", "consumers", true, "consumer count"));
195-
options.addOption(new Option("m", "ptxsize", true, "producer tx size"));
196-
options.addOption(new Option("n", "ctxsize", true, "consumer tx size"));
197-
options.addOption(new Option("c", "confirm", true, "max unconfirmed publishes"));
198-
options.addOption(new Option("a", "autoack", false,"auto ack"));
199-
options.addOption(new Option("q", "qos", true, "qos prefetch count"));
200-
options.addOption(new Option("s", "size", true, "message size"));
201-
options.addOption(new Option("z", "time", true, "time limit"));
202-
Option flag = new Option("f", "flag", true, "message flag");
187+
options.addOption(new Option("?", "help", false,"show usage"));
188+
options.addOption(new Option("h", "uri", true, "AMQP URI"));
189+
options.addOption(new Option("t", "type", true, "exchange type"));
190+
options.addOption(new Option("e", "exchange", true, "exchange name"));
191+
options.addOption(new Option("u", "queue", true, "queue name"));
192+
options.addOption(new Option("i", "interval", true, "sampling interval"));
193+
options.addOption(new Option("r", "rate", true, "rate limit"));
194+
options.addOption(new Option("x", "producers", true, "producer count"));
195+
options.addOption(new Option("y", "consumers", true, "consumer count"));
196+
options.addOption(new Option("m", "ptxsize", true, "producer tx size"));
197+
options.addOption(new Option("n", "ctxsize", true, "consumer tx size"));
198+
options.addOption(new Option("c", "confirm", true, "max unconfirmed publishes"));
199+
options.addOption(new Option("a", "autoack", false,"auto ack"));
200+
options.addOption(new Option("A", "multiAckEvery", true, "multi ack every"));
201+
options.addOption(new Option("q", "qos", true, "qos prefetch count"));
202+
options.addOption(new Option("s", "size", true, "message size"));
203+
options.addOption(new Option("z", "time", true, "time limit"));
204+
Option flag = new Option("f", "flag", true, "message flag");
203205
flag.setArgs(Option.UNLIMITED_VALUES);
204206
options.addOption(flag);
205-
options.addOption(new Option("M", "framemax", true, "frame max"));
206-
options.addOption(new Option("b", "heartbeat", true, "heartbeat interval"));
207+
options.addOption(new Option("M", "framemax", true, "frame max"));
208+
options.addOption(new Option("b", "heartbeat", true, "heartbeat interval"));
207209
return options;
208210
}
209211

@@ -410,20 +412,22 @@ public static class Consumer implements Runnable {
410412
private String queueName;
411413
private int txSize;
412414
private boolean autoAck;
415+
private int multiAckEvery;
413416
private Stats stats;
414417
private long timeLimit;
415418

416419
public Consumer(Channel channel, String id,
417420
String queueName, int txSize, boolean autoAck,
418-
Stats stats, int timeLimit) {
419-
420-
this.channel = channel;
421-
this.id = id;
422-
this.queueName = queueName;
423-
this.txSize = txSize;
424-
this.autoAck = autoAck;
425-
this.stats = stats;
426-
this.timeLimit = 1000L * timeLimit;
421+
int multiAckEvery, Stats stats, int timeLimit) {
422+
423+
this.channel = channel;
424+
this.id = id;
425+
this.queueName = queueName;
426+
this.txSize = txSize;
427+
this.autoAck = autoAck;
428+
this.multiAckEvery = multiAckEvery;
429+
this.stats = stats;
430+
this.timeLimit = 1000L * timeLimit;
427431
}
428432

429433
public void run() {
@@ -462,7 +466,11 @@ public void run() {
462466
Envelope envelope = delivery.getEnvelope();
463467

464468
if (!autoAck) {
465-
channel.basicAck(envelope.getDeliveryTag(), false);
469+
if (multiAckEvery == 0) {
470+
channel.basicAck(envelope.getDeliveryTag(), false);
471+
} else if (totalMsgCount % multiAckEvery == 0) {
472+
channel.basicAck(envelope.getDeliveryTag(), true);
473+
}
466474
}
467475

468476
if (txSize != 0 && totalMsgCount % txSize == 0) {

0 commit comments

Comments
 (0)