|
19 | 19 | import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
|
20 | 20 | import static org.assertj.core.api.Assertions.assertThat;
|
21 | 21 |
|
| 22 | +import ch.qos.logback.classic.Level; |
22 | 23 | import com.rabbitmq.stream.BackOffDelayPolicy;
|
23 | 24 | import com.rabbitmq.stream.ConfirmationHandler;
|
24 | 25 | import com.rabbitmq.stream.ConfirmationStatus;
|
@@ -333,58 +334,66 @@ void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception {
|
333 | 334 | @ParameterizedTest
|
334 | 335 | @ValueSource(ints = {1, 7})
|
335 | 336 | void producerShouldBeClosedWhenStreamIsDeleted(int subEntrySize, TestInfo info) throws Exception {
|
336 |
| - String s = streamName(info); |
337 |
| - environment.streamCreator().stream(s).create(); |
338 |
| - |
339 |
| - StreamProducer producer = |
340 |
| - (StreamProducer) environment.producerBuilder().subEntrySize(subEntrySize).stream(s).build(); |
341 |
| - |
342 |
| - AtomicInteger published = new AtomicInteger(0); |
343 |
| - AtomicInteger confirmed = new AtomicInteger(0); |
344 |
| - AtomicInteger errored = new AtomicInteger(0); |
345 |
| - Set<Number> errorCodes = ConcurrentHashMap.newKeySet(); |
346 |
| - |
347 |
| - short lastExpectedErrorCode = Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST; |
348 |
| - AtomicBoolean continuePublishing = new AtomicBoolean(true); |
349 |
| - Thread publishThread = |
350 |
| - new Thread( |
351 |
| - () -> { |
352 |
| - ConfirmationHandler confirmationHandler = |
353 |
| - confirmationStatus -> { |
354 |
| - if (confirmationStatus.isConfirmed()) { |
355 |
| - confirmed.incrementAndGet(); |
356 |
| - } else { |
357 |
| - errored.incrementAndGet(); |
358 |
| - errorCodes.add(confirmationStatus.getCode()); |
359 |
| - if (confirmationStatus.getCode() == lastExpectedErrorCode) { |
360 |
| - continuePublishing.set(false); |
| 337 | + Level initialLogLevel = TestUtils.newLoggerLevel(ProducersCoordinator.class, Level.DEBUG); |
| 338 | + try { |
| 339 | + String s = streamName(info); |
| 340 | + environment.streamCreator().stream(s).create(); |
| 341 | + |
| 342 | + StreamProducer producer = |
| 343 | + (StreamProducer) |
| 344 | + environment.producerBuilder().subEntrySize(subEntrySize).stream(s).build(); |
| 345 | + |
| 346 | + AtomicInteger published = new AtomicInteger(0); |
| 347 | + AtomicInteger confirmed = new AtomicInteger(0); |
| 348 | + AtomicInteger errored = new AtomicInteger(0); |
| 349 | + Set<Number> errorCodes = ConcurrentHashMap.newKeySet(); |
| 350 | + |
| 351 | + short lastExpectedErrorCode = Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST; |
| 352 | + AtomicBoolean continuePublishing = new AtomicBoolean(true); |
| 353 | + Thread publishThread = |
| 354 | + new Thread( |
| 355 | + () -> { |
| 356 | + ConfirmationHandler confirmationHandler = |
| 357 | + confirmationStatus -> { |
| 358 | + if (confirmationStatus.isConfirmed()) { |
| 359 | + confirmed.incrementAndGet(); |
| 360 | + } else { |
| 361 | + errored.incrementAndGet(); |
| 362 | + errorCodes.add(confirmationStatus.getCode()); |
| 363 | + if (confirmationStatus.getCode() == lastExpectedErrorCode) { |
| 364 | + continuePublishing.set(false); |
| 365 | + } |
361 | 366 | }
|
362 |
| - } |
363 |
| - }; |
364 |
| - while (continuePublishing.get()) { |
365 |
| - try { |
366 |
| - producer.send( |
367 |
| - producer |
368 |
| - .messageBuilder() |
369 |
| - .addData("".getBytes(StandardCharsets.UTF_8)) |
370 |
| - .build(), |
371 |
| - confirmationHandler); |
372 |
| - published.incrementAndGet(); |
373 |
| - } catch (StreamException e) { |
374 |
| - // OK |
| 367 | + }; |
| 368 | + while (continuePublishing.get()) { |
| 369 | + try { |
| 370 | + producer.send( |
| 371 | + producer |
| 372 | + .messageBuilder() |
| 373 | + .addData("".getBytes(StandardCharsets.UTF_8)) |
| 374 | + .build(), |
| 375 | + confirmationHandler); |
| 376 | + published.incrementAndGet(); |
| 377 | + } catch (StreamException e) { |
| 378 | + // OK |
| 379 | + } |
375 | 380 | }
|
376 |
| - } |
377 |
| - }); |
378 |
| - publishThread.start(); |
| 381 | + }); |
| 382 | + publishThread.start(); |
379 | 383 |
|
380 |
| - Thread.sleep(1000L); |
| 384 | + Thread.sleep(1000L); |
381 | 385 |
|
382 |
| - assertThat(producer.isOpen()).isTrue(); |
| 386 | + assertThat(producer.isOpen()).isTrue(); |
383 | 387 |
|
384 |
| - environment.deleteStream(s); |
| 388 | + environment.deleteStream(s); |
385 | 389 |
|
386 |
| - waitAtMost(() -> !producer.isOpen()); |
387 |
| - waitAtMost(() -> errorCodes.contains(lastExpectedErrorCode)); |
| 390 | + waitAtMost(() -> !producer.isOpen()); |
| 391 | + waitAtMost( |
| 392 | + () -> errorCodes.contains(lastExpectedErrorCode), |
| 393 | + () -> errorCodes + " should contain " + lastExpectedErrorCode); |
| 394 | + } finally { |
| 395 | + TestUtils.newLoggerLevel(ProducersCoordinator.class, initialLogLevel); |
| 396 | + } |
388 | 397 | }
|
389 | 398 |
|
390 | 399 | @ParameterizedTest
|
|
0 commit comments