Skip to content

Commit 3ed94e8

Browse files
committed
Merge pull request #173 from willschipp/BATCH-2009
* BATCH-2009: BATCH-2009 - Stoppable tasklet * Added the StoppableTasklet interface (an extension of the Tasklet interface). * Updated JobOperator#stop(long executionId) to call StoppableTasklet#stop() on any currently running tasklets (local executions only). * Updated the SystemCommandTasklet to implement StoppableTasklet
2 parents 2d874ff + 81e0c38 commit 3ed94e8

File tree

6 files changed

+219
-3
lines changed

6 files changed

+219
-3
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.batch.core.JobParameters;
3434
import org.springframework.batch.core.JobParametersIncrementer;
3535
import org.springframework.batch.core.JobParametersInvalidException;
36+
import org.springframework.batch.core.Step;
3637
import org.springframework.batch.core.StepExecution;
3738
import org.springframework.batch.core.UnexpectedJobExecutionException;
3839
import org.springframework.batch.core.configuration.JobRegistry;
@@ -52,6 +53,11 @@
5253
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
5354
import org.springframework.batch.core.repository.JobRepository;
5455
import org.springframework.batch.core.repository.JobRestartException;
56+
import org.springframework.batch.core.step.NoSuchStepException;
57+
import org.springframework.batch.core.step.StepLocator;
58+
import org.springframework.batch.core.step.tasklet.StoppableTasklet;
59+
import org.springframework.batch.core.step.tasklet.Tasklet;
60+
import org.springframework.batch.core.step.tasklet.TaskletStep;
5561
import org.springframework.batch.support.PropertiesConverter;
5662
import org.springframework.beans.factory.InitializingBean;
5763
import org.springframework.transaction.annotation.Transactional;
@@ -71,6 +77,7 @@
7177
*
7278
* @author Dave Syer
7379
* @author Lucas Ward
80+
* @author Will Schipp
7481
* @since 2.0
7582
*/
7683
public class SimpleJobOperator implements JobOperator, InitializingBean {
@@ -394,6 +401,33 @@ public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExe
394401
jobExecution.setStatus(BatchStatus.STOPPING);
395402
jobRepository.update(jobExecution);
396403

404+
try {
405+
Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
406+
if (job instanceof StepLocator) {//can only process as StepLocator is the only way to get the step object
407+
//get the current stepExecution
408+
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
409+
if (stepExecution.getStatus().isRunning()) {
410+
try {
411+
//have the step execution that's running -> need to 'stop' it
412+
Step step = ((StepLocator)job).getStep(stepExecution.getStepName());
413+
if (step instanceof TaskletStep) {
414+
Tasklet tasklet = ((TaskletStep)step).getTasklet();
415+
if (tasklet instanceof StoppableTasklet) {
416+
((StoppableTasklet)tasklet).stop();
417+
}
418+
}
419+
}
420+
catch (NoSuchStepException e) {
421+
logger.warn("Step not found",e);
422+
}
423+
}
424+
}
425+
}
426+
}
427+
catch (NoSuchJobException e) {
428+
logger.warn("Cannot find Job object",e);
429+
}
430+
397431
return true;
398432
}
399433

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.core.step.tasklet;
17+
18+
import org.springframework.batch.core.launch.JobOperator;
19+
20+
/**
21+
* An extension to the {@link Tasklet} interface to allow users to
22+
* add logic for stopping a tasklet. It is up to each implementation
23+
* as to how the stop will behave. The only guarantee provided by the
24+
* framework is that a call to {@link JobOperator#stop(long)} will
25+
* attempt to call the stop method on any currently running
26+
* StoppableTasklet.
27+
*
28+
* @author Will Schipp
29+
* @since 3.0
30+
*/
31+
public interface StoppableTasklet extends Tasklet {
32+
33+
/**
34+
* Used to signal that the job this {@link Tasklet} is executing
35+
* within has been requested to stop.
36+
*/
37+
void stop();
38+
}

spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@
5454
* still running when tasklet exits (abnormally).
5555
*
5656
* @author Robert Kasanicky
57+
* @author Will Schipp
5758
*/
58-
public class SystemCommandTasklet extends StepExecutionListenerSupport implements Tasklet, InitializingBean {
59+
public class SystemCommandTasklet extends StepExecutionListenerSupport implements StoppableTasklet, InitializingBean {
5960

6061
protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class);
6162

@@ -77,6 +78,8 @@ public class SystemCommandTasklet extends StepExecutionListenerSupport implement
7778

7879
private boolean interruptOnCancel = false;
7980

81+
private boolean stopped = false;
82+
8083
/**
8184
* Execute system command and map its exit code to {@link ExitStatus} using
8285
* {@link SystemProcessExitCodeMapper}.
@@ -99,7 +102,7 @@ public Integer call() throws Exception {
99102
taskExecutor.execute(systemCommandTask);
100103

101104
while (true) {
102-
Thread.sleep(checkInterval);
105+
Thread.sleep(checkInterval);//moved to the end of the logic
103106
if (systemCommandTask.isDone()) {
104107
contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get()));
105108
return RepeatStatus.FINISHED;
@@ -112,8 +115,12 @@ else if (execution.isTerminateOnly()) {
112115
systemCommandTask.cancel(interruptOnCancel);
113116
throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'");
114117
}
118+
else if (stopped) {
119+
systemCommandTask.cancel(interruptOnCancel);
120+
contribution.setExitStatus(ExitStatus.STOPPED);
121+
return RepeatStatus.FINISHED;
122+
}
115123
}
116-
117124
}
118125

119126
/**
@@ -208,4 +215,18 @@ public void setInterruptOnCancel(boolean interruptOnCancel) {
208215
this.interruptOnCancel = interruptOnCancel;
209216
}
210217

218+
/**
219+
* Will interrupt the thread executing the system command only if
220+
* {@link #setInterruptOnCancel(boolean)} has been set to true. Otherwise
221+
* the underlying command will be allowed to finish before the tasklet
222+
* ends.
223+
*
224+
* @since 3.0
225+
* @see StoppableTasklet#stop()
226+
*/
227+
@Override
228+
public void stop() {
229+
stopped = true;
230+
}
231+
211232
}

spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
* @author Ben Hale
7272
* @author Robert Kasanicky
7373
* @author Michael Minella
74+
* @author Will Schipp
7475
*/
7576
@SuppressWarnings("serial")
7677
public class TaskletStep extends AbstractStep {
@@ -306,6 +307,14 @@ protected void open(ExecutionContext ctx) throws Exception {
306307
stream.open(ctx);
307308
}
308309

310+
/**
311+
* retrieve the tasklet - helper method for JobOperator
312+
* @return the {@link Tasklet} instance being executed within this step
313+
*/
314+
public Tasklet getTasklet() {
315+
return tasklet;
316+
}
317+
309318
/**
310319
* A callback for the transactional work inside a chunk. Also detects
311320
* failures in the transaction commit and rollback, only panicking if the

spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
import static org.junit.Assert.assertNotNull;
2020
import static org.junit.Assert.assertTrue;
2121
import static org.junit.Assert.fail;
22+
import static org.mockito.Matchers.anyString;
2223
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.when;
2425

2526
import java.util.Arrays;
27+
import java.util.Collection;
2628
import java.util.Collections;
2729
import java.util.HashSet;
2830
import java.util.List;
@@ -35,12 +37,17 @@
3537
import org.springframework.batch.core.BatchStatus;
3638
import org.springframework.batch.core.Job;
3739
import org.springframework.batch.core.JobExecution;
40+
import org.springframework.batch.core.JobExecutionException;
3841
import org.springframework.batch.core.JobInstance;
3942
import org.springframework.batch.core.JobParameters;
4043
import org.springframework.batch.core.JobParametersIncrementer;
44+
import org.springframework.batch.core.Step;
45+
import org.springframework.batch.core.StepContribution;
46+
import org.springframework.batch.core.configuration.JobRegistry;
4147
import org.springframework.batch.core.configuration.support.MapJobRegistry;
4248
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
4349
import org.springframework.batch.core.explore.JobExplorer;
50+
import org.springframework.batch.core.job.AbstractJob;
4451
import org.springframework.batch.core.job.JobSupport;
4552
import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
4653
import org.springframework.batch.core.launch.JobLauncher;
@@ -51,6 +58,10 @@
5158
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
5259
import org.springframework.batch.core.repository.JobRepository;
5360
import org.springframework.batch.core.repository.JobRestartException;
61+
import org.springframework.batch.core.scope.context.ChunkContext;
62+
import org.springframework.batch.core.step.tasklet.StoppableTasklet;
63+
import org.springframework.batch.core.step.tasklet.TaskletStep;
64+
import org.springframework.batch.repeat.RepeatStatus;
5465
import org.springframework.batch.support.PropertiesConverter;
5566

5667
/**
@@ -254,6 +265,7 @@ public void testFindRunningExecutionsSunnyDay() throws Exception {
254265
}
255266

256267
@Test
268+
@SuppressWarnings("unchecked")
257269
public void testFindRunningExecutionsNoSuchJob() throws Exception {
258270
jobParameters = new JobParameters();
259271
when(jobExplorer.findRunningJobExecutions("no-such-job")).thenReturn(Collections.EMPTY_SET);
@@ -348,6 +360,67 @@ public void testStop() throws Exception{
348360
assertEquals(BatchStatus.STOPPING, jobExecution.getStatus());
349361
}
350362

363+
@Test
364+
public void testStopTasklet() throws Exception {
365+
JobInstance jobInstance = new JobInstance(123L, job.getName());
366+
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters, null);
367+
StoppableTasklet tasklet = mock(StoppableTasklet.class);
368+
TaskletStep taskletStep = new TaskletStep();
369+
taskletStep.setTasklet(tasklet);
370+
MockJob job = new MockJob();
371+
job.taskletStep = taskletStep;
372+
373+
JobRegistry jobRegistry = mock(JobRegistry.class);
374+
TaskletStep step = mock(TaskletStep.class);
375+
376+
when(step.getTasklet()).thenReturn(tasklet);
377+
when(step.getName()).thenReturn("test_job.step1");
378+
when(jobRegistry.getJob(anyString())).thenReturn(job);
379+
when(jobExplorer.getJobExecution(111L)).thenReturn(jobExecution);
380+
381+
jobOperator.setJobRegistry(jobRegistry);
382+
jobExplorer.getJobExecution(111L);
383+
jobRepository.update(jobExecution);
384+
jobOperator.stop(111L);
385+
assertEquals(BatchStatus.STOPPING, jobExecution.getStatus());
386+
}
387+
388+
@Test
389+
public void testStopTaskletException() throws Exception {
390+
JobInstance jobInstance = new JobInstance(123L, job.getName());
391+
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters, null);
392+
StoppableTasklet tasklet = new StoppableTasklet() {
393+
394+
@Override
395+
public RepeatStatus execute(StepContribution contribution,
396+
ChunkContext chunkContext) throws Exception {
397+
return null;
398+
}
399+
400+
@Override
401+
public void stop() {
402+
throw new IllegalStateException();
403+
}};
404+
TaskletStep taskletStep = new TaskletStep();
405+
taskletStep.setTasklet(tasklet);
406+
MockJob job = new MockJob();
407+
job.taskletStep = taskletStep;
408+
409+
JobRegistry jobRegistry = mock(JobRegistry.class);
410+
TaskletStep step = mock(TaskletStep.class);
411+
412+
when(step.getTasklet()).thenReturn(tasklet);
413+
when(step.getName()).thenReturn("test_job.step1");
414+
when(jobRegistry.getJob(anyString())).thenReturn(job);
415+
when(jobExplorer.getJobExecution(111L)).thenReturn(jobExecution);
416+
417+
jobOperator.setJobRegistry(jobRegistry);
418+
jobExplorer.getJobExecution(111L);
419+
jobRepository.update(jobExecution);
420+
jobOperator.stop(111L);
421+
assertEquals(BatchStatus.STOPPING, jobExecution.getStatus());
422+
}
423+
351424
@Test
352425
public void testAbort() throws Exception {
353426
JobInstance jobInstance = new JobInstance(123L, job.getName());
@@ -369,4 +442,25 @@ public void testAbortNonStopping() throws Exception {
369442
jobRepository.update(jobExecution);
370443
jobOperator.abandon(123L);
371444
}
445+
446+
class MockJob extends AbstractJob {
447+
448+
private TaskletStep taskletStep;
449+
450+
@Override
451+
public Step getStep(String stepName) {
452+
return taskletStep;
453+
}
454+
455+
@Override
456+
public Collection<String> getStepNames() {
457+
return Arrays.asList("test_job.step1");
458+
}
459+
460+
@Override
461+
protected void doExecute(JobExecution execution) throws JobExecutionException {
462+
463+
}
464+
465+
}
372466
}

spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/SystemCommandTaskletIntegrationTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,26 @@ public void testWorkingDirectory() throws Exception {
221221
tasklet.setWorkingDirectory(directory.getCanonicalPath());
222222
}
223223

224+
/*
225+
* test stopping a tasklet
226+
*/
227+
@Test
228+
public void testStopped() throws Exception {
229+
String command = System.getProperty("os.name").toLowerCase().indexOf("win") >= 0 ?
230+
"ping 1.1.1.1 -n 1 -w 5000" :
231+
"sleep 5";
232+
tasklet.setCommand(command);
233+
tasklet.setTerminationCheckInterval(10);
234+
tasklet.afterPropertiesSet();
235+
236+
StepContribution contribution = stepExecution.createStepContribution();
237+
//send stop
238+
tasklet.stop();
239+
tasklet.execute(contribution, null);
240+
241+
assertEquals(contribution.getExitStatus().getExitCode(),ExitStatus.STOPPED.getExitCode());
242+
}
243+
224244
/**
225245
* Exit code mapper containing mapping logic expected by the tests. 0 means
226246
* finished successfully, other value means failure.

0 commit comments

Comments
 (0)