Skip to content

Commit 08e0481

Browse files
committed
Refine #4131
* Deprecate SimpleJobLauncher in favor of TaskExecutorJobLauncher * Update year in license headers
1 parent c079103 commit 08e0481

File tree

4 files changed

+228
-158
lines changed

4 files changed

+228
-158
lines changed
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* Copyright 2006-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.core.launch.support;
17+
18+
import java.time.Duration;
19+
20+
import org.apache.commons.logging.Log;
21+
import org.apache.commons.logging.LogFactory;
22+
23+
import org.springframework.batch.core.BatchStatus;
24+
import org.springframework.batch.core.ExitStatus;
25+
import org.springframework.batch.core.Job;
26+
import org.springframework.batch.core.JobExecution;
27+
import org.springframework.batch.core.JobInstance;
28+
import org.springframework.batch.core.JobParameters;
29+
import org.springframework.batch.core.JobParametersInvalidException;
30+
import org.springframework.batch.core.StepExecution;
31+
import org.springframework.batch.core.launch.JobLauncher;
32+
import org.springframework.batch.core.observability.BatchMetrics;
33+
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
34+
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
35+
import org.springframework.batch.core.repository.JobRepository;
36+
import org.springframework.batch.core.repository.JobRestartException;
37+
import org.springframework.beans.factory.InitializingBean;
38+
import org.springframework.core.task.SyncTaskExecutor;
39+
import org.springframework.core.task.TaskExecutor;
40+
import org.springframework.core.task.TaskRejectedException;
41+
import org.springframework.util.Assert;
42+
43+
/**
44+
* Simple implementation of the {@link JobLauncher} interface. The Spring Core
45+
* {@link TaskExecutor} interface is used to launch a {@link Job}. This means that the
46+
* type of executor set is very important. If a {@link SyncTaskExecutor} is used, then the
47+
* job will be processed <strong>within the same thread that called the launcher.</strong>
48+
* Care should be taken to ensure any users of this class understand fully whether or not
49+
* the implementation of TaskExecutor used will start tasks synchronously or
50+
* asynchronously. The default setting uses a synchronous task executor.
51+
*
52+
* There is only one required dependency of this Launcher, a {@link JobRepository}. The
53+
* JobRepository is used to obtain a valid JobExecution. The Repository must be used
54+
* because the provided {@link Job} could be a restart of an existing {@link JobInstance},
55+
* and only the Repository can reliably recreate it.
56+
*
57+
* @author Lucas Ward
58+
* @author Dave Syer
59+
* @author Will Schipp
60+
* @author Michael Minella
61+
* @author Mahmoud Ben Hassine
62+
* @since 1.0
63+
* @see JobRepository
64+
* @see TaskExecutor
65+
* @deprecated Since v5.0.0 for removal in v5.2.0. Use {@link TaskExecutorJobLauncher}.
66+
*/
67+
@Deprecated(since = "5.0.0", forRemoval = true)
68+
public class SimpleJobLauncher implements JobLauncher, InitializingBean {
69+
70+
protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);
71+
72+
private JobRepository jobRepository;
73+
74+
private TaskExecutor taskExecutor;
75+
76+
/**
77+
* Run the provided job with the given {@link JobParameters}. The
78+
* {@link JobParameters} will be used to determine if this is an execution of an
79+
* existing job instance, or if a new one should be created.
80+
* @param job the job to be run.
81+
* @param jobParameters the {@link JobParameters} for this particular execution.
82+
* @return the {@link JobExecution} if it returns synchronously. If the implementation
83+
* is asynchronous, the status might well be unknown.
84+
* @throws JobExecutionAlreadyRunningException if the JobInstance already exists and
85+
* has an execution already running.
86+
* @throws JobRestartException if the execution would be a re-start, but a re-start is
87+
* either not allowed or not needed.
88+
* @throws JobInstanceAlreadyCompleteException if this instance has already completed
89+
* successfully
90+
* @throws JobParametersInvalidException thrown if jobParameters is invalid.
91+
*/
92+
@Override
93+
public JobExecution run(final Job job, final JobParameters jobParameters)
94+
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
95+
JobParametersInvalidException {
96+
97+
Assert.notNull(job, "The Job must not be null.");
98+
Assert.notNull(jobParameters, "The JobParameters must not be null.");
99+
100+
final JobExecution jobExecution;
101+
JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
102+
if (lastExecution != null) {
103+
if (!job.isRestartable()) {
104+
throw new JobRestartException("JobInstance already exists and is not restartable");
105+
}
106+
/*
107+
* validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED
108+
* and STOPPING retrieve the previous execution and check
109+
*/
110+
for (StepExecution execution : lastExecution.getStepExecutions()) {
111+
BatchStatus status = execution.getStatus();
112+
if (status.isRunning() || status == BatchStatus.STOPPING) {
113+
throw new JobExecutionAlreadyRunningException(
114+
"A job execution for this job is already running: " + lastExecution);
115+
}
116+
else if (status == BatchStatus.UNKNOWN) {
117+
throw new JobRestartException(
118+
"Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
119+
+ "The last execution ended with a failure that could not be rolled back, "
120+
+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
121+
}
122+
}
123+
}
124+
125+
// Check the validity of the parameters before doing creating anything
126+
// in the repository...
127+
job.getJobParametersValidator().validate(jobParameters);
128+
129+
/*
130+
* There is a very small probability that a non-restartable job can be restarted,
131+
* but only if another process or thread manages to launch <i>and</i> fail a job
132+
* execution for this instance between the last assertion and the next method
133+
* returning successfully.
134+
*/
135+
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
136+
137+
try {
138+
taskExecutor.execute(new Runnable() {
139+
140+
@Override
141+
public void run() {
142+
try {
143+
if (logger.isInfoEnabled()) {
144+
logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
145+
+ "]");
146+
}
147+
job.execute(jobExecution);
148+
if (logger.isInfoEnabled()) {
149+
Duration jobExecutionDuration = BatchMetrics.calculateDuration(jobExecution.getStartTime(),
150+
jobExecution.getEndTime());
151+
logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
152+
+ "] and the following status: [" + jobExecution.getStatus() + "]"
153+
+ (jobExecutionDuration == null ? ""
154+
: " in " + BatchMetrics.formatDuration(jobExecutionDuration)));
155+
}
156+
}
157+
catch (Throwable t) {
158+
if (logger.isInfoEnabled()) {
159+
logger.info("Job: [" + job
160+
+ "] failed unexpectedly and fatally with the following parameters: ["
161+
+ jobParameters + "]", t);
162+
}
163+
rethrow(t);
164+
}
165+
}
166+
167+
private void rethrow(Throwable t) {
168+
if (t instanceof RuntimeException) {
169+
throw (RuntimeException) t;
170+
}
171+
else if (t instanceof Error) {
172+
throw (Error) t;
173+
}
174+
throw new IllegalStateException(t);
175+
}
176+
});
177+
}
178+
catch (TaskRejectedException e) {
179+
jobExecution.upgradeStatus(BatchStatus.FAILED);
180+
if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
181+
jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
182+
}
183+
jobRepository.update(jobExecution);
184+
}
185+
186+
return jobExecution;
187+
}
188+
189+
/**
190+
* Set the JobRepository.
191+
* @param jobRepository instance of {@link JobRepository}.
192+
*/
193+
public void setJobRepository(JobRepository jobRepository) {
194+
this.jobRepository = jobRepository;
195+
}
196+
197+
/**
198+
* Set the TaskExecutor. (Optional)
199+
* @param taskExecutor instance of {@link TaskExecutor}.
200+
*/
201+
public void setTaskExecutor(TaskExecutor taskExecutor) {
202+
this.taskExecutor = taskExecutor;
203+
}
204+
205+
/**
206+
* Ensure the required dependencies of a {@link JobRepository} have been set.
207+
*/
208+
@Override
209+
public void afterPropertiesSet() throws Exception {
210+
Assert.state(jobRepository != null, "A JobRepository has not been set.");
211+
if (taskExecutor == null) {
212+
logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
213+
taskExecutor = new SyncTaskExecutor();
214+
}
215+
}
216+
217+
}

0 commit comments

Comments
 (0)