-
Notifications
You must be signed in to change notification settings - Fork 2.4k
BATCH-2009 - stoppable tasklet #173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
e23eb35
5d27967
0ebda8f
cfa15bf
f9c8017
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,10 @@ | |
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updates to the ChunkOrientedTasklet aren't needed since the status is checked at chunk boundaries anyways (hence the reason you can stop a chunk oriented step via the JobOperator#stop() method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
import org.springframework.batch.core.ExitStatus; | ||
import org.springframework.batch.core.StepContribution; | ||
import org.springframework.batch.core.scope.context.ChunkContext; | ||
import org.springframework.batch.core.step.tasklet.StoppableTasklet; | ||
import org.springframework.batch.core.step.tasklet.Tasklet; | ||
import org.springframework.batch.repeat.RepeatStatus; | ||
|
||
|
@@ -28,10 +30,11 @@ | |
* handling. | ||
* | ||
* @author Dave Syer | ||
* @author Will Schipp | ||
* | ||
* @param <I> input item type | ||
*/ | ||
public class ChunkOrientedTasklet<I> implements Tasklet { | ||
public class ChunkOrientedTasklet<I> implements StoppableTasklet { | ||
|
||
private static final String INPUTS_KEY = "INPUTS"; | ||
|
||
|
@@ -40,6 +43,11 @@ public class ChunkOrientedTasklet<I> implements Tasklet { | |
private final ChunkProvider<I> chunkProvider; | ||
|
||
private boolean buffering = true; | ||
|
||
/** | ||
* support for stoppable tasklet | ||
*/ | ||
private boolean stopped = false; | ||
|
||
private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class); | ||
|
||
|
@@ -63,6 +71,13 @@ public void setBuffering(boolean buffering) { | |
@Override | ||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { | ||
|
||
//check for stopped at the beginning of a chunk | ||
if (stopped) { | ||
stopped = false;//reset | ||
contribution.setExitStatus(ExitStatus.STOPPED); | ||
return RepeatStatus.FINISHED; | ||
}//end if | ||
|
||
@SuppressWarnings("unchecked") | ||
Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY); | ||
if (inputs == null) { | ||
|
@@ -90,4 +105,9 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon | |
|
||
} | ||
|
||
@Override | ||
public void stop() { | ||
stopped = true; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,7 +30,7 @@ | |
* @author Dave Syer | ||
* | ||
*/ | ||
public class CallableTaskletAdapter implements Tasklet, InitializingBean { | ||
public class CallableTaskletAdapter implements StoppableTasklet, InitializingBean { | ||
|
||
private Callable<RepeatStatus> callable; | ||
|
||
|
@@ -62,4 +62,7 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon | |
return callable.call(); | ||
} | ||
|
||
@Override | ||
public void stop() { } | ||
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why implement StoppableTasklet here if the stop method is going to be a no-op? I would argue that we should only implement the StoppableTasklet interface it we are really going to implement it. Otherwise, it is misleading. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reverted to master |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,9 +31,10 @@ | |
* @see AbstractMethodInvokingDelegator | ||
* | ||
* @author Dave Syer | ||
* @author Will Schipp | ||
* | ||
*/ | ||
public class MethodInvokingTaskletAdapter extends AbstractMethodInvokingDelegator<Object> implements Tasklet { | ||
public class MethodInvokingTaskletAdapter extends AbstractMethodInvokingDelegator<Object> implements StoppableTasklet { | ||
|
||
/** | ||
* Delegate execution to the target object and translate the return value to | ||
|
@@ -62,4 +63,7 @@ protected ExitStatus mapResult(Object result) { | |
return ExitStatus.COMPLETED; | ||
} | ||
|
||
@Override | ||
public void stop() { } | ||
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same note as the CallableTaskletAdapter. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reverted to master |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package org.springframework.batch.core.step.tasklet; | ||
|
||
/** | ||
* | ||
* JSR-352 compatible tasklet that provides the 'stop' function. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's leave the JSR reference out of this. While it is needed for the JSR, there is no such thing in the JSR as a "Tasklet". It's a "Batchlet" which will require further wrapping of this concept. This just provides the stoppability for tasklets, independent of the JSR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
* The Spring Batch Tasklet is analogous to 'batchlet' in the JSR | ||
* terminology | ||
* | ||
* @author Will Schipp | ||
* | ||
*/ | ||
public interface StoppableTasklet extends Tasklet { | ||
|
||
/** | ||
* method to signal a long running/looping tasklet to stop | ||
* | ||
*/ | ||
void stop(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,8 +54,9 @@ | |
* still running when tasklet exits (abnormally). | ||
* | ||
* @author Robert Kasanicky | ||
* @author Will Schipp | ||
*/ | ||
public class SystemCommandTasklet extends StepExecutionListenerSupport implements Tasklet, InitializingBean { | ||
public class SystemCommandTasklet extends StepExecutionListenerSupport implements StoppableTasklet, InitializingBean { | ||
|
||
protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class); | ||
|
||
|
@@ -77,6 +78,8 @@ public class SystemCommandTasklet extends StepExecutionListenerSupport implement | |
|
||
private boolean interruptOnCancel = false; | ||
|
||
private boolean stopped = false; | ||
|
||
/** | ||
* Execute system command and map its exit code to {@link ExitStatus} using | ||
* {@link SystemProcessExitCodeMapper}. | ||
|
@@ -99,7 +102,7 @@ public Integer call() throws Exception { | |
taskExecutor.execute(systemCommandTask); | ||
|
||
while (true) { | ||
Thread.sleep(checkInterval); | ||
Thread.sleep(checkInterval);//moved to the end of the logic | ||
if (systemCommandTask.isDone()) { | ||
contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get())); | ||
return RepeatStatus.FINISHED; | ||
|
@@ -112,6 +115,13 @@ else if (execution.isTerminateOnly()) { | |
systemCommandTask.cancel(interruptOnCancel); | ||
throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'"); | ||
} | ||
else if (stopped) { | ||
stopped = false;//reset | ||
systemCommandTask.cancel(interruptOnCancel); | ||
contribution.setExitStatus(ExitStatus.STOPPED); | ||
return RepeatStatus.FINISHED; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure about this one. At the very least, we would need to add in the documentation that the underlying command will not be interrupted unless interuptOnCancel has been set to true. Even with that though, we can't guarantee the state of things once that has occurred. |
||
|
||
} | ||
|
||
} | ||
|
@@ -208,4 +218,9 @@ public void setInterruptOnCancel(boolean interruptOnCancel) { | |
this.interruptOnCancel = interruptOnCancel; | ||
} | ||
|
||
@Override | ||
public void stop() { | ||
stopped = true; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,15 +15,16 @@ | |
*/ | ||
package org.springframework.batch.core.launch.support; | ||
|
||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertNotNull; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.junit.Assert.fail; | ||
import static org.mockito.Matchers.anyString; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
|
||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
|
@@ -36,12 +37,17 @@ | |
import org.springframework.batch.core.BatchStatus; | ||
import org.springframework.batch.core.Job; | ||
import org.springframework.batch.core.JobExecution; | ||
import org.springframework.batch.core.JobExecutionException; | ||
import org.springframework.batch.core.JobInstance; | ||
import org.springframework.batch.core.JobParameters; | ||
import org.springframework.batch.core.JobParametersIncrementer; | ||
import org.springframework.batch.core.Step; | ||
import org.springframework.batch.core.StepContribution; | ||
import org.springframework.batch.core.configuration.JobRegistry; | ||
import org.springframework.batch.core.configuration.support.MapJobRegistry; | ||
import org.springframework.batch.core.converter.DefaultJobParametersConverter; | ||
import org.springframework.batch.core.explore.JobExplorer; | ||
import org.springframework.batch.core.job.AbstractJob; | ||
import org.springframework.batch.core.job.JobSupport; | ||
import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; | ||
import org.springframework.batch.core.launch.JobLauncher; | ||
|
@@ -52,6 +58,10 @@ | |
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; | ||
import org.springframework.batch.core.repository.JobRepository; | ||
import org.springframework.batch.core.repository.JobRestartException; | ||
import org.springframework.batch.core.scope.context.ChunkContext; | ||
import org.springframework.batch.core.step.tasklet.StoppableTasklet; | ||
import org.springframework.batch.core.step.tasklet.TaskletStep; | ||
import org.springframework.batch.repeat.RepeatStatus; | ||
import org.springframework.batch.support.PropertiesConverter; | ||
|
||
/** | ||
|
@@ -349,6 +359,32 @@ public void testStop() throws Exception{ | |
assertEquals(BatchStatus.STOPPING, jobExecution.getStatus()); | ||
} | ||
|
||
@Test | ||
public void testStopTasklet() throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This tests the rosey scenario. What about when something goes wrong with stopping a Tasklet (exception being thrown in the StoppableTasklet#stop() method at a minimum). |
||
JobInstance jobInstance = new JobInstance(123L, job.getName()); | ||
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters); | ||
MockStoppableTasklet tasklet = new MockStoppableTasklet(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the reasoning behind the hard coded stub here instead of using Mockito? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. converted to mockito - the original allowed for assertions directly against the mock tasklet |
||
TaskletStep taskletStep = new TaskletStep(); | ||
taskletStep.setTasklet(tasklet); | ||
MockJob job = new MockJob(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same not as above... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. allowed test construction of the job to test the retrieval of the tasklet |
||
job.taskletStep = taskletStep; | ||
|
||
JobRegistry jobRegistry = mock(JobRegistry.class); | ||
TaskletStep step = mock(TaskletStep.class); | ||
|
||
when(step.getTasklet()).thenReturn(tasklet); | ||
when(step.getName()).thenReturn("test_job.step1"); | ||
when(jobRegistry.getJob(anyString())).thenReturn(job); | ||
when(jobExplorer.getJobExecution(111L)).thenReturn(jobExecution); | ||
|
||
jobOperator.setJobRegistry(jobRegistry); | ||
jobExplorer.getJobExecution(111L); | ||
jobRepository.update(jobExecution); | ||
jobOperator.stop(111L); | ||
assertEquals(BatchStatus.STOPPING, jobExecution.getStatus()); | ||
assertTrue(tasklet.stopped); | ||
} | ||
|
||
@Test | ||
public void testAbort() throws Exception { | ||
JobInstance jobInstance = new JobInstance(123L, job.getName()); | ||
|
@@ -370,4 +406,42 @@ public void testAbortNonStopping() throws Exception { | |
jobRepository.update(jobExecution); | ||
jobOperator.abandon(123L); | ||
} | ||
|
||
class MockJob extends AbstractJob { | ||
|
||
private TaskletStep taskletStep; | ||
|
||
@Override | ||
public Step getStep(String stepName) { | ||
return taskletStep; | ||
} | ||
|
||
@Override | ||
public Collection<String> getStepNames() { | ||
return Arrays.asList("test_job.step1"); | ||
} | ||
|
||
@Override | ||
protected void doExecute(JobExecution execution) throws JobExecutionException { | ||
|
||
} | ||
|
||
} | ||
|
||
class MockStoppableTasklet implements StoppableTasklet { | ||
|
||
boolean stopped = Boolean.FALSE; | ||
|
||
@Override | ||
public RepeatStatus execute(StepContribution contribution, | ||
ChunkContext chunkContext) throws Exception { | ||
return null; | ||
} | ||
|
||
@Override | ||
public void stop() { | ||
stopped = Boolean.TRUE; | ||
} | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two things here: