File tree 4 files changed +24
-2
lines changed
main/java/com/rabbitmq/stream/impl
java/com/rabbitmq/stream/impl
4 files changed +24
-2
lines changed Original file line number Diff line number Diff line change @@ -2824,7 +2824,17 @@ public void channelInactive(ChannelHandlerContext ctx) {
2824
2824
// because it will be handled later anyway.
2825
2825
if (shutdownReason == null ) {
2826
2826
if (closing .compareAndSet (false , true )) {
2827
- executorService .submit (() -> closingSequence (ShutdownReason .UNKNOWN ));
2827
+ if (executorService == null ) {
2828
+ // the TCP connection is closed before the state is initialized
2829
+ // we do our best the execute the closing sequence
2830
+ new Thread (
2831
+ () -> {
2832
+ closingSequence (ShutdownReason .UNKNOWN );
2833
+ })
2834
+ .start ();
2835
+ } else {
2836
+ executorService .submit (() -> closingSequence (ShutdownReason .UNKNOWN ));
2837
+ }
2828
2838
}
2829
2839
}
2830
2840
}
Original file line number Diff line number Diff line change @@ -742,7 +742,7 @@ static <T> T locatorOperation(
742
742
Function <Client , T > operation ,
743
743
Supplier <Client > clientSupplier ,
744
744
BackOffDelayPolicy backOffDelayPolicy ) {
745
- int maxAttempt = 3 ;
745
+ int maxAttempt = 5 ;
746
746
int attempt = 0 ;
747
747
boolean executed = false ;
748
748
Exception lastException = null ;
Original file line number Diff line number Diff line change 23
23
import static java .util .stream .IntStream .range ;
24
24
import static org .assertj .core .api .Assertions .assertThat ;
25
25
26
+ import ch .qos .logback .classic .Level ;
26
27
import com .google .common .util .concurrent .RateLimiter ;
27
28
import com .rabbitmq .stream .*;
28
29
import com .rabbitmq .stream .impl .TestUtils .Sync ;
@@ -57,10 +58,13 @@ public class RecoveryClusterTest {
57
58
TestInfo testInfo ;
58
59
EventLoopGroup eventLoopGroup ;
59
60
EnvironmentBuilder environmentBuilder ;
61
+ static Level producersCoordinatorLogLevel ;
60
62
61
63
@ BeforeAll
62
64
static void initAll () {
63
65
nodes = Cli .nodes ();
66
+ producersCoordinatorLogLevel =
67
+ TestUtils .newLoggerLevel (ProducersCoordinator .class , Level .DEBUG );
64
68
}
65
69
66
70
@ BeforeEach
@@ -82,6 +86,13 @@ void tearDown() {
82
86
}
83
87
}
84
88
89
+ @ AfterAll
90
+ static void tearDownAll () {
91
+ if (producersCoordinatorLogLevel != null ) {
92
+ TestUtils .newLoggerLevel (ProducersCoordinator .class , producersCoordinatorLogLevel );
93
+ }
94
+ }
95
+
85
96
@ ParameterizedTest
86
97
@ CsvSource ({
87
98
"false,false" ,
Original file line number Diff line number Diff line change 7
7
8
8
<logger name =" com.rabbitmq.stream" level =" warn" />
9
9
<logger name =" com.rabbitmq.stream.impl.Utils" level =" warn" />
10
+ <logger name =" com.rabbitmq.stream.impl.StreamEnvironment" level =" warn" />
10
11
<logger name =" com.rabbitmq.stream.impl.ConsumersCoordinator" level =" warn" />
11
12
<logger name =" com.rabbitmq.stream.impl.ProducersCoordinator" level =" warn" />
12
13
<logger name =" com.rabbitmq.stream.impl.RecoveryClusterTest" level =" info" />
You can’t perform that action at this time.
0 commit comments