Skip to content

BATCH-2009 - stoppable tasklet #173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.batch.core.launch.support;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
Expand All @@ -33,13 +34,15 @@
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.UnexpectedJobExecutionException;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.ListableJobLocator;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.converter.JobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.AbstractJob;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
import org.springframework.batch.core.launch.JobLauncher;
Expand All @@ -52,6 +55,9 @@
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.step.tasklet.StoppableTasklet;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.support.PropertiesConverter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -71,6 +77,7 @@
*
* @author Dave Syer
* @author Lucas Ward
* @author Will Schipp
* @since 2.0
*/
public class SimpleJobOperator implements JobOperator, InitializingBean {
Expand Down Expand Up @@ -392,6 +399,32 @@ public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExe
}
jobExecution.setStatus(BatchStatus.STOPPING);
jobRepository.update(jobExecution);
//implementation to support Tasklet.stop()
//TODO manage this is an 'scoped' proxy test
//find the job object
Job job;
try {
job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
//get the steps for the job
if (job instanceof AbstractJob) {
//retrieve the steps
Collection<String> stepNames = ((AbstractJob)job).getStepNames();
//go through the step names are retrieve each step
for (String stepName : stepNames) {
Step step = ((AbstractJob)job).getStep(stepName);
//determine type
if (step instanceof TaskletStep) {
//invoke stop --> reflection?
Tasklet tasklet = ((TaskletStep) step).getTasklet();
if (tasklet instanceof StoppableTasklet) {
((StoppableTasklet)tasklet).stop();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things here:

  1. I need to look to see if there is a better way to get a handle on the steps.
  2. We should only be calling StoppableTasklet#stop() on a tasklet that is actually running. If it is not, that may cause unnecessary exceptions to be thrown.

}//end if
}//end if
}//end for
}//end if
} catch (NoSuchJobException e) {
logger.error("Couldn't find Job for the execution:" + jobExecution.getJobInstance().getJobName());
}

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updates to the ChunkOrientedTasklet aren't needed since the status is checked at chunk boundaries anyways (hence the reason you can stop a chunk oriented step via the JobOperator#stop() method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.StoppableTasklet;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

Expand All @@ -28,10 +30,11 @@
* handling.
*
* @author Dave Syer
* @author Will Schipp
*
* @param <I> input item type
*/
public class ChunkOrientedTasklet<I> implements Tasklet {
public class ChunkOrientedTasklet<I> implements StoppableTasklet {

private static final String INPUTS_KEY = "INPUTS";

Expand All @@ -40,6 +43,11 @@ public class ChunkOrientedTasklet<I> implements Tasklet {
private final ChunkProvider<I> chunkProvider;

private boolean buffering = true;

/**
* support for stoppable tasklet
*/
private boolean stopped = false;

private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class);

Expand All @@ -63,6 +71,13 @@ public void setBuffering(boolean buffering) {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

//check for stopped at the beginning of a chunk
if (stopped) {
stopped = false;//reset
contribution.setExitStatus(ExitStatus.STOPPED);
return RepeatStatus.FINISHED;
}//end if

@SuppressWarnings("unchecked")
Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
if (inputs == null) {
Expand Down Expand Up @@ -90,4 +105,9 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon

}

@Override
public void stop() {
stopped = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* @author Dave Syer
*
*/
public class CallableTaskletAdapter implements Tasklet, InitializingBean {
public class CallableTaskletAdapter implements StoppableTasklet, InitializingBean {

private Callable<RepeatStatus> callable;

Expand Down Expand Up @@ -62,4 +62,7 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon
return callable.call();
}

@Override
public void stop() { }

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why implement StoppableTasklet here if the stop method is going to be a no-op? I would argue that we should only implement the StoppableTasklet interface it we are really going to implement it. Otherwise, it is misleading.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted to master

Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
* @see AbstractMethodInvokingDelegator
*
* @author Dave Syer
* @author Will Schipp
*
*/
public class MethodInvokingTaskletAdapter extends AbstractMethodInvokingDelegator<Object> implements Tasklet {
public class MethodInvokingTaskletAdapter extends AbstractMethodInvokingDelegator<Object> implements StoppableTasklet {

/**
* Delegate execution to the target object and translate the return value to
Expand Down Expand Up @@ -62,4 +63,7 @@ protected ExitStatus mapResult(Object result) {
return ExitStatus.COMPLETED;
}

@Override
public void stop() { }

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same note as the CallableTaskletAdapter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted to master

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.springframework.batch.core.step.tasklet;

/**
*
* JSR-352 compatible tasklet that provides the 'stop' function.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave the JSR reference out of this. While it is needed for the JSR, there is no such thing in the JSR as a "Tasklet". It's a "Batchlet" which will require further wrapping of this concept. This just provides the stoppability for tasklets, independent of the JSR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* The Spring Batch Tasklet is analogous to 'batchlet' in the JSR
* terminology
*
* @author Will Schipp
*
*/
public interface StoppableTasklet extends Tasklet {

/**
* method to signal a long running/looping tasklet to stop
*
*/
void stop();

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@
* still running when tasklet exits (abnormally).
*
* @author Robert Kasanicky
* @author Will Schipp
*/
public class SystemCommandTasklet extends StepExecutionListenerSupport implements Tasklet, InitializingBean {
public class SystemCommandTasklet extends StepExecutionListenerSupport implements StoppableTasklet, InitializingBean {

protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class);

Expand All @@ -77,6 +78,8 @@ public class SystemCommandTasklet extends StepExecutionListenerSupport implement

private boolean interruptOnCancel = false;

private boolean stopped = false;

/**
* Execute system command and map its exit code to {@link ExitStatus} using
* {@link SystemProcessExitCodeMapper}.
Expand All @@ -99,7 +102,7 @@ public Integer call() throws Exception {
taskExecutor.execute(systemCommandTask);

while (true) {
Thread.sleep(checkInterval);
Thread.sleep(checkInterval);//moved to the end of the logic
if (systemCommandTask.isDone()) {
contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get()));
return RepeatStatus.FINISHED;
Expand All @@ -112,6 +115,13 @@ else if (execution.isTerminateOnly()) {
systemCommandTask.cancel(interruptOnCancel);
throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'");
}
else if (stopped) {
stopped = false;//reset
systemCommandTask.cancel(interruptOnCancel);
contribution.setExitStatus(ExitStatus.STOPPED);
return RepeatStatus.FINISHED;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this one. At the very least, we would need to add in the documentation that the underlying command will not be interrupted unless interuptOnCancel has been set to true. Even with that though, we can't guarantee the state of things once that has occurred.


}

}
Expand Down Expand Up @@ -208,4 +218,9 @@ public void setInterruptOnCancel(boolean interruptOnCancel) {
this.interruptOnCancel = interruptOnCancel;
}

@Override
public void stop() {
stopped = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* @author Ben Hale
* @author Robert Kasanicky
* @author Michael Minella
* @author Will Schipp
*/
@SuppressWarnings("serial")
public class TaskletStep extends AbstractStep {
Expand Down Expand Up @@ -306,6 +307,14 @@ protected void open(ExecutionContext ctx) throws Exception {
stream.open(ctx);
}

/**
* retrieve the tasklet - helper method for JobOperator
* @return
*/
public Tasklet getTasklet() {
return tasklet;
}

/**
* A callback for the transactional work inside a chunk. Also detects
* failures in the transaction commit and rollback, only panicking if the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
*/
package org.springframework.batch.core.launch.support;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -36,12 +37,17 @@
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.AbstractJob;
import org.springframework.batch.core.job.JobSupport;
import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
import org.springframework.batch.core.launch.JobLauncher;
Expand All @@ -52,6 +58,10 @@
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.StoppableTasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.batch.support.PropertiesConverter;

/**
Expand Down Expand Up @@ -349,6 +359,32 @@ public void testStop() throws Exception{
assertEquals(BatchStatus.STOPPING, jobExecution.getStatus());
}

@Test
public void testStopTasklet() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tests the rosey scenario. What about when something goes wrong with stopping a Tasklet (exception being thrown in the StoppableTasklet#stop() method at a minimum).

JobInstance jobInstance = new JobInstance(123L, job.getName());
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
MockStoppableTasklet tasklet = new MockStoppableTasklet();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning behind the hard coded stub here instead of using Mockito?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

converted to mockito - the original allowed for assertions directly against the mock tasklet

TaskletStep taskletStep = new TaskletStep();
taskletStep.setTasklet(tasklet);
MockJob job = new MockJob();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same not as above...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allowed test construction of the job to test the retrieval of the tasklet

job.taskletStep = taskletStep;

JobRegistry jobRegistry = mock(JobRegistry.class);
TaskletStep step = mock(TaskletStep.class);

when(step.getTasklet()).thenReturn(tasklet);
when(step.getName()).thenReturn("test_job.step1");
when(jobRegistry.getJob(anyString())).thenReturn(job);
when(jobExplorer.getJobExecution(111L)).thenReturn(jobExecution);

jobOperator.setJobRegistry(jobRegistry);
jobExplorer.getJobExecution(111L);
jobRepository.update(jobExecution);
jobOperator.stop(111L);
assertEquals(BatchStatus.STOPPING, jobExecution.getStatus());
assertTrue(tasklet.stopped);
}

@Test
public void testAbort() throws Exception {
JobInstance jobInstance = new JobInstance(123L, job.getName());
Expand All @@ -370,4 +406,42 @@ public void testAbortNonStopping() throws Exception {
jobRepository.update(jobExecution);
jobOperator.abandon(123L);
}

class MockJob extends AbstractJob {

private TaskletStep taskletStep;

@Override
public Step getStep(String stepName) {
return taskletStep;
}

@Override
public Collection<String> getStepNames() {
return Arrays.asList("test_job.step1");
}

@Override
protected void doExecute(JobExecution execution) throws JobExecutionException {

}

}

class MockStoppableTasklet implements StoppableTasklet {

boolean stopped = Boolean.FALSE;

@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
return null;
}

@Override
public void stop() {
stopped = Boolean.TRUE;
}

}
}
Loading