Skip to content

Commit 1bd037a

Browse files
author
Simon MacMullen
committed
Merge bug25056 (very late!)
2 parents ed623f6 + 0136793 commit 1bd037a

38 files changed

+1302
-652
lines changed

build.xml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@
308308
</fail>
309309
</target>
310310

311-
<target name="test-suite-run" depends="test-client, test-ssl, test-server, test-functional, test-main-silent"/>
311+
<target name="test-suite-run" depends="test-client, test-ssl, test-server, test-functional, test-functional-and-server-with-ha, test-main-silent"/>
312312

313313
<target name="test-client" depends="test-build" description="Run the client test suites.">
314314
<junit printSummary="withOutAndErr"
@@ -369,6 +369,19 @@
369369
</junit>
370370
</target>
371371

372+
<!-- TODO: merge test-server, test-functional and this into one, once umbrellas have been merged -->
373+
<target name="test-functional-and-server-with-ha" depends="detect-umbrella, test-build" if="UMBRELLA_AVAILABLE">
374+
<junit printSummary="withOutAndErr"
375+
haltOnFailure="${haltOnFailureJunit}"
376+
failureproperty="test.failure"
377+
fork="yes">
378+
<classpath refid="test.classpath"/>
379+
<formatter type="plain"/>
380+
<formatter type="xml"/>
381+
<test todir="${build.out}" name="com.rabbitmq.client.test.server.HATests"/>
382+
</junit>
383+
</target>
384+
372385
<target name="test-single" depends="test-build">
373386
<junit printSummary="withOutAndErr"
374387
haltOnFailure="${haltOnFailureJunit}"
@@ -491,4 +504,3 @@
491504
</bundlor:bundlor>
492505
</target>
493506
</project>
494-

pom.xml

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,36 +29,8 @@
2929

3030
<developers>
3131
<developer>
32-
<id>matthias.radestock</id>
33-
<name>Matthias Radestock</name>
34-
<roles>
35-
<role>Developer</role>
36-
</roles>
37-
</developer>
38-
<developer>
39-
<id>tonyg</id>
40-
<name>Tony Garnock-Jones</name>
41-
<roles>
42-
<role>Developer</role>
43-
</roles>
44-
</developer>
45-
<developer>
46-
<id>matthew.sackman</id>
47-
<name>Matthias Sackman</name>
48-
<roles>
49-
<role>Developer</role>
50-
</roles>
51-
</developer>
52-
<developer>
53-
<id>david.maciver</id>
54-
<name>David MacIver</name>
55-
<roles>
56-
<role>Developer</role>
57-
</roles>
58-
</developer>
59-
<developer>
60-
<id>paul.jones</id>
61-
<name>Paul Jones</name>
32+
<id>rabbitmq.team</id>
33+
<name>The RabbitMQ Team</name>
6234
<roles>
6335
<role>Developer</role>
6436
</roles>

src/com/rabbitmq/client/Channel.java

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,17 @@
4545
* </ul>
4646
* <p>
4747
*
48-
* While a Channel can be used by multiple threads, it's important to ensure
49-
* that only one thread executes a command at once. Concurrent execution of
50-
* commands will likely cause an UnexpectedFrameError to be thrown.
48+
* <p>
49+
* {@link Channel} instances are safe for use by multiple
50+
* threads. Requests into a {@link Channel} are serialized, with only one
51+
* thread running commands at a time.
52+
* As such, applications may prefer using a {@link Channel} per thread
53+
* instead of sharing the same <code>Channel</code> across multiple threads.
54+
*
55+
* An <b>important caveat</b> to this is that confirms are <b>not</b> handled
56+
* properly when a {@link Channel} is shared between multiple threads. In that
57+
* scenario, it is therefore important to ensure that the {@link Channel}
58+
* instance is <b>not</b> accessed concurrently by multiple threads.
5159
*
5260
*/
5361

@@ -231,7 +239,7 @@ public interface Channel extends ShutdownNotifier {
231239
void basicQos(int prefetchCount) throws IOException;
232240

233241
/**
234-
* Publish a message with both "mandatory" and "immediate" flags set to false
242+
* Publish a message
235243
* @see com.rabbitmq.client.AMQP.Basic.Publish
236244
* @param exchange the exchange to publish the message to
237245
* @param routingKey the routing key
@@ -246,8 +254,22 @@ public interface Channel extends ShutdownNotifier {
246254
* @see com.rabbitmq.client.AMQP.Basic.Publish
247255
* @param exchange the exchange to publish the message to
248256
* @param routingKey the routing key
249-
* @param mandatory true if we are requesting a mandatory publish
250-
* @param immediate true if we are requesting an immediate publish
257+
* @param mandatory true if the 'mandatory' flag is to be set
258+
* @param props other properties for the message - routing headers etc
259+
* @param body the message body
260+
* @throws java.io.IOException if an error is encountered
261+
*/
262+
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
263+
throws IOException;
264+
265+
/**
266+
* Publish a message
267+
* @see com.rabbitmq.client.AMQP.Basic.Publish
268+
* @param exchange the exchange to publish the message to
269+
* @param routingKey the routing key
270+
* @param mandatory true if the 'mandatory' flag is to be set
271+
* @param immediate true if the 'immediate' flag is to be
272+
* set. Note that the RabbitMQ server does not support this flag.
251273
* @param props other properties for the message - routing headers etc
252274
* @param body the message body
253275
* @throws java.io.IOException if an error is encountered
@@ -347,9 +369,9 @@ Exchange.DeclareOk exchangeDeclare(String exchange,
347369
* Bind an exchange to an exchange, with no extra arguments.
348370
* @see com.rabbitmq.client.AMQP.Exchange.Bind
349371
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
350-
* @param destination: the name of the exchange to which messages flow across the binding
351-
* @param source: the name of the exchange from which messages flow across the binding
352-
* @param routingKey: the routine key to use for the binding
372+
* @param destination the name of the exchange to which messages flow across the binding
373+
* @param source the name of the exchange from which messages flow across the binding
374+
* @param routingKey the routine key to use for the binding
353375
* @return a binding-confirm method if the binding was successfully created
354376
* @throws java.io.IOException if an error is encountered
355377
*/
@@ -359,10 +381,10 @@ Exchange.DeclareOk exchangeDeclare(String exchange,
359381
* Bind an exchange to an exchange.
360382
* @see com.rabbitmq.client.AMQP.Exchange.Bind
361383
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
362-
* @param destination: the name of the exchange to which messages flow across the binding
363-
* @param source: the name of the exchange from which messages flow across the binding
364-
* @param routingKey: the routine key to use for the binding
365-
* @param arguments: other properties (binding parameters)
384+
* @param destination the name of the exchange to which messages flow across the binding
385+
* @param source the name of the exchange from which messages flow across the binding
386+
* @param routingKey the routine key to use for the binding
387+
* @param arguments other properties (binding parameters)
366388
* @return a binding-confirm method if the binding was successfully created
367389
* @throws java.io.IOException if an error is encountered
368390
*/
@@ -372,9 +394,9 @@ Exchange.DeclareOk exchangeDeclare(String exchange,
372394
* Unbind an exchange from an exchange, with no extra arguments.
373395
* @see com.rabbitmq.client.AMQP.Exchange.Bind
374396
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
375-
* @param destination: the name of the exchange to which messages flow across the binding
376-
* @param source: the name of the exchange from which messages flow across the binding
377-
* @param routingKey: the routine key to use for the binding
397+
* @param destination the name of the exchange to which messages flow across the binding
398+
* @param source the name of the exchange from which messages flow across the binding
399+
* @param routingKey the routine key to use for the binding
378400
* @return a binding-confirm method if the binding was successfully created
379401
* @throws java.io.IOException if an error is encountered
380402
*/
@@ -384,10 +406,10 @@ Exchange.DeclareOk exchangeDeclare(String exchange,
384406
* Unbind an exchange from an exchange.
385407
* @see com.rabbitmq.client.AMQP.Exchange.Bind
386408
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
387-
* @param destination: the name of the exchange to which messages flow across the binding
388-
* @param source: the name of the exchange from which messages flow across the binding
389-
* @param routingKey: the routine key to use for the binding
390-
* @param arguments: other properties (binding parameters)
409+
* @param destination the name of the exchange to which messages flow across the binding
410+
* @param source the name of the exchange from which messages flow across the binding
411+
* @param routingKey the routine key to use for the binding
412+
* @param arguments other properties (binding parameters)
391413
* @return a binding-confirm method if the binding was successfully created
392414
* @throws java.io.IOException if an error is encountered
393415
*/

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ public void setClientProperties(Map<String, Object> clientProperties) {
350350

351351
/**
352352
* Gets the sasl config to use when authenticating
353-
* @return
353+
* @return the sasl config
354354
* @see com.rabbitmq.client.SaslConfig
355355
*/
356356
public SaslConfig getSaslConfig() {

src/com/rabbitmq/client/SaslMechanism.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
public interface SaslMechanism {
2626
/**
2727
* The name of this mechanism (e.g. PLAIN)
28-
* @return
28+
* @return the name
2929
*/
3030
String getName();
3131

src/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@
3232
*/
3333
public class AMQCommand implements Command {
3434

35-
/** EMPTY_CONTENT_BODY_FRAME_SIZE = 8 = 1 + 2 + 4 + 1
35+
/** EMPTY_FRAME_SIZE = 8 = 1 + 2 + 4 + 1
3636
* <ul><li>1 byte of frame type</li>
3737
* <li>2 bytes of channel number</li>
3838
* <li>4 bytes of frame payload length</li>
3939
* <li>1 byte of payload trailer FRAME_END byte</li></ul>
40-
* See {@link #checkEmptyContentBodyFrameSize}, an assertion
41-
* checked at startup.
40+
* See {@link #checkEmptyFrameSize}, an assertion checked at
41+
* startup.
4242
*/
43-
private static final int EMPTY_CONTENT_BODY_FRAME_SIZE = 8;
43+
public static final int EMPTY_FRAME_SIZE = 8;
4444

4545
/** The assembler for this command - synchronised on - contains all the state */
4646
private final CommandAssembler assembler;
@@ -108,7 +108,7 @@ public void transmit(AMQChannel channel) throws IOException {
108108

109109
int frameMax = connection.getFrameMax();
110110
int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
111-
- EMPTY_CONTENT_BODY_FRAME_SIZE;
111+
- EMPTY_FRAME_SIZE;
112112

113113
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
114114
int remaining = body.length - offset;
@@ -157,26 +157,26 @@ private static StringBuilder contentBodyStringBuilder(byte[] body, boolean suppr
157157

158158
/** Called to check internal code assumptions. */
159159
public static void checkPreconditions() {
160-
checkEmptyContentBodyFrameSize();
160+
checkEmptyFrameSize();
161161
}
162162

163163
/**
164-
* Since we're using a pre-computed value for
165-
* EMPTY_CONTENT_BODY_FRAME_SIZE we check this is
166-
* actually correct when run against the framing code in Frame.
164+
* Since we're using a pre-computed value for EMPTY_FRAME_SIZE we
165+
* check this is actually correct when run against the framing
166+
* code in Frame.
167167
*/
168-
private static void checkEmptyContentBodyFrameSize() {
168+
private static void checkEmptyFrameSize() {
169169
Frame f = new Frame(AMQP.FRAME_BODY, 0, new byte[0]);
170170
ByteArrayOutputStream s = new ByteArrayOutputStream();
171171
try {
172172
f.writeTo(new DataOutputStream(s));
173173
} catch (IOException ioe) {
174-
throw new AssertionError("IOException while checking EMPTY_CONTENT_BODY_FRAME_SIZE");
174+
throw new AssertionError("IOException while checking EMPTY_FRAME_SIZE");
175175
}
176176
int actualLength = s.toByteArray().length;
177-
if (EMPTY_CONTENT_BODY_FRAME_SIZE != actualLength) {
178-
throw new AssertionError("Internal error: expected EMPTY_CONTENT_BODY_FRAME_SIZE("
179-
+ EMPTY_CONTENT_BODY_FRAME_SIZE
177+
if (EMPTY_FRAME_SIZE != actualLength) {
178+
throw new AssertionError("Internal error: expected EMPTY_FRAME_SIZE("
179+
+ EMPTY_FRAME_SIZE
180180
+ ") is not equal to computed value: " + actualLength);
181181
}
182182
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.rabbitmq.client.SaslConfig;
4242
import com.rabbitmq.client.SaslMechanism;
4343
import com.rabbitmq.client.ShutdownSignalException;
44+
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
4445
import com.rabbitmq.utility.BlockingCell;
4546
import com.rabbitmq.utility.Utility;
4647

@@ -663,6 +664,16 @@ public ShutdownSignalException shutdown(Object reason,
663664
boolean initiatedByApplication,
664665
Throwable cause,
665666
boolean notifyRpc)
667+
{
668+
ShutdownSignalException sse = startShutdown(reason, initiatedByApplication, cause, notifyRpc);
669+
finishShutdown(sse);
670+
return sse;
671+
}
672+
673+
private ShutdownSignalException startShutdown(Object reason,
674+
boolean initiatedByApplication,
675+
Throwable cause,
676+
boolean notifyRpc)
666677
{
667678
ShutdownSignalException sse = new ShutdownSignalException(true,initiatedByApplication,
668679
reason, this);
@@ -677,10 +688,12 @@ public ShutdownSignalException shutdown(Object reason,
677688

678689
_channel0.processShutdownSignal(sse, !initiatedByApplication, notifyRpc);
679690

691+
return sse;
692+
}
693+
694+
private void finishShutdown(ShutdownSignalException sse) {
680695
ChannelManager cm = _channelManager;
681696
if (cm != null) cm.handleSignal(sse);
682-
683-
return sse;
684697
}
685698

686699
/** Public API - {@inheritDoc} */
@@ -775,10 +788,15 @@ public void close(int closeCode,
775788
.replyText(closeMessage)
776789
.build();
777790

778-
shutdown(reason, initiatedByApplication, cause, true);
791+
final ShutdownSignalException sse = startShutdown(reason, initiatedByApplication, cause, true);
779792
if(sync){
780-
AMQChannel.SimpleBlockingRpcContinuation k =
781-
new AMQChannel.SimpleBlockingRpcContinuation();
793+
BlockingRpcContinuation<AMQCommand> k = new BlockingRpcContinuation<AMQCommand>(){
794+
@Override
795+
public AMQCommand transformReply(AMQCommand command) {
796+
AMQConnection.this.finishShutdown(sse);
797+
return command;
798+
}};
799+
782800
_channel0.quiescingRpc(reason, k);
783801
k.getReply(timeout);
784802
} else {

0 commit comments

Comments
 (0)