diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java index 0735e7f2e7..0738305d3e 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java @@ -33,6 +33,7 @@ import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersIncrementer; import org.springframework.batch.core.JobParametersInvalidException; +import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.UnexpectedJobExecutionException; import org.springframework.batch.core.configuration.JobRegistry; @@ -52,6 +53,11 @@ import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; +import org.springframework.batch.core.step.NoSuchStepException; +import org.springframework.batch.core.step.StepLocator; +import org.springframework.batch.core.step.tasklet.StoppableTasklet; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.support.PropertiesConverter; import org.springframework.beans.factory.InitializingBean; import org.springframework.transaction.annotation.Transactional; @@ -71,6 +77,7 @@ * * @author Dave Syer * @author Lucas Ward + * @author Will Schipp * @since 2.0 */ public class SimpleJobOperator implements JobOperator, InitializingBean { @@ -392,6 +399,33 @@ public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExe } jobExecution.setStatus(BatchStatus.STOPPING); jobRepository.update(jobExecution); + //implementation to support Tasklet.stop() + try { + Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName()); + if (job instanceof StepLocator) {//can only process as StepLocator is the only way to get the step object + //get the current stepExecution + for (StepExecution stepExecution : jobExecution.getStepExecutions()) { + if (stepExecution.getStatus().isRunning()) { + try { + //have the step execution that's running -> need to 'stop' it + Step step = ((StepLocator)job).getStep(stepExecution.getStepName()); + if (step instanceof TaskletStep) { + Tasklet tasklet = ((TaskletStep)step).getTasklet(); + if (tasklet instanceof StoppableTasklet) { + ((StoppableTasklet)tasklet).stop(); + }//end if + }//end if + } + catch (NoSuchStepException e) { + logger.warn("Step not found",e); + } + }//end if + }//end for + }//end if + } + catch (NoSuchJobException e) { + logger.warn("Cannot find Job object",e); + } return true; } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java new file mode 100644 index 0000000000..280b4ecc4b --- /dev/null +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java @@ -0,0 +1,18 @@ +package org.springframework.batch.core.step.tasklet; + +/** + * + * Stoppable tasklet + * + * @author Will Schipp + * + */ +public interface StoppableTasklet extends Tasklet { + + /** + * method to signal a long running/looping tasklet to stop + * + */ + void stop(); + +} diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java index 397ef75786..f0f33ff003 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java @@ -54,8 +54,9 @@ * still running when tasklet exits (abnormally). * * @author Robert Kasanicky + * @author Will Schipp */ -public class SystemCommandTasklet extends StepExecutionListenerSupport implements Tasklet, InitializingBean { +public class SystemCommandTasklet extends StepExecutionListenerSupport implements StoppableTasklet, InitializingBean { protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class); @@ -77,6 +78,8 @@ public class SystemCommandTasklet extends StepExecutionListenerSupport implement private boolean interruptOnCancel = false; + private boolean stopped = false; + /** * Execute system command and map its exit code to {@link ExitStatus} using * {@link SystemProcessExitCodeMapper}. @@ -99,7 +102,7 @@ public Integer call() throws Exception { taskExecutor.execute(systemCommandTask); while (true) { - Thread.sleep(checkInterval); + Thread.sleep(checkInterval);//moved to the end of the logic if (systemCommandTask.isDone()) { contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get())); return RepeatStatus.FINISHED; @@ -112,6 +115,14 @@ else if (execution.isTerminateOnly()) { systemCommandTask.cancel(interruptOnCancel); throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'"); } + else if (stopped) { + stopped = false;//reset + //invoke cancel + systemCommandTask.cancel(interruptOnCancel); + contribution.setExitStatus(ExitStatus.STOPPED); + return RepeatStatus.FINISHED; + } + } } @@ -208,4 +219,9 @@ public void setInterruptOnCancel(boolean interruptOnCancel) { this.interruptOnCancel = interruptOnCancel; } + @Override + public void stop() { + stopped = true; + } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java index 8b98f9fe44..70f56edd5f 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java @@ -71,6 +71,7 @@ * @author Ben Hale * @author Robert Kasanicky * @author Michael Minella + * @author Will Schipp */ @SuppressWarnings("serial") public class TaskletStep extends AbstractStep { @@ -306,6 +307,14 @@ protected void open(ExecutionContext ctx) throws Exception { stream.open(ctx); } + /** + * retrieve the tasklet - helper method for JobOperator + * @return + */ + public Tasklet getTasklet() { + return tasklet; + } + /** * A callback for the transactional work inside a chunk. Also detects * failures in the transaction commit and rollback, only panicking if the diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java index 197153ac9d..e77abe0498 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java @@ -15,15 +15,17 @@ */ package org.springframework.batch.core.launch.support; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -36,12 +38,17 @@ import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersIncrementer; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.configuration.JobRegistry; import org.springframework.batch.core.configuration.support.MapJobRegistry; import org.springframework.batch.core.converter.DefaultJobParametersConverter; import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.job.AbstractJob; import org.springframework.batch.core.job.JobSupport; import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; import org.springframework.batch.core.launch.JobLauncher; @@ -52,6 +59,10 @@ import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.StoppableTasklet; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.batch.support.PropertiesConverter; /** @@ -349,6 +360,67 @@ public void testStop() throws Exception{ assertEquals(BatchStatus.STOPPING, jobExecution.getStatus()); } + @Test + public void testStopTasklet() throws Exception { + JobInstance jobInstance = new JobInstance(123L, job.getName()); + JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters); + StoppableTasklet tasklet = mock(StoppableTasklet.class); + TaskletStep taskletStep = new TaskletStep(); + taskletStep.setTasklet(tasklet); + MockJob job = new MockJob(); + job.taskletStep = taskletStep; + + JobRegistry jobRegistry = mock(JobRegistry.class); + TaskletStep step = mock(TaskletStep.class); + + when(step.getTasklet()).thenReturn(tasklet); + when(step.getName()).thenReturn("test_job.step1"); + when(jobRegistry.getJob(anyString())).thenReturn(job); + when(jobExplorer.getJobExecution(111L)).thenReturn(jobExecution); + + jobOperator.setJobRegistry(jobRegistry); + jobExplorer.getJobExecution(111L); + jobRepository.update(jobExecution); + jobOperator.stop(111L); + assertEquals(BatchStatus.STOPPING, jobExecution.getStatus()); + } + + @Test + public void testStopTaskletException() throws Exception { + JobInstance jobInstance = new JobInstance(123L, job.getName()); + JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters); + StoppableTasklet tasklet = new StoppableTasklet() { + + @Override + public RepeatStatus execute(StepContribution contribution, + ChunkContext chunkContext) throws Exception { + return null; + } + + @Override + public void stop() { + throw new IllegalStateException(); + }}; + TaskletStep taskletStep = new TaskletStep(); + taskletStep.setTasklet(tasklet); + MockJob job = new MockJob(); + job.taskletStep = taskletStep; + + JobRegistry jobRegistry = mock(JobRegistry.class); + TaskletStep step = mock(TaskletStep.class); + + when(step.getTasklet()).thenReturn(tasklet); + when(step.getName()).thenReturn("test_job.step1"); + when(jobRegistry.getJob(anyString())).thenReturn(job); + when(jobExplorer.getJobExecution(111L)).thenReturn(jobExecution); + + jobOperator.setJobRegistry(jobRegistry); + jobExplorer.getJobExecution(111L); + jobRepository.update(jobExecution); + jobOperator.stop(111L); + assertEquals(BatchStatus.STOPPING, jobExecution.getStatus()); + } + @Test public void testAbort() throws Exception { JobInstance jobInstance = new JobInstance(123L, job.getName()); @@ -370,4 +442,27 @@ public void testAbortNonStopping() throws Exception { jobRepository.update(jobExecution); jobOperator.abandon(123L); } + + + class MockJob extends AbstractJob { + + private TaskletStep taskletStep; + + @Override + public Step getStep(String stepName) { + return taskletStep; + } + + @Override + public Collection getStepNames() { + return Arrays.asList("test_job.step1"); + } + + @Override + protected void doExecute(JobExecution execution) throws JobExecutionException { + + } + + } + } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/SystemCommandTaskletIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/SystemCommandTaskletIntegrationTests.java index dcb3e9aa19..442807fa88 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/SystemCommandTaskletIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/SystemCommandTaskletIntegrationTests.java @@ -216,6 +216,24 @@ public void testWorkingDirectory() throws Exception { // no error expected now tasklet.setWorkingDirectory(directory.getCanonicalPath()); } + + /* + * test stopping a tasklet + */ + @Test + public void testStopped() throws Exception { + String command = "sleep 5"; + tasklet.setCommand(command); + tasklet.setTerminationCheckInterval(10); + tasklet.afterPropertiesSet(); + + StepContribution contribution = stepExecution.createStepContribution(); + //send stop + tasklet.stop(); + tasklet.execute(contribution, null); + + assertEquals(contribution.getExitStatus().getExitCode(),ExitStatus.STOPPED.getExitCode()); + } /** * Exit code mapper containing mapping logic expected by the tests. 0 means