39
39
import java .util .Arrays ;
40
40
import java .util .List ;
41
41
import java .util .UUID ;
42
- import java .net .Socket ;
43
42
44
43
import org .apache .commons .cli .CommandLine ;
45
44
import org .apache .commons .cli .CommandLineParser ;
62
61
import com .rabbitmq .client .AMQP .Queue ;
63
62
import com .rabbitmq .client .QueueingConsumer .Delivery ;
64
63
64
+
65
65
public class MulticastMain {
66
66
67
67
public static void main (String [] args ) {
@@ -70,24 +70,22 @@ public static void main(String[] args) {
70
70
try {
71
71
CommandLine cmd = parser .parse (options , args );
72
72
73
- final String hostName = strArg (cmd , 'h' , "localhost" );
74
- final int portNumber = intArg (cmd , 'p' , AMQP .PROTOCOL .PORT );
75
- final String exchangeType = strArg (cmd , 't' , "direct" );
76
- final String exchangeName = strArg (cmd , 'e' , exchangeType );
77
- final int samplingInterval = intArg (cmd , 'i' , 1 );
78
- final int rateLimit = intArg (cmd , 'r' , 0 );
79
- final int producerCount = intArg (cmd , 'x' , 1 );
80
- final int consumerCount = intArg (cmd , 'y' , 1 );
81
- final int producerTxSize = intArg (cmd , 'm' , 0 );
82
- final int consumerTxSize = intArg (cmd , 'n' , 0 );
83
- final boolean autoAck = cmd .hasOption ('a' );
84
- final int prefetchCount = intArg (cmd , 'q' , 0 );
85
- final int minMsgSize = intArg (cmd , 's' , 0 );
86
- final int maxRedirects = intArg (cmd , 'd' , 0 );
87
- final int timeLimit = intArg (cmd , 'z' , 0 );
88
- final int sendBufferSize = intArg (cmd , 'b' , -1 );
89
- final int recvBufferSize = intArg (cmd , 'c' , -1 );
90
- final List flags = lstArg (cmd , 'f' );
73
+ String hostName = strArg (cmd , 'h' , "localhost" );
74
+ int portNumber = intArg (cmd , 'p' , AMQP .PROTOCOL .PORT );
75
+ String exchangeType = strArg (cmd , 't' , "direct" );
76
+ String exchangeName = strArg (cmd , 'e' , exchangeType );
77
+ int samplingInterval = intArg (cmd , 'i' , 1 );
78
+ int rateLimit = intArg (cmd , 'r' , 0 );
79
+ int producerCount = intArg (cmd , 'x' , 1 );
80
+ int consumerCount = intArg (cmd , 'y' , 1 );
81
+ int producerTxSize = intArg (cmd , 'm' , 0 );
82
+ int consumerTxSize = intArg (cmd , 'n' , 0 );
83
+ boolean autoAck = cmd .hasOption ('a' );
84
+ int prefetchCount = intArg (cmd , 'q' , 0 );
85
+ int minMsgSize = intArg (cmd , 's' , 0 );
86
+ int maxRedirects = intArg (cmd , 'd' , 0 );
87
+ int timeLimit = intArg (cmd , 'z' , 0 );
88
+ List flags = lstArg (cmd , 'f' );
91
89
92
90
//setup
93
91
String id = UUID .randomUUID ().toString ();
@@ -100,15 +98,7 @@ public static void main(String[] args) {
100
98
Connection [] consumerConnections = new Connection [consumerCount ];
101
99
for (int i = 0 ; i < consumerCount ; i ++) {
102
100
System .out .println ("starting consumer #" + i );
103
- Connection conn = new ConnectionFactory (params ) {
104
- public void configureSocket (Socket socket ) throws IOException {
105
- super .configureSocket (socket );
106
- if (recvBufferSize > 0 )
107
- socket .setReceiveBufferSize (recvBufferSize );
108
- if (sendBufferSize > 0 )
109
- socket .setSendBufferSize (sendBufferSize );
110
- }
111
- }.newConnection (addresses , maxRedirects );
101
+ Connection conn = new ConnectionFactory (params ).newConnection (addresses , maxRedirects );
112
102
consumerConnections [i ] = conn ;
113
103
Channel channel = conn .createChannel ();
114
104
if (consumerTxSize > 0 ) channel .txSelect ();
@@ -119,7 +109,7 @@ public void configureSocket(Socket socket) throws IOException {
119
109
if (prefetchCount > 0 ) channel .basicQos (prefetchCount );
120
110
channel .basicConsume (queueName , autoAck , consumer );
121
111
channel .queueBind (queueName , exchangeName , id );
122
- Thread t =
112
+ Thread t =
123
113
new Thread (new Consumer (consumer , id ,
124
114
consumerTxSize , autoAck ,
125
115
stats , timeLimit ));
@@ -135,7 +125,7 @@ public void configureSocket(Socket socket) throws IOException {
135
125
Channel channel = conn .createChannel ();
136
126
if (producerTxSize > 0 ) channel .txSelect ();
137
127
channel .exchangeDeclare (exchangeName , exchangeType );
138
- Thread t =
128
+ Thread t =
139
129
new Thread (new Producer (channel , exchangeName , id ,
140
130
flags , producerTxSize ,
141
131
1000L * samplingInterval ,
@@ -168,24 +158,22 @@ public void configureSocket(Socket socket) throws IOException {
168
158
169
159
private static Options getOptions () {
170
160
Options options = new Options ();
171
- options .addOption (new Option ("h" , "host" , true , "broker host" ));
172
- options .addOption (new Option ("p" , "port" , true , "broker port" ));
173
- options .addOption (new Option ("t" , "type" , true , "exchange type" ));
174
- options .addOption (new Option ("e" , "exchange" , true , "exchange name" ));
175
- options .addOption (new Option ("i" , "interval" , true , "sampling interval" ));
176
- options .addOption (new Option ("r" , "rate" , true , "rate limit" ));
177
- options .addOption (new Option ("x" , "producers" , true , "producer count" ));
178
- options .addOption (new Option ("y" , "consumers" , true , "consumer count" ));
179
- options .addOption (new Option ("m" , "ptxsize" , true , "producer tx size" ));
180
- options .addOption (new Option ("n" , "ctxsize" , true , "consumer tx size" ));
181
- options .addOption (new Option ("a" , "autoack" , false ,"auto ack" ));
182
- options .addOption (new Option ("q" , "qos" , true , "qos prefetch count" ));
183
- options .addOption (new Option ("s" , "size" , true , "message size" ));
184
- options .addOption (new Option ("d" , "redirects" , true , "max redirects" ));
185
- options .addOption (new Option ("z" , "time" , true , "time limit" ));
186
- options .addOption (new Option ("b" , "sendbuffer" , true , "send buffer size" ));
187
- options .addOption (new Option ("c" , "recvbuffer" , true , "receive buffer size" ));
188
- Option flag = new Option ("f" , "flag" , true , "message flag" );
161
+ options .addOption (new Option ("h" , "host" , true , "broker host" ));
162
+ options .addOption (new Option ("p" , "port" , true , "broker port" ));
163
+ options .addOption (new Option ("t" , "type" , true , "exchange type" ));
164
+ options .addOption (new Option ("e" , "exchange" , true , "exchange name" ));
165
+ options .addOption (new Option ("i" , "interval" , true , "sampling interval" ));
166
+ options .addOption (new Option ("r" , "rate" , true , "rate limit" ));
167
+ options .addOption (new Option ("x" , "producers" , true , "producer count" ));
168
+ options .addOption (new Option ("y" , "consumers" , true , "consumer count" ));
169
+ options .addOption (new Option ("m" , "ptxsize" , true , "producer tx size" ));
170
+ options .addOption (new Option ("n" , "ctxsize" , true , "consumer tx size" ));
171
+ options .addOption (new Option ("a" , "autoack" , false ,"auto ack" ));
172
+ options .addOption (new Option ("q" , "qos" , true , "qos prefetch count" ));
173
+ options .addOption (new Option ("s" , "size" , true , "message size" ));
174
+ options .addOption (new Option ("d" , "redirects" , true , "max redirects" ));
175
+ options .addOption (new Option ("z" , "time" , true , "time limit" ));
176
+ Option flag = new Option ("f" , "flag" , true , "message flag" );
189
177
flag .setArgs (Option .UNLIMITED_VALUES );
190
178
options .addOption (flag );
191
179
return options ;
@@ -373,7 +361,7 @@ public void run() {
373
361
int msgSeq = d .readInt ();
374
362
long msgNano = d .readLong ();
375
363
long nano = System .nanoTime ();
376
-
364
+
377
365
Envelope envelope = delivery .getEnvelope ();
378
366
379
367
if (!autoAck ) {
@@ -434,7 +422,7 @@ private void reset(long t) {
434
422
435
423
public synchronized void collectStats (long now , long latency ) {
436
424
msgCount ++;
437
-
425
+
438
426
if (latency > 0 ) {
439
427
minLatency = Math .min (minLatency , latency );
440
428
maxLatency = Math .max (maxLatency , latency );
@@ -455,9 +443,9 @@ public synchronized void collectStats(long now, long latency) {
455
443
"" ));
456
444
reset (now );
457
445
}
458
-
446
+
459
447
}
460
-
448
+
461
449
}
462
450
463
451
}
0 commit comments