43
43
import com .rabbitmq .client .impl .AMQImpl ;
44
44
import com .rabbitmq .client .impl .Frame ;
45
45
import com .rabbitmq .utility .BlockingCell ;
46
+ import com .rabbitmq .utility .Utility ;
46
47
47
48
48
49
/**
52
53
* printed to stdout.
53
54
*/
54
55
public class Tracer implements Runnable {
56
+ private static boolean property (String property ) {
57
+ return Boolean .parseBoolean (System .getProperty (
58
+ "com.rabbitmq.tools.Tracer." + property ));
59
+ }
60
+
55
61
public static final boolean WITHHOLD_INBOUND_HEARTBEATS =
56
- Boolean . parseBoolean ( System . getProperty ( "com.rabbitmq.tools.Tracer. WITHHOLD_INBOUND_HEARTBEATS") );
62
+ property ( " WITHHOLD_INBOUND_HEARTBEATS" );
57
63
public static final boolean WITHHOLD_OUTBOUND_HEARTBEATS =
58
- Boolean . parseBoolean ( System . getProperty ( "com.rabbitmq.tools.Tracer. WITHHOLD_OUTBOUND_HEARTBEATS") );
64
+ property ( " WITHHOLD_OUTBOUND_HEARTBEATS" );
59
65
public static final boolean NO_ASSEMBLE_FRAMES =
60
- Boolean . parseBoolean ( System . getProperty ( "com.rabbitmq.tools.Tracer. NO_ASSEMBLE_FRAMES") );
66
+ property ( " NO_ASSEMBLE_FRAMES" );
61
67
public static final boolean NO_DECODE_FRAMES =
62
- Boolean . parseBoolean ( System . getProperty ( "com.rabbitmq.tools.Tracer. NO_DECODE_FRAMES") );
68
+ property ( " NO_DECODE_FRAMES" );
63
69
public static final boolean SUPPRESS_COMMAND_BODIES =
64
- Boolean .parseBoolean (System .getProperty ("com.rabbitmq.tools.Tracer.SUPPRESS_COMMAND_BODIES" ));
65
-
70
+ property ("SUPPRESS_COMMAND_BODIES" );
66
71
public static final boolean SILENT_MODE =
67
- Boolean . parseBoolean ( System . getProperty ( "com.rabbitmq.tools.Tracer. SILENT_MODE") );
72
+ property ( " SILENT_MODE" );
68
73
69
74
final static int LOG_QUEUE_SIZE = 1024 * 1024 ;
70
75
final static int BUFFER_SIZE = 10 * 1024 * 1024 ;
71
76
final static int MAX_TIME_BETWEEN_FLUSHES = 1000 ;
72
77
final static Object FLUSH = new Object ();
73
78
74
- private static class AsyncLogger extends Thread {
79
+ private static class AsyncLogger extends Thread {
75
80
final PrintStream ps ;
76
81
final BlockingQueue <Object > queue = new ArrayBlockingQueue <Object >(LOG_QUEUE_SIZE , true );
77
- AsyncLogger (PrintStream ps ){
82
+ AsyncLogger (PrintStream ps ) {
78
83
this .ps = new PrintStream (new BufferedOutputStream (ps , BUFFER_SIZE ), false );
79
84
start ();
80
85
81
- new Thread (){
82
- @ Override public void run (){
83
- while (true ){
86
+ new Thread () {
87
+ @ Override public void run () {
88
+ while (true ) {
84
89
try {
85
90
Thread .sleep (MAX_TIME_BETWEEN_FLUSHES );
86
91
queue .add (FLUSH );
@@ -91,31 +96,21 @@ private static class AsyncLogger extends Thread{
91
96
}.start ();
92
97
}
93
98
94
- void printMessage (Object message ){
95
- if (message instanceof Throwable ){
96
- ((Throwable )message ).printStackTrace (ps );
97
- } else if (message instanceof String ){
98
- ps .println (message );
99
- } else {
100
- throw new RuntimeException ("Unrecognised object " + message );
101
- }
102
- }
103
-
104
- @ Override public void run (){
99
+ @ Override public void run () {
105
100
try {
106
- while (true ){
101
+ while (true ) {
107
102
Object message = queue .take ();
108
103
if (message == FLUSH ) ps .flush ();
109
- else printMessage (message );
104
+ else ps . println (message );
110
105
}
111
- } catch (InterruptedException interrupt ){
106
+ } catch (InterruptedException interrupt ) {
112
107
}
113
108
}
114
109
115
- void log (Object message ){
110
+ void log (String message ) {
116
111
try {
117
112
queue .put (message );
118
- } catch (InterruptedException ex ){
113
+ } catch (InterruptedException ex ) {
119
114
throw new RuntimeException (ex );
120
115
}
121
116
}
@@ -196,22 +191,31 @@ public void run() {
196
191
new Thread (outHandler ).start ();
197
192
Object result = w .uninterruptibleGet ();
198
193
if (result instanceof Exception ) {
199
- logger . log ( result );
194
+ logException (( Exception ) result );
200
195
}
201
196
} catch (EOFException eofe ) {
202
- logger . log ( eofe );
197
+ logException (( Exception ) eofe );
203
198
} catch (IOException ioe ) {
204
- logger . log ( ioe );
199
+ logException (( Exception ) ioe );
205
200
} finally {
206
201
try {
207
202
inSock .close ();
208
203
outSock .close ();
209
204
} catch (IOException ioe2 ) {
210
- logger . log ( ioe2 );
205
+ logException (( Exception ) ioe2 );
211
206
}
212
207
}
213
208
}
214
209
210
+ public void log (String message ) {
211
+ logger .log ("" + System .currentTimeMillis () + ": conn#"
212
+ + id + " " + message );
213
+ }
214
+
215
+ public void logException (Exception e ) {
216
+ log ("uncaught " + Utility .makeStackTrace (e ));
217
+ }
218
+
215
219
public class DirectionHandler implements Runnable {
216
220
public BlockingCell <Object > waitCell ;
217
221
@@ -235,7 +239,9 @@ public Frame readFrame() throws IOException {
235
239
}
236
240
237
241
public void report (int channel , Object object ) {
238
- logger .log ("" + System .currentTimeMillis () + ": conn#" + id + " ch#" + channel + (inBound ? " -> " : " <- " ) + object );
242
+ Tracer .this .log ("ch#" + channel
243
+ + (inBound ? " -> " : " <- " )
244
+ + object );
239
245
}
240
246
241
247
public void reportFrame (Frame f )
@@ -261,12 +267,13 @@ public void reportFrame(Frame f)
261
267
}
262
268
}
263
269
270
+
264
271
public void doFrame () throws IOException {
265
272
Frame f = readFrame ();
266
273
267
274
if (f != null ) {
268
275
269
- if (SILENT_MODE ){
276
+ if (SILENT_MODE ) {
270
277
f .writeTo (o );
271
278
return ;
272
279
}
@@ -289,14 +296,14 @@ public void doFrame() throws IOException {
289
296
}
290
297
} else {
291
298
AMQCommand .Assembler c = assemblers .get (f .channel );
292
- if (c == null ){
299
+ if (c == null ) {
293
300
c = AMQCommand .newAssembler ();
294
301
assemblers .put (f .channel , c );
295
302
}
296
303
AMQCommand cmd = c .handleFrame (f );
297
304
if (cmd != null ) {
298
305
report (f .channel , cmd .toString (SUPPRESS_COMMAND_BODIES ));
299
- assemblers .remove (f .channel );
306
+ assemblers .remove (f .channel );
300
307
}
301
308
}
302
309
}
0 commit comments