2
2
3
3
import org .testcontainers .containers .wait .strategy .Wait ;
4
4
import org .testcontainers .containers .wait .strategy .WaitAllStrategy ;
5
- import org .testcontainers .containers .wait .strategy .WaitStrategy ;
6
5
import org .testcontainers .utility .DockerImageName ;
7
6
8
- import java .util .ArrayList ;
9
- import java .util .List ;
10
-
11
7
/**
12
8
* This container wraps Apache Pulsar running in standalone mode
13
9
*/
@@ -36,6 +32,8 @@ public class PulsarContainer extends GenericContainer<PulsarContainer> {
36
32
@ Deprecated
37
33
private static final String DEFAULT_TAG = "2.10.0" ;
38
34
35
+ private final WaitAllStrategy waitAllStrategy = new WaitAllStrategy ();
36
+
39
37
private boolean functionsWorkerEnabled = false ;
40
38
41
39
private boolean transactionsEnabled = false ;
@@ -60,6 +58,7 @@ public PulsarContainer(final DockerImageName dockerImageName) {
60
58
super (dockerImageName );
61
59
dockerImageName .assertCompatibleWith (DockerImageName .parse ("apachepulsar/pulsar" ));
62
60
withExposedPorts (BROKER_PORT , BROKER_HTTP_PORT );
61
+ setWaitStrategy (waitAllStrategy );
63
62
}
64
63
65
64
@ Override
@@ -98,21 +97,18 @@ protected void setupCommandAndEnv() {
98
97
99
98
final String clusterName = getEnvMap ().getOrDefault ("PULSAR_PREFIX_clusterName" , "standalone" );
100
99
final String response = String .format ("[\" %s\" ]" , clusterName );
101
-
102
- List <WaitStrategy > waitStrategies = new ArrayList <>();
103
- waitStrategies .add (Wait .defaultWaitStrategy ());
104
- waitStrategies .add (
100
+ waitAllStrategy .withStrategy (
105
101
Wait .forHttp (ADMIN_CLUSTERS_ENDPOINT ).forPort (BROKER_HTTP_PORT ).forResponsePredicate (response ::equals )
106
102
);
103
+
107
104
if (transactionsEnabled ) {
108
105
withEnv ("PULSAR_PREFIX_transactionCoordinatorEnabled" , "true" );
109
- waitStrategies .add (Wait .forHttp (TRANSACTION_TOPIC_ENDPOINT ).forStatusCode (200 ).forPort (BROKER_HTTP_PORT ));
106
+ waitAllStrategy .withStrategy (
107
+ Wait .forHttp (TRANSACTION_TOPIC_ENDPOINT ).forStatusCode (200 ).forPort (BROKER_HTTP_PORT )
108
+ );
110
109
}
111
110
if (functionsWorkerEnabled ) {
112
- waitStrategies . add (Wait .forLogMessage (".*Function worker service started.*" , 1 ));
111
+ waitAllStrategy . withStrategy (Wait .forLogMessage (".*Function worker service started.*" , 1 ));
113
112
}
114
- final WaitAllStrategy compoundedWaitStrategy = new WaitAllStrategy ();
115
- waitStrategies .forEach (compoundedWaitStrategy ::withStrategy );
116
- waitingFor (compoundedWaitStrategy );
117
113
}
118
114
}
0 commit comments