@@ -68,6 +68,7 @@ public static void main(String[] args) {
68
68
int portNumber = intArg (cmd , 'p' , AMQP .PROTOCOL .PORT );
69
69
String exchangeType = strArg (cmd , 't' , "direct" );
70
70
String exchangeName = strArg (cmd , 'e' , exchangeType );
71
+ String queueName = strArg (cmd , 'u' , "" );
71
72
int samplingInterval = intArg (cmd , 'i' , 1 );
72
73
int rateLimit = intArg (cmd , 'r' , 0 );
73
74
int producerCount = intArg (cmd , 'x' , 1 );
@@ -83,6 +84,9 @@ public static void main(String[] args) {
83
84
int frameMax = intArg (cmd , 'M' , 0 );
84
85
int heartbeat = intArg (cmd , 'b' , 0 );
85
86
87
+ boolean exclusive = "" .equals (queueName );
88
+ boolean autoDelete = !exclusive ;
89
+
86
90
//setup
87
91
String id = UUID .randomUUID ().toString ();
88
92
Stats stats = new Stats (1000L * samplingInterval );
@@ -101,13 +105,15 @@ public static void main(String[] args) {
101
105
Channel channel = conn .createChannel ();
102
106
if (consumerTxSize > 0 ) channel .txSelect ();
103
107
channel .exchangeDeclare (exchangeName , exchangeType );
104
- String queueName =
105
- channel .queueDeclare ("" , flags .contains ("persistent" ),
106
- true , false , null ).getQueue ();
108
+ String qName =
109
+ channel .queueDeclare (queueName ,
110
+ flags .contains ("persistent" ),
111
+ exclusive , autoDelete ,
112
+ null ).getQueue ();
107
113
QueueingConsumer consumer = new QueueingConsumer (channel );
108
114
if (prefetchCount > 0 ) channel .basicQos (prefetchCount );
109
- channel .basicConsume (queueName , autoAck , consumer );
110
- channel .queueBind (queueName , exchangeName , id );
115
+ channel .basicConsume (qName , autoAck , consumer );
116
+ channel .queueBind (qName , exchangeName , id );
111
117
Thread t =
112
118
new Thread (new Consumer (consumer , id ,
113
119
consumerTxSize , autoAck ,
@@ -174,6 +180,7 @@ private static Options getOptions() {
174
180
options .addOption (new Option ("p" , "port" , true , "broker port" ));
175
181
options .addOption (new Option ("t" , "type" , true , "exchange type" ));
176
182
options .addOption (new Option ("e" , "exchange" , true , "exchange name" ));
183
+ options .addOption (new Option ("u" , "queue" , true , "queue name" ));
177
184
options .addOption (new Option ("i" , "interval" , true , "sampling interval" ));
178
185
options .addOption (new Option ("r" , "rate" , true , "rate limit" ));
179
186
options .addOption (new Option ("x" , "producers" , true , "producer count" ));
0 commit comments