Skip to content

Commit b9d6e26

Browse files
committed
Open ChunkMessageChannelItemWriter for extension
Resolves #952
1 parent 66e0b9a commit b9d6e26

File tree

1 file changed

+11
-11
lines changed

1 file changed

+11
-11
lines changed

spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/ChunkMessageChannelItemWriter.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,19 +51,19 @@ public class ChunkMessageChannelItemWriter<T>
5151

5252
static final String EXPECTED = ChunkMessageChannelItemWriter.class.getName() + ".EXPECTED";
5353

54-
private static final long DEFAULT_THROTTLE_LIMIT = 6;
54+
protected static final long DEFAULT_THROTTLE_LIMIT = 6;
5555

56-
private MessagingTemplate messagingGateway;
56+
protected MessagingTemplate messagingGateway;
5757

58-
private final LocalState localState = new LocalState();
58+
protected final LocalState localState = new LocalState();
5959

60-
private long throttleLimit = DEFAULT_THROTTLE_LIMIT;
60+
protected long throttleLimit = DEFAULT_THROTTLE_LIMIT;
6161

62-
private final int DEFAULT_MAX_WAIT_TIMEOUTS = 40;
62+
protected final int DEFAULT_MAX_WAIT_TIMEOUTS = 40;
6363

64-
private int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS;
64+
protected int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS;
6565

66-
private PollableChannel replyChannel;
66+
protected PollableChannel replyChannel;
6767

6868
/**
6969
* The maximum number of times to wait at the end of a step for a non-null result from
@@ -189,7 +189,7 @@ public Collection<StepContribution> getStepContributions() {
189189
* Wait until all the results that are in the pipeline come back to the reply channel.
190190
* @return true if successfully received a result, false if timed out
191191
*/
192-
private boolean waitForResults() throws AsynchronousFailureException {
192+
protected boolean waitForResults() throws AsynchronousFailureException {
193193
int count = 0;
194194
int maxCount = maxWaitTimeouts;
195195
Throwable failure = null;
@@ -221,7 +221,7 @@ private boolean waitForResults() throws AsynchronousFailureException {
221221
* (maybe we are sharing a channel and we shouldn't be)
222222
*/
223223
@SuppressWarnings("unchecked")
224-
private void getNextResult() throws AsynchronousFailureException {
224+
protected void getNextResult() throws AsynchronousFailureException {
225225
Message<ChunkResponse> message = (Message<ChunkResponse>) messagingGateway.receive(replyChannel);
226226
if (message != null) {
227227
ChunkResponse payload = message.getPayload();
@@ -254,7 +254,7 @@ private void getNextResult() throws AsynchronousFailureException {
254254
* Re-throws the original throwable if it is unchecked, wraps checked exceptions into
255255
* {@link AsynchronousFailureException}.
256256
*/
257-
private static AsynchronousFailureException wrapIfNecessary(Throwable throwable) {
257+
protected static AsynchronousFailureException wrapIfNecessary(Throwable throwable) {
258258
if (throwable instanceof Error) {
259259
throw (Error) throwable;
260260
}
@@ -266,7 +266,7 @@ else if (throwable instanceof AsynchronousFailureException) {
266266
}
267267
}
268268

269-
private static class LocalState {
269+
protected static class LocalState {
270270

271271
private final AtomicInteger current = new AtomicInteger(-1);
272272

0 commit comments

Comments
 (0)