Skip to content

Commit 155f8d3

Browse files
committed
Merge pull request #43532 from onobc
* pr/43532: Add Pulsar 4.0.x smoke test Closes gh-43532
2 parents a25065e + f1c1291 commit 155f8d3

File tree

7 files changed

+292
-0
lines changed

7 files changed

+292
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
plugins {
2+
id "java"
3+
id "org.springframework.boot.docker-test"
4+
}
5+
6+
description = "Spring Boot Pulsar 4 smoke test"
7+
8+
configurations.all {
9+
resolutionStrategy.eachDependency {
10+
if (it.requested.group == 'org.apache.pulsar' &&
11+
!(it.requested.name.startsWith('pulsar-client-reactive'))) {
12+
it.useVersion '4.0.1'
13+
}
14+
}
15+
}
16+
17+
dependencies {
18+
dockerTestImplementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-test"))
19+
dockerTestImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support-docker"))
20+
dockerTestImplementation(project(":spring-boot-project:spring-boot-testcontainers"))
21+
dockerTestImplementation("org.awaitility:awaitility")
22+
dockerTestImplementation("org.testcontainers:junit-jupiter")
23+
dockerTestImplementation("org.testcontainers:pulsar")
24+
25+
implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar"))
26+
implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar-reactive"))
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package smoketest.pulsar;
18+
19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.stream.IntStream;
23+
24+
import org.awaitility.Awaitility;
25+
import org.junit.jupiter.api.Nested;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.extension.ExtendWith;
28+
import org.testcontainers.containers.PulsarContainer;
29+
import org.testcontainers.junit.jupiter.Container;
30+
import org.testcontainers.junit.jupiter.Testcontainers;
31+
32+
import org.springframework.boot.test.context.SpringBootTest;
33+
import org.springframework.boot.test.system.CapturedOutput;
34+
import org.springframework.boot.test.system.OutputCaptureExtension;
35+
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
36+
import org.springframework.boot.testsupport.container.TestImage;
37+
import org.springframework.test.context.ActiveProfiles;
38+
39+
import static org.assertj.core.api.Assertions.assertThat;
40+
41+
@Testcontainers(disabledWithoutDocker = true)
42+
@ExtendWith(OutputCaptureExtension.class)
43+
class SamplePulsarApplicationTests {
44+
45+
@Container
46+
@ServiceConnection
47+
static final PulsarContainer pulsar = TestImage.container(PulsarContainer.class);
48+
49+
abstract class PulsarApplication {
50+
51+
private final String type;
52+
53+
PulsarApplication(String type) {
54+
this.type = type;
55+
}
56+
57+
@Test
58+
void appProducesAndConsumesMessages(CapturedOutput output) {
59+
List<String> expectedOutput = new ArrayList<>();
60+
IntStream.range(0, 10).forEachOrdered((i) -> {
61+
expectedOutput.add("++++++PRODUCE %s:(%s)------".formatted(this.type, i));
62+
expectedOutput.add("++++++CONSUME %s:(%s)------".formatted(this.type, i));
63+
});
64+
Awaitility.waitAtMost(Duration.ofSeconds(30))
65+
.untilAsserted(() -> assertThat(output).contains(expectedOutput));
66+
}
67+
68+
}
69+
70+
@Nested
71+
@SpringBootTest
72+
@ActiveProfiles("smoketest.pulsar.imperative")
73+
class ImperativePulsarApplication extends PulsarApplication {
74+
75+
ImperativePulsarApplication() {
76+
super("IMPERATIVE");
77+
}
78+
79+
}
80+
81+
@Nested
82+
@SpringBootTest
83+
@ActiveProfiles("smoketest.pulsar.reactive")
84+
class ReactivePulsarApplication extends PulsarApplication {
85+
86+
ReactivePulsarApplication() {
87+
super("REACTIVE");
88+
}
89+
90+
}
91+
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package smoketest.pulsar;
18+
19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
22+
import org.springframework.boot.ApplicationRunner;
23+
import org.springframework.context.annotation.Bean;
24+
import org.springframework.context.annotation.Configuration;
25+
import org.springframework.context.annotation.Profile;
26+
import org.springframework.pulsar.annotation.PulsarListener;
27+
import org.springframework.pulsar.core.PulsarTemplate;
28+
import org.springframework.pulsar.core.PulsarTopic;
29+
import org.springframework.pulsar.core.PulsarTopicBuilder;
30+
31+
@Configuration(proxyBeanMethods = false)
32+
@Profile("smoketest.pulsar.imperative")
33+
class ImperativeAppConfig {
34+
35+
private static final Log logger = LogFactory.getLog(ImperativeAppConfig.class);
36+
37+
private static final String TOPIC = "pulsar-smoke-test-topic";
38+
39+
@Bean
40+
PulsarTopic pulsarTestTopic() {
41+
return new PulsarTopicBuilder().name(TOPIC).numberOfPartitions(1).build();
42+
}
43+
44+
@Bean
45+
ApplicationRunner sendMessagesToPulsarTopic(PulsarTemplate<SampleMessage> template) {
46+
return (args) -> {
47+
for (int i = 0; i < 10; i++) {
48+
template.send(TOPIC, new SampleMessage(i, "message:" + i));
49+
logger.info("++++++PRODUCE IMPERATIVE:(" + i + ")------");
50+
}
51+
};
52+
}
53+
54+
@PulsarListener(topics = TOPIC)
55+
void consumeMessagesFromPulsarTopic(SampleMessage msg) {
56+
logger.info("++++++CONSUME IMPERATIVE:(" + msg.id() + ")------");
57+
}
58+
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package smoketest.pulsar;
18+
19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
import org.apache.pulsar.reactive.client.api.MessageSpec;
22+
import reactor.core.publisher.Flux;
23+
import reactor.core.publisher.Mono;
24+
25+
import org.springframework.boot.ApplicationRunner;
26+
import org.springframework.context.annotation.Bean;
27+
import org.springframework.context.annotation.Configuration;
28+
import org.springframework.context.annotation.Profile;
29+
import org.springframework.pulsar.core.PulsarTopic;
30+
import org.springframework.pulsar.core.PulsarTopicBuilder;
31+
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
32+
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
33+
34+
@Configuration(proxyBeanMethods = false)
35+
@Profile("smoketest.pulsar.reactive")
36+
class ReactiveAppConfig {
37+
38+
private static final Log logger = LogFactory.getLog(ReactiveAppConfig.class);
39+
40+
private static final String TOPIC = "pulsar-reactive-smoke-test-topic";
41+
42+
@Bean
43+
PulsarTopic pulsarTestTopic() {
44+
return new PulsarTopicBuilder().name(TOPIC).numberOfPartitions(1).build();
45+
}
46+
47+
@Bean
48+
ApplicationRunner sendMessagesToPulsarTopic(ReactivePulsarTemplate<SampleMessage> template) {
49+
return (args) -> Flux.range(0, 10)
50+
.map((i) -> new SampleMessage(i, "message:" + i))
51+
.map(MessageSpec::of)
52+
.as((msgs) -> template.send(TOPIC, msgs))
53+
.doOnNext((sendResult) -> logger
54+
.info("++++++PRODUCE REACTIVE:(" + sendResult.getMessageSpec().getValue().id() + ")------"))
55+
.subscribe();
56+
}
57+
58+
@ReactivePulsarListener(topics = TOPIC)
59+
Mono<Void> consumeMessagesFromPulsarTopic(SampleMessage msg) {
60+
logger.info("++++++CONSUME REACTIVE:(" + msg.id() + ")------");
61+
return Mono.empty();
62+
}
63+
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package smoketest.pulsar;
18+
19+
record SampleMessage(Integer id, String content) {
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package smoketest.pulsar;
18+
19+
import org.springframework.boot.SpringApplication;
20+
import org.springframework.boot.autoconfigure.SpringBootApplication;
21+
22+
@SpringBootApplication
23+
public class SamplePulsarApplication {
24+
25+
public static void main(String[] args) {
26+
SpringApplication.run(SamplePulsarApplication.class, args);
27+
}
28+
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
spring.pulsar.consumer.subscription.initial-position=earliest

0 commit comments

Comments
 (0)