Skip to content

Commit f3b1776

Browse files
author
Simon MacMullen
committed
Merged default into bug22965
2 parents 2ac2e62 + 315cd7b commit f3b1776

29 files changed

+685
-229
lines changed

build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ test.javac.out=${build.out}/test/classes
1313
test.src.home=test/src
1414
java-jvm-1.4=c:/Program Files/java/j2re1.4.2_14/bin/java
1515
sibling.codegen.dir=../rabbitmq-codegen/
16-
spec.version=0.8
16+
spec.version=0.9.1
1717
bundle.out=${build.out}/bundle
1818
javadoc.out=build/doc/api
1919
python.bin=python

build.xml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
<pathelement path="${test.javac.out}"/>
2727
</path>
2828

29-
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-${spec.version}.json ${codegen.dir}/rabbitmq-0.8-extensions.json"/>
30-
29+
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-rabbitmq-${spec.version}.json"/>
30+
3131
<target name="amqp-generate-check" description="check if codegen needs to be run">
3232
<uptodate property="amqp.generate.notRequired">
3333
<srcfiles file="codegen.py"/>
@@ -40,7 +40,7 @@
4040
</compositemapper>
4141
</uptodate>
4242
</target>
43-
43+
4444
<target name="amqp-generate" depends="amqp-generate-check"
4545
unless="amqp.generate.notRequired" description="generate AMQP.java and AMQImpl.java from AMQP spec">
4646
<mkdir dir="${src.generated}/com/rabbitmq/client/"/>
@@ -171,12 +171,12 @@
171171
<arg value="${CLIENT_KEYSTORE_PHRASE}"/>
172172
</exec>
173173
</target>
174-
174+
175175
<target name="remove-client-keystore" if="SSL_AVAILABLE">
176176
<delete file="${CLIENT_KEYSTORE}" failonerror="false"/>
177177
<delete file="${CLIENT_KEYSTORE_EMPTY}" failonerror="false"/>
178178
</target>
179-
179+
180180
<target name="test-prepare">
181181
<property name="haltOnFailureJunit" value="yes" />
182182
<property name="haltOnFailureJava" value="true" />
@@ -318,9 +318,9 @@
318318

319319
<target name="test-suite-prepare">
320320
<property name="haltOnFailureJunit" value="no" />
321-
<property name="haltOnFailureJava" value="false" />
321+
<property name="haltOnFailureJava" value="false" />
322322
</target>
323-
323+
324324
<target name="test-suite" depends="test-suite-prepare, test-suite-run" description="Run all test suites.">
325325
<fail message="Errors occured in tests">
326326
<condition>
@@ -332,7 +332,7 @@
332332
</target>
333333

334334
<target name="test-suite-run" depends="test-client, test-ssl, test-server, test-functional, test-main-silent"/>
335-
335+
336336
<target name="test-client" depends="test-build" description="Run the client test suites.">
337337
<junit printSummary="withOutAndErr"
338338
haltOnFailure="${haltOnFailureJunit}"
@@ -355,7 +355,7 @@
355355
<jvmarg value="-Dkeystore.path=${CLIENT_KEYSTORE}"/>
356356
<jvmarg value="-Dkeystore.empty.path=${CLIENT_KEYSTORE_EMPTY}"/>
357357
<jvmarg value="-Dkeystore.passwd=${CLIENT_KEYSTORE_PHRASE}"/>
358-
358+
359359
<jvmarg value="-Dp12.path=${SSL_CERTS_DIR}/client/keycert.p12"/>
360360
<jvmarg value="-Dp12.passwd=${SSL_P12_PASSWORD}"/>
361361

@@ -479,16 +479,16 @@
479479
<fileset dir="lib">
480480
<include name="**/*.jar"/>
481481
</fileset>
482-
482+
483483
<fileset dir="${lib.out}">
484484
<include name="**/*.jar"/>
485485
</fileset>
486-
486+
487487
<fileset dir="scripts">
488488
<include name="**/*.sh"/>
489489
<include name="**/*.bat"/>
490490
</fileset>
491-
491+
492492
<fileset file="${retrotranslator}/retrotranslator-runtime-1.2.1.jar"/>
493493
<fileset file="${retrotranslator}/backport-util-concurrent-3.0.jar"/>
494494
</copy>

codegen.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ def printHeader():
150150
public static class PROTOCOL {"""
151151
print " public static final int MAJOR = %i;" % spec.major
152152
print " public static final int MINOR = %i;" % spec.minor
153+
print " public static final int REVISION = %i;" % spec.revision
153154
print " public static final int PORT = %i;" % spec.port
154155
print " }"
155156

src/com/rabbitmq/client/BasicProperties.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,6 @@ public interface BasicProperties {
114114
*/
115115
public abstract String getAppId();
116116

117-
/**
118-
* Retrieve the value in the clusterId field.
119-
* @return clusterId field, or null if the field has not been set.
120-
*/
121-
public abstract String getClusterId();
122-
123117
/**
124118
* Set the contentType field, or null indicating the field is not set
125119
* @param contentType the value to set the field to
@@ -198,9 +192,4 @@ public interface BasicProperties {
198192
*/
199193
public abstract void setAppId(String appId);
200194

201-
/**
202-
* Set the clusterId field, or null indicating the field is not set
203-
* @param clusterId the value to set the field to
204-
*/
205-
public abstract void setClusterId(String clusterId);
206195
}

src/com/rabbitmq/client/Channel.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,17 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
457457
*/
458458
void basicAck(long deliveryTag, boolean multiple) throws IOException;
459459

460+
/**
461+
* Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
462+
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
463+
* containing the received message being rejected.
464+
* @see com.rabbitmq.client.AMQP.Basic.Reject
465+
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
466+
* @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
467+
* @throws java.io.IOException if an error is encountered
468+
*/
469+
void basicReject(long deliveryTag, boolean requeue) throws IOException;
470+
460471
/**
461472
* Start a non-nolocal, non-exclusive consumer, with
462473
* explicit acknowledgements required and a server-generated consumerTag.
@@ -467,7 +478,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
467478
* @see com.rabbitmq.client.AMQP.Basic.Consume
468479
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
469480
* @see #basicAck
470-
* @see #basicConsume(String,boolean, String,boolean,boolean, Consumer)
481+
* @see #basicConsume(String,boolean, String,boolean,boolean, Map, Consumer)
471482
*/
472483
String basicConsume(String queue, Consumer callback) throws IOException;
473484

@@ -481,7 +492,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
481492
* @throws java.io.IOException if an error is encountered
482493
* @see com.rabbitmq.client.AMQP.Basic.Consume
483494
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
484-
* @see #basicConsume(String,boolean, String,boolean,boolean, Consumer)
495+
* @see #basicConsume(String,boolean, String,boolean,boolean, Map, Consumer)
485496
*/
486497
String basicConsume(String queue, boolean noAck, Consumer callback) throws IOException;
487498

@@ -495,7 +506,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
495506
* @throws java.io.IOException if an error is encountered
496507
* @see com.rabbitmq.client.AMQP.Basic.Consume
497508
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
498-
* @see #basicConsume(String,boolean, String,boolean,boolean, Consumer)
509+
* @see #basicConsume(String,boolean, String,boolean,boolean, Map, Consumer)
499510
*/
500511
String basicConsume(String queue, boolean noAck, String consumerTag, Consumer callback) throws IOException;
501512

@@ -513,7 +524,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
513524
* @see com.rabbitmq.client.AMQP.Basic.Consume
514525
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
515526
*/
516-
String basicConsume(String queue, boolean noAck, String consumerTag, boolean noLocal, boolean exclusive, Consumer callback) throws IOException;
527+
String basicConsume(String queue, boolean noAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> filter, Consumer callback) throws IOException;
517528

518529
/**
519530
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}

src/com/rabbitmq/client/Connection.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,6 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
9898
*/
9999
Map<String, Object> getClientProperties();
100100

101-
/**
102-
* Retrieve the known hosts.
103-
* @return an array of addresses for all hosts that came back in the initial {@link com.rabbitmq.client.AMQP.Connection.OpenOk} open-ok method
104-
*/
105-
Address[] getKnownHosts();
106-
107101
/**
108102
* Retrieve the server properties.
109103
* @return a map of the server properties. This typically includes the product name and version of the server.

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 14 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -361,53 +361,25 @@ protected void configureSocket(Socket socket) throws IOException{
361361
socket.setTcpNoDelay(true);
362362
}
363363

364-
private Connection newConnection(Address[] addresses,
365-
int maxRedirects,
366-
Map<Address,Integer> redirectAttempts)
364+
/**
365+
* Create a new broker connection
366+
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
367+
* @return an interface to the connection
368+
* @throws IOException if it encounters a problem
369+
*/
370+
public Connection newConnection(Address[] addrs)
367371
throws IOException
368372
{
369373
IOException lastException = null;
370-
for (Address address : addresses) {
371-
Address[] lastKnownAddresses = new Address[0];
374+
for (Address addr : addrs) {
372375
try {
373-
while(true) {
374-
FrameHandler frameHandler = createFrameHandler(address);
375-
Integer redirectCount = redirectAttempts.get(address);
376-
if (redirectCount == null)
377-
redirectCount = 0;
378-
boolean allowRedirects = redirectCount < maxRedirects;
379-
try {
380-
AMQConnection conn = new AMQConnection(this,
381-
frameHandler);
382-
conn.start(!allowRedirects);
383-
return conn;
384-
} catch (RedirectException e) {
385-
if (!allowRedirects) {
386-
//this should never happen with a well-behaved server
387-
throw new IOException("server ignored 'insist'");
388-
} else {
389-
redirectAttempts.put(address, redirectCount+1);
390-
lastKnownAddresses = e.getKnownAddresses();
391-
address = e.getAddress();
392-
//TODO: we may want to log redirection attempts.
393-
}
394-
}
395-
}
376+
FrameHandler frameHandler = createFrameHandler(addr);
377+
AMQConnection conn = new AMQConnection(this,
378+
frameHandler);
379+
conn.start();
380+
return conn;
396381
} catch (IOException e) {
397382
lastException = e;
398-
if (lastKnownAddresses.length > 0) {
399-
// If there aren't any, don't bother trying, since
400-
// a recursive call with empty lastKnownAddresses
401-
// will cause our lastException to be stomped on
402-
// by an uninformative IOException. See bug 16273.
403-
try {
404-
return newConnection(lastKnownAddresses,
405-
maxRedirects,
406-
redirectAttempts);
407-
} catch (IOException e1) {
408-
lastException = e1;
409-
}
410-
}
411383
}
412384
}
413385

@@ -425,10 +397,7 @@ private Connection newConnection(Address[] addresses,
425397
*/
426398
public Connection newConnection() throws IOException {
427399
return newConnection(new Address[] {
428-
new Address(getHost(), getPort())
429-
},
430-
0,
431-
new HashMap<Address,Integer>());
400+
new Address(getHost(), getPort())});
432401
}
433402

434403

src/com/rabbitmq/client/DefaultConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class DefaultConsumer implements Consumer {
4545

4646
/**
4747
* Constructs a new instance and records its association to the passed-in channel.
48-
* @param channel the channel on which the
48+
* @param channel the channel to which this consumer is attached
4949
*/
5050
public DefaultConsumer(Channel channel) {
5151
_channel = channel;

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import com.rabbitmq.client.Connection;
4747
import com.rabbitmq.client.ConnectionFactory;
4848
import com.rabbitmq.client.MissedHeartbeatException;
49-
import com.rabbitmq.client.RedirectException;
5049
import com.rabbitmq.client.ShutdownSignalException;
5150
import com.rabbitmq.utility.BlockingCell;
5251
import com.rabbitmq.utility.Utility;
@@ -155,9 +154,6 @@ public void ensureIsOpen()
155154
*/
156155
private int _heartbeat;
157156

158-
/** Hosts retrieved from the connection.open-ok */
159-
private Address[] _knownHosts;
160-
161157
private final String _username, _password, _virtualHost;
162158
private final int _requestedChannelMax, _requestedFrameMax, _requestedHeartbeat;
163159
private final Map<String, Object> _clientProperties;
@@ -175,11 +171,6 @@ public int getPort() {
175171
return _frameHandler.getPort();
176172
}
177173

178-
/** {@inheritDoc} */
179-
public Address[] getKnownHosts() {
180-
return _knownHosts;
181-
}
182-
183174
public FrameHandler getFrameHandler(){
184175
return _frameHandler;
185176
}
@@ -236,12 +227,10 @@ public AMQConnection(ConnectionFactory factory,
236227
* Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
237228
* calls Connection.Open and waits for the OpenOk. Sets heartbeat
238229
* and frame max values after tuning has taken place.
239-
* @param insist true if broker redirects are disallowed
240-
* @throws RedirectException if the server is redirecting us to a different host/port
241230
* @throws java.io.IOException if an error is encountered
242231
*/
243-
public void start(boolean insist)
244-
throws IOException, RedirectException
232+
public void start()
233+
throws IOException
245234
{
246235
// Make sure that the first thing we do is to send the header,
247236
// which should cause any socket errors to show up for us, rather
@@ -316,19 +305,11 @@ public void start(boolean insist)
316305
_channel0.transmit(new AMQImpl.Connection.TuneOk(channelMax,
317306
frameMax,
318307
heartbeat));
319-
308+
// 0.9.1: insist [on not being redirected] is deprecated, but
309+
// still in generated code; just pass a dummy value here
320310
Method res = _channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,
321311
"",
322-
insist)).getMethod();
323-
if (res instanceof AMQP.Connection.Redirect) {
324-
AMQP.Connection.Redirect redirect = (AMQP.Connection.Redirect) res;
325-
throw new RedirectException(Address.parseAddress(redirect.getHost()),
326-
Address.parseAddresses(redirect.getKnownHosts()));
327-
} else {
328-
AMQP.Connection.OpenOk openOk = (AMQP.Connection.OpenOk) res;
329-
_knownHosts = Address.parseAddresses(openOk.getKnownHosts());
330-
}
331-
312+
false)).getMethod();
332313
return;
333314
}
334315

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
7676
* and this field can be deleted.
7777
*/
7878
@Deprecated
79-
private static final int TICKET = 1;
79+
private static final int TICKET = 0;
8080

8181
/**
8282
* Map from consumer tag to {@link Consumer} instance.
@@ -646,6 +646,13 @@ public void basicAck(long deliveryTag, boolean multiple)
646646
transmit(new Basic.Ack(deliveryTag, multiple));
647647
}
648648

649+
/** Public API - {@inheritDoc} */
650+
public void basicReject(long deliveryTag, boolean requeue)
651+
throws IOException
652+
{
653+
transmit(new Basic.Reject(deliveryTag, requeue));
654+
}
655+
649656
/** Public API - {@inheritDoc} */
650657
public String basicConsume(String queue, Consumer callback)
651658
throws IOException
@@ -665,12 +672,12 @@ public String basicConsume(String queue, boolean noAck, String consumerTag,
665672
Consumer callback)
666673
throws IOException
667674
{
668-
return basicConsume(queue, noAck, consumerTag, false, false, callback);
675+
return basicConsume(queue, noAck, consumerTag, false, false, null, callback);
669676
}
670677

671678
/** Public API - {@inheritDoc} */
672679
public String basicConsume(String queue, boolean noAck, String consumerTag,
673-
boolean noLocal, boolean exclusive,
680+
boolean noLocal, boolean exclusive, Map<String, Object> filter,
674681
final Consumer callback)
675682
throws IOException
676683
{
@@ -695,7 +702,7 @@ public String transformReply(AMQCommand replyCommand) {
695702

696703
rpc(new Basic.Consume(TICKET, queue, consumerTag,
697704
noLocal, noAck, exclusive,
698-
false),
705+
false, filter),
699706
k);
700707

701708
try {

0 commit comments

Comments
 (0)