Skip to content

Commit 0516808

Browse files
committed
Add test suite for virtual threads support
Related to #4399
1 parent fe479fb commit 0516808

7 files changed

+733
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.sample.loom;
17+
18+
import java.util.Arrays;
19+
import java.util.concurrent.Future;
20+
21+
import javax.sql.DataSource;
22+
23+
import org.springframework.batch.core.Job;
24+
import org.springframework.batch.core.Step;
25+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
26+
import org.springframework.batch.core.job.builder.JobBuilder;
27+
import org.springframework.batch.core.repository.JobRepository;
28+
import org.springframework.batch.core.step.builder.StepBuilder;
29+
import org.springframework.batch.integration.async.AsyncItemProcessor;
30+
import org.springframework.batch.integration.async.AsyncItemWriter;
31+
import org.springframework.batch.item.ItemReader;
32+
import org.springframework.batch.item.support.ListItemReader;
33+
import org.springframework.context.annotation.Bean;
34+
import org.springframework.context.annotation.Configuration;
35+
import org.springframework.core.task.VirtualThreadTaskExecutor;
36+
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
37+
import org.springframework.jdbc.support.JdbcTransactionManager;
38+
39+
/**
40+
* Configuration class that defines a chunk-oriented step with an asynchronous item
41+
* processor based on a {@link VirtualThreadTaskExecutor}.
42+
*
43+
* @author Mahmoud Ben Hassine
44+
*/
45+
@Configuration
46+
@EnableBatchProcessing
47+
public class JobConfigurationForAsynchronousItemProcessingWithVirtualThreads {
48+
49+
@Bean
50+
public ItemReader<Integer> itemReader() {
51+
return new ListItemReader<>(Arrays.asList(0, 1, 2, 3, 4, 5));
52+
}
53+
54+
@Bean
55+
public AsyncItemProcessor<Integer, Integer> itemProcessor() {
56+
AsyncItemProcessor<Integer, Integer> asyncItemProcessor = new AsyncItemProcessor<>();
57+
asyncItemProcessor.setDelegate(item -> {
58+
System.out.println(Thread.currentThread() + ": processing item " + item);
59+
return item + 1;
60+
});
61+
asyncItemProcessor.setTaskExecutor(new VirtualThreadTaskExecutor("spring-batch-"));
62+
return asyncItemProcessor;
63+
}
64+
65+
@Bean
66+
public AsyncItemWriter<Integer> itemWriter() {
67+
AsyncItemWriter<Integer> asyncItemWriter = new AsyncItemWriter<>();
68+
asyncItemWriter.setDelegate(items -> {
69+
for (Integer item : items) {
70+
System.out.println(Thread.currentThread() + ": writing item " + item);
71+
}
72+
});
73+
return asyncItemWriter;
74+
}
75+
76+
@Bean
77+
public Job job(JobRepository jobRepository, JdbcTransactionManager transactionManager,
78+
ItemReader<Integer> itemReader, AsyncItemProcessor<Integer, Integer> itemProcessor,
79+
AsyncItemWriter<Integer> itemWriter) {
80+
Step step = new StepBuilder("step", jobRepository).<Integer, Future<Integer>>chunk(2, transactionManager)
81+
.reader(itemReader)
82+
.processor(itemProcessor)
83+
.writer(itemWriter)
84+
.build();
85+
return new JobBuilder("job", jobRepository).start(step).build();
86+
}
87+
88+
@Bean
89+
public DataSource dataSource() {
90+
return new EmbeddedDatabaseBuilder().addScript("/org/springframework/batch/core/schema-hsqldb.sql")
91+
.generateUniqueName(true)
92+
.build();
93+
}
94+
95+
@Bean
96+
public JdbcTransactionManager transactionManager(DataSource dataSource) {
97+
return new JdbcTransactionManager(dataSource);
98+
}
99+
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.sample.loom;
17+
18+
import javax.sql.DataSource;
19+
20+
import org.springframework.batch.core.Job;
21+
import org.springframework.batch.core.Step;
22+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
23+
import org.springframework.batch.core.job.builder.JobBuilder;
24+
import org.springframework.batch.core.launch.JobLauncher;
25+
import org.springframework.batch.core.repository.JobRepository;
26+
import org.springframework.batch.core.step.builder.StepBuilder;
27+
import org.springframework.batch.repeat.RepeatStatus;
28+
import org.springframework.context.annotation.Bean;
29+
import org.springframework.context.annotation.Configuration;
30+
import org.springframework.core.task.TaskExecutor;
31+
import org.springframework.core.task.VirtualThreadTaskExecutor;
32+
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
33+
import org.springframework.jdbc.support.JdbcTransactionManager;
34+
35+
/**
36+
* Configuration class that defines a {@link JobLauncher} based on a
37+
* {@link VirtualThreadTaskExecutor}.
38+
*
39+
* @author Mahmoud Ben Hassine
40+
*/
41+
@Configuration
42+
@EnableBatchProcessing
43+
public class JobConfigurationForLaunchingJobsWithVirtualThreads {
44+
45+
@Bean
46+
public Job job(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
47+
Step step = new StepBuilder("step", jobRepository).tasklet((contribution, chunkContext) -> {
48+
String message = Thread.currentThread() + ": Hello virtual threads world!";
49+
contribution.getStepExecution().getJobExecution().getExecutionContext().put("message", message);
50+
return RepeatStatus.FINISHED;
51+
}, transactionManager).build();
52+
return new JobBuilder("job", jobRepository).start(step).build();
53+
}
54+
55+
@Bean
56+
public DataSource dataSource() {
57+
return new EmbeddedDatabaseBuilder().addScript("/org/springframework/batch/core/schema-hsqldb.sql")
58+
.generateUniqueName(true)
59+
.build();
60+
}
61+
62+
@Bean
63+
public JdbcTransactionManager transactionManager(DataSource dataSource) {
64+
return new JdbcTransactionManager(dataSource);
65+
}
66+
67+
@Bean
68+
public TaskExecutor taskExecutor() {
69+
return new VirtualThreadTaskExecutor("spring-batch-");
70+
}
71+
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.sample.loom;
17+
18+
import java.util.Arrays;
19+
import java.util.concurrent.locks.Lock;
20+
import java.util.concurrent.locks.ReentrantLock;
21+
22+
import javax.sql.DataSource;
23+
24+
import org.springframework.batch.core.Job;
25+
import org.springframework.batch.core.Step;
26+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
27+
import org.springframework.batch.core.job.builder.JobBuilder;
28+
import org.springframework.batch.core.repository.JobRepository;
29+
import org.springframework.batch.core.step.builder.StepBuilder;
30+
import org.springframework.batch.item.ItemReader;
31+
import org.springframework.batch.item.ItemWriter;
32+
import org.springframework.batch.item.support.ListItemReader;
33+
import org.springframework.context.annotation.Bean;
34+
import org.springframework.context.annotation.Configuration;
35+
import org.springframework.core.task.VirtualThreadTaskExecutor;
36+
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
37+
import org.springframework.jdbc.support.JdbcTransactionManager;
38+
39+
/**
40+
* Configuration class that defines a concurrent chunk-oriented step based on a
41+
* {@link VirtualThreadTaskExecutor}.
42+
*
43+
* @author Mahmoud Ben Hassine
44+
*/
45+
@Configuration
46+
@EnableBatchProcessing
47+
public class JobConfigurationForRunningConcurrentStepsWithVirtualThreads {
48+
49+
@Bean
50+
public ItemReader<Integer> itemReader() {
51+
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6)) {
52+
final Lock lock = new ReentrantLock();
53+
54+
@Override
55+
public Integer read() {
56+
this.lock.lock();
57+
try {
58+
Integer item = super.read();
59+
System.out.println(Thread.currentThread() + ": reading item " + item);
60+
return item;
61+
}
62+
finally {
63+
this.lock.unlock();
64+
}
65+
}
66+
};
67+
}
68+
69+
@Bean
70+
public ItemWriter<Integer> itemWriter() {
71+
return items -> {
72+
for (Integer item : items) {
73+
System.out.println(Thread.currentThread() + ": writing item " + item);
74+
}
75+
};
76+
}
77+
78+
@Bean
79+
public Job job(JobRepository jobRepository, JdbcTransactionManager transactionManager,
80+
ItemReader<Integer> itemReader, ItemWriter<Integer> itemWriter) {
81+
Step step = new StepBuilder("step", jobRepository).<Integer, Integer>chunk(2, transactionManager)
82+
.reader(itemReader)
83+
.writer(itemWriter)
84+
.taskExecutor(new VirtualThreadTaskExecutor("spring-batch-"))
85+
.build();
86+
return new JobBuilder("job", jobRepository).start(step).build();
87+
}
88+
89+
@Bean
90+
public DataSource dataSource() {
91+
return new EmbeddedDatabaseBuilder().addScript("/org/springframework/batch/core/schema-hsqldb.sql")
92+
.generateUniqueName(true)
93+
.build();
94+
}
95+
96+
@Bean
97+
public JdbcTransactionManager transactionManager(DataSource dataSource) {
98+
return new JdbcTransactionManager(dataSource);
99+
}
100+
101+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.sample.loom;
17+
18+
import javax.sql.DataSource;
19+
20+
import org.springframework.batch.core.Job;
21+
import org.springframework.batch.core.Step;
22+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
23+
import org.springframework.batch.core.job.builder.FlowBuilder;
24+
import org.springframework.batch.core.job.builder.JobBuilder;
25+
import org.springframework.batch.core.job.flow.Flow;
26+
import org.springframework.batch.core.repository.JobRepository;
27+
import org.springframework.batch.core.step.builder.StepBuilder;
28+
import org.springframework.batch.repeat.RepeatStatus;
29+
import org.springframework.context.annotation.Bean;
30+
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.core.task.VirtualThreadTaskExecutor;
32+
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
33+
import org.springframework.jdbc.support.JdbcTransactionManager;
34+
35+
/**
36+
* Configuration class that defines a parallel flow of steps based on a
37+
* {@link VirtualThreadTaskExecutor}.
38+
*
39+
* @author Mahmoud Ben Hassine
40+
*/
41+
@Configuration
42+
@EnableBatchProcessing
43+
public class JobConfigurationForRunningParallelStepsWithVirtualThreads {
44+
45+
@Bean
46+
public Step step1(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
47+
return createStep("step1", jobRepository, transactionManager);
48+
}
49+
50+
@Bean
51+
public Step step2(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
52+
return createStep("step2", jobRepository, transactionManager);
53+
}
54+
55+
@Bean
56+
public Job job(JobRepository jobRepository, Step step1, Step step2) {
57+
Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step1).end();
58+
Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2).end();
59+
60+
Flow splitFlow = new FlowBuilder<Flow>("splitflow").split(new VirtualThreadTaskExecutor("spring-batch-"))
61+
.add(flow1, flow2)
62+
.build();
63+
64+
return new JobBuilder("job", jobRepository).start(splitFlow).build().build();
65+
}
66+
67+
@Bean
68+
public DataSource dataSource() {
69+
return new EmbeddedDatabaseBuilder().addScript("/org/springframework/batch/core/schema-hsqldb.sql")
70+
.generateUniqueName(true)
71+
.build();
72+
}
73+
74+
@Bean
75+
public JdbcTransactionManager transactionManager(DataSource dataSource) {
76+
return new JdbcTransactionManager(dataSource);
77+
}
78+
79+
private Step createStep(String stepName, JobRepository jobRepository, JdbcTransactionManager transactionManager) {
80+
return new StepBuilder(stepName, jobRepository).tasklet((contribution, chunkContext) -> {
81+
System.out.println(Thread.currentThread() + ": running " + stepName);
82+
return RepeatStatus.FINISHED;
83+
}, transactionManager).build();
84+
}
85+
86+
}

0 commit comments

Comments
 (0)