Skip to content

Commit 451d3ba

Browse files
committed
Make the meaning of running status consistent
- A running status is STARTING, STARTED, or STOPPING. - Make the order when iterating the return value by JdbcJobExecutionDao.findRunningJobExecutions consistent which the result set of the sql. Resolves spring-projects#1483
1 parent 5bce152 commit 451d3ba

File tree

9 files changed

+132
-13
lines changed

9 files changed

+132
-13
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/BatchStatus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ public static BatchStatus max(BatchStatus status1, BatchStatus status2) {
8585
/**
8686
* Convenience method to decide if a status indicates that work is in progress.
8787
*
88-
* @return true if the status is STARTING, STARTED
88+
* @return true if the status is STARTING, STARTED, STOPPING
8989
*/
9090
public boolean isRunning() {
91-
return this == STARTING || this == STARTED;
91+
return this == STARTING || this == STARTED || this == STOPPING;
9292
}
9393

9494
/**

spring-batch-core/src/main/java/org/springframework/batch/core/JobExecution.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,10 @@ public StepExecution createStepExecution(String stepName) {
263263
* Test if this {@link JobExecution} indicates that it is running.
264264
* Note that this does not necessarily mean that it has been persisted.
265265
*
266-
* @return {@code true} if the end time is null and the start time is not null.
266+
* @return {@code true} if the status is one of the running status.
267267
*/
268268
public boolean isRunning() {
269-
return startTime != null && endTime == null;
269+
return status.isRunning();
270270
}
271271

272272
/**

spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobLauncher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public JobExecution run(final Job job, final JobParameters jobParameters)
112112
*/
113113
for (StepExecution execution : lastExecution.getStepExecutions()) {
114114
BatchStatus status = execution.getStatus();
115-
if (status.isRunning() || status == BatchStatus.STOPPING) {
115+
if (status.isRunning()) {
116116
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
117117
+ lastExecution);
118118
} else if (status == BatchStatus.UNKNOWN) {

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobExecutionDao.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.sql.SQLException;
2121
import java.sql.Types;
2222
import java.util.HashMap;
23-
import java.util.HashSet;
23+
import java.util.LinkedHashSet;
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Map.Entry;
@@ -85,7 +85,7 @@ public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements
8585
+ " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";
8686

8787
private static final String GET_RUNNING_EXECUTIONS = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, "
88-
+ "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.START_TIME is not NULL and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc";
88+
+ "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.STATUS in ('STARTING', 'STARTED', 'STOPPING') order by E.JOB_EXECUTION_ID desc";
8989

9090
private static final String CURRENT_VERSION_JOB_EXECUTION = "SELECT VERSION FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID=?";
9191

@@ -287,7 +287,7 @@ public JobExecution getJobExecution(Long executionId) {
287287
@Override
288288
public Set<JobExecution> findRunningJobExecutions(String jobName) {
289289

290-
final Set<JobExecution> result = new HashSet<>();
290+
final Set<JobExecution> result = new LinkedHashSet<>();
291291
RowCallbackHandler handler = new RowCallbackHandler() {
292292
@Override
293293
public void processRow(ResultSet rs) throws SQLException {

spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public JobExecution createJobExecution(String jobName, JobParameters jobParamete
123123

124124
// check for running executions and find the last started
125125
for (JobExecution execution : executions) {
126-
if (execution.isRunning() || execution.isStopping()) {
126+
if (execution.isRunning()) {
127127
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
128128
+ jobInstance);
129129
}

spring-batch-core/src/test/java/org/springframework/batch/core/BatchStatusTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public void testIsRunning() {
7373
assertFalse(BatchStatus.COMPLETED.isRunning());
7474
assertTrue(BatchStatus.STARTED.isRunning());
7575
assertTrue(BatchStatus.STARTING.isRunning());
76+
assertTrue(BatchStatus.STOPPING.isRunning());
7677
}
7778

7879
@Test

spring-batch-core/src/test/java/org/springframework/batch/core/JobExecutionTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,19 @@ public void testGetEndTime() {
5757

5858
/**
5959
* Test method for
60-
* {@link org.springframework.batch.core.JobExecution#getEndTime()}.
60+
* {@link org.springframework.batch.core.JobExecution#isRunning()}.
6161
*/
6262
@Test
6363
public void testIsRunning() {
64-
execution.setStartTime(new Date());
64+
execution.setStatus(BatchStatus.STARTING);
6565
assertTrue(execution.isRunning());
66-
execution.setEndTime(new Date(100L));
66+
execution.setStatus(BatchStatus.STARTED);
67+
assertTrue(execution.isRunning());
68+
execution.setStatus(BatchStatus.STOPPING);
69+
assertTrue(execution.isRunning());
70+
execution.setStatus(BatchStatus.COMPLETED);
71+
assertFalse(execution.isRunning());
72+
execution.setStatus(BatchStatus.FAILED);
6773
assertFalse(execution.isRunning());
6874
}
6975

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2006-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.core.launch;
17+
18+
import java.util.Date;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
24+
import javax.sql.DataSource;
25+
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
import org.junit.runner.RunWith;
29+
30+
import org.springframework.batch.core.Job;
31+
import org.springframework.batch.core.JobParameters;
32+
import org.springframework.batch.core.JobParametersBuilder;
33+
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
34+
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
35+
import org.springframework.batch.core.repository.JobRepository;
36+
import org.springframework.batch.core.step.tasklet.TaskletStep;
37+
import org.springframework.batch.core.test.AbstractIntegrationTests;
38+
import org.springframework.batch.repeat.RepeatStatus;
39+
import org.springframework.beans.factory.annotation.Autowired;
40+
import org.springframework.test.context.ContextConfiguration;
41+
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
42+
import org.springframework.transaction.PlatformTransactionManager;
43+
44+
/**
45+
* @author Marvin Deng
46+
*/
47+
@RunWith(SpringJUnit4ClassRunner.class)
48+
@ContextConfiguration(locations = {"/simple-job-launcher-context.xml"})
49+
public class SimpleJobLauncherIntegrationTests extends AbstractIntegrationTests {
50+
51+
@Autowired
52+
private JobLauncher jobLauncher;
53+
54+
@Autowired
55+
private JobRepository jobRepository;
56+
57+
@Autowired
58+
private PlatformTransactionManager transactionManager;
59+
60+
@Autowired
61+
public void setDataSource(DataSource dataSource) {
62+
this.dataSource = dataSource;
63+
}
64+
65+
@Test
66+
public void testLaunchWithSameParametersInMultiThreads() throws Exception {
67+
JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
68+
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
69+
70+
TaskletStep step = stepBuilderFactory.get("testStep")
71+
.tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
72+
.build();
73+
Job job = jobBuilderFactory.get("testJob").start(step).build();
74+
JobParameters jobParameters = new JobParametersBuilder()
75+
.addDate("now", new Date())
76+
.toJobParameters();
77+
78+
AtomicInteger succeedCount = new AtomicInteger(0);
79+
AtomicInteger failedCount = new AtomicInteger(0);
80+
final int count = 10;
81+
CountDownLatch latch = new CountDownLatch(count);
82+
ExecutorService executorService = Executors.newFixedThreadPool(count);
83+
for (int i = 0; i < count; i++) {
84+
executorService.submit(() -> {
85+
try {
86+
jobLauncher.run(job, jobParameters);
87+
succeedCount.incrementAndGet();
88+
}
89+
catch (Exception e) {
90+
failedCount.incrementAndGet();
91+
}
92+
finally {
93+
latch.countDown();
94+
}
95+
});
96+
}
97+
executorService.shutdown();
98+
latch.await();
99+
Assert.assertEquals(1, succeedCount.intValue());
100+
Assert.assertEquals(count - 1, failedCount.intValue());
101+
}
102+
103+
}

spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/AbstractJobExecutionDaoTests.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ public void testFindRunningExecutions() {
202202
exec.setCreateTime(new Date(0));
203203
exec.setStartTime(new Date(1L));
204204
exec.setEndTime(new Date(2L));
205+
exec.setStatus(BatchStatus.COMPLETED);
205206
exec.setLastUpdated(new Date(5L));
206207
dao.saveJobExecution(exec);
207208

@@ -212,9 +213,17 @@ public void testFindRunningExecutions() {
212213
exec.setLastUpdated(new Date(5L));
213214
dao.saveJobExecution(exec);
214215

216+
//Stopping JobExecution as status is STOPPING
217+
exec = new JobExecution(jobInstance, jobParameters);
218+
exec.setStartTime(new Date(3L));
219+
exec.setStatus(BatchStatus.STOPPING);
220+
exec.setLastUpdated(new Date(5L));
221+
dao.saveJobExecution(exec);
222+
215223
//Running JobExecution as StartTime is populated but EndTime is null
216224
exec = new JobExecution(jobInstance, jobParameters);
217225
exec.setStartTime(new Date(2L));
226+
exec.setStatus(BatchStatus.STARTED);
218227
exec.setLastUpdated(new Date(5L));
219228
exec.createStepExecution("step");
220229
dao.saveJobExecution(exec);
@@ -228,7 +237,7 @@ public void testFindRunningExecutions() {
228237

229238
Set<JobExecution> values = dao.findRunningJobExecutions(exec.getJobInstance().getJobName());
230239

231-
assertEquals(1, values.size());
240+
assertEquals(3, values.size());
232241
JobExecution value = values.iterator().next();
233242
assertEquals(exec, value);
234243
assertEquals(5L, value.getLastUpdated().getTime());

0 commit comments

Comments
 (0)