4
4
import com .rabbitmq .client .impl .nio .BlockingQueueNioQueue ;
5
5
import com .rabbitmq .client .impl .nio .DefaultByteBufferFactory ;
6
6
import com .rabbitmq .client .impl .nio .NioParams ;
7
+ import org .assertj .core .api .Condition ;
7
8
import org .junit .After ;
8
9
import org .junit .Before ;
9
10
import org .junit .Test ;
14
15
import java .util .concurrent .*;
15
16
import java .util .concurrent .atomic .AtomicInteger ;
16
17
17
- import static org .hamcrest .Matchers .hasSize ;
18
- import static org .hamcrest .Matchers .isOneOf ;
18
+ import static org .assertj .core .api .Assertions .assertThat ;
19
19
import static org .junit .Assert .assertEquals ;
20
- import static org .junit .Assert .assertThat ;
21
20
import static org .junit .Assert .assertTrue ;
22
21
23
22
/**
@@ -125,19 +124,21 @@ public void shutdownCompleted(ShutdownSignalException cause) {
125
124
public void nioLoopCleaning () throws Exception {
126
125
ConnectionFactory connectionFactory = new ConnectionFactory ();
127
126
connectionFactory .useNio ();
128
- for (int i = 0 ; i < 10 ; i ++) {
127
+ for (int i = 0 ; i < 10 ; i ++) {
129
128
Connection connection = connectionFactory .newConnection ();
130
129
connection .abort ();
131
130
}
132
131
}
133
132
134
- @ Test public void messageSize () throws Exception {
133
+ @ Test
134
+ public void messageSize () throws Exception {
135
135
for (int i = 0 ; i < 50 ; i ++) {
136
136
sendAndVerifyMessage (testConnection , 76390 );
137
137
}
138
138
}
139
139
140
- @ Test public void byteBufferFactory () throws Exception {
140
+ @ Test
141
+ public void byteBufferFactory () throws Exception {
141
142
ConnectionFactory cf = new ConnectionFactory ();
142
143
cf .useNio ();
143
144
int baseCapacity = 32768 ;
@@ -155,12 +156,15 @@ public void nioLoopCleaning() throws Exception {
155
156
sendAndVerifyMessage (c , 100 );
156
157
}
157
158
158
- assertThat (byteBuffers , hasSize (2 ));
159
- assertThat (byteBuffers .get (0 ).capacity (), isOneOf (nioParams .getReadByteBufferSize (), nioParams .getWriteByteBufferSize ()));
160
- assertThat (byteBuffers .get (1 ).capacity (), isOneOf (nioParams .getReadByteBufferSize (), nioParams .getWriteByteBufferSize ()));
159
+ assertThat (byteBuffers ).hasSize (2 );
160
+ Condition <Integer > condition = new Condition <>(c -> c == nioParams .getReadByteBufferSize () ||
161
+ c == nioParams .getWriteByteBufferSize (), "capacity set by factory" );
162
+ assertThat (byteBuffers .get (0 ).capacity ()).is (condition );
163
+ assertThat (byteBuffers .get (1 ).capacity ()).is (condition );
161
164
}
162
165
163
- @ Test public void directByteBuffers () throws Exception {
166
+ @ Test
167
+ public void directByteBuffers () throws Exception {
164
168
ConnectionFactory cf = new ConnectionFactory ();
165
169
cf .useNio ();
166
170
cf .setNioParams (new NioParams ().setByteBufferFactory (new DefaultByteBufferFactory (capacity -> ByteBuffer .allocateDirect (capacity ))));
@@ -169,15 +173,16 @@ public void nioLoopCleaning() throws Exception {
169
173
}
170
174
}
171
175
172
- @ Test public void customWriteQueue () throws Exception {
176
+ @ Test
177
+ public void customWriteQueue () throws Exception {
173
178
ConnectionFactory cf = new ConnectionFactory ();
174
179
cf .useNio ();
175
180
AtomicInteger count = new AtomicInteger (0 );
176
181
cf .setNioParams (new NioParams ().setWriteQueueFactory (ctx -> {
177
182
count .incrementAndGet ();
178
183
return new BlockingQueueNioQueue (
179
- new LinkedBlockingQueue <>(ctx .getNioParams ().getWriteQueueCapacity ()),
180
- ctx .getNioParams ().getWriteEnqueuingTimeoutInMs ()
184
+ new LinkedBlockingQueue <>(ctx .getNioParams ().getWriteQueueCapacity ()),
185
+ ctx .getNioParams ().getWriteEnqueuingTimeoutInMs ()
181
186
);
182
187
}));
183
188
try (Connection c = cf .newConnection ()) {
@@ -193,7 +198,7 @@ private void sendAndVerifyMessage(Connection connection, int size) throws Except
193
198
}
194
199
195
200
private Connection basicGetBasicConsume (ConnectionFactory connectionFactory , String queue , final CountDownLatch latch )
196
- throws IOException , TimeoutException {
201
+ throws IOException , TimeoutException {
197
202
Connection connection = connectionFactory .newConnection ();
198
203
Channel channel = connection .createChannel ();
199
204
channel .queueDeclare (queue , false , false , false , null );
@@ -213,7 +218,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
213
218
}
214
219
215
220
private boolean basicGetBasicConsume (Connection connection , String queue , final CountDownLatch latch , int msgSize )
216
- throws Exception {
221
+ throws Exception {
217
222
Channel channel = connection .createChannel ();
218
223
channel .queueDeclare (queue , false , false , false , null );
219
224
channel .queuePurge (queue );
0 commit comments