Skip to content

Change metrics type from int to long in StepExecution #3845

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2019 the original author or authors.
* Copyright 2006-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,19 +28,19 @@
@SuppressWarnings("serial")
public class StepContribution implements Serializable {

private volatile int readCount = 0;
private volatile long readCount = 0;

private volatile int writeCount = 0;
private volatile long writeCount = 0;

private volatile int filterCount = 0;
private volatile long filterCount = 0;

private final int parentSkipCount;
private final long parentSkipCount;

private volatile int readSkipCount;
private volatile long readSkipCount;

private volatile int writeSkipCount;
private volatile long writeSkipCount;

private volatile int processSkipCount;
private volatile long processSkipCount;

private ExitStatus exitStatus = ExitStatus.EXECUTING;

Expand Down Expand Up @@ -76,9 +76,9 @@ public ExitStatus getExitStatus() {
/**
* Increment the counter for the number of items processed.
*
* @param count int amount to increment by.
* @param count long amount to increment by.
*/
public void incrementFilterCount(int count) {
public void incrementFilterCount(long count) {
filterCount += count;
}

Expand All @@ -92,9 +92,9 @@ public void incrementReadCount() {
/**
* Increment the counter for the number of items written.
*
* @param count int amount to increment by.
* @param count long amount to increment by.
*/
public void incrementWriteCount(int count) {
public void incrementWriteCount(long count) {
writeCount += count;
}

Expand All @@ -103,7 +103,7 @@ public void incrementWriteCount(int count) {
*
* @return the item counter.
*/
public int getReadCount() {
public long getReadCount() {
return readCount;
}

Expand All @@ -112,7 +112,7 @@ public int getReadCount() {
*
* @return the item counter.
*/
public int getWriteCount() {
public long getWriteCount() {
return writeCount;
}

Expand All @@ -121,15 +121,15 @@ public int getWriteCount() {
*
* @return the filter counter
*/
public int getFilterCount() {
public long getFilterCount() {
return filterCount;
}

/**
* @return the sum of skips accumulated in the parent {@link StepExecution}
* and this <code>StepContribution</code>.
*/
public int getStepSkipCount() {
public long getStepSkipCount() {
return readSkipCount + writeSkipCount + processSkipCount + parentSkipCount;
}

Expand All @@ -138,7 +138,7 @@ public int getStepSkipCount() {
* <code>StepContribution</code> (not including skips accumulated in the
* parent {@link StepExecution}).
*/
public int getSkipCount() {
public long getSkipCount() {
return readSkipCount + writeSkipCount + processSkipCount;
}

Expand All @@ -152,9 +152,9 @@ public void incrementReadSkipCount() {
/**
* Increment the read skip count for this contribution
*
* @param count int amount to increment by.
* @param count long amount to increment by.
*/
public void incrementReadSkipCount(int count) {
public void incrementReadSkipCount(long count) {
readSkipCount += count;
}

Expand All @@ -175,14 +175,14 @@ public void incrementProcessSkipCount() {
/**
* @return the read skip count
*/
public int getReadSkipCount() {
public long getReadSkipCount() {
return readSkipCount;
}

/**
* @return the write skip count
*/
public int getWriteSkipCount() {
public long getWriteSkipCount() {
return writeSkipCount;
}

Expand All @@ -191,7 +191,7 @@ public int getWriteSkipCount() {
*
* @return the process skip count
*/
public int getProcessSkipCount() {
public long getProcessSkipCount() {
return processSkipCount;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2013 the original author or authors.
* Copyright 2006-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,7 @@
*
* @author Lucas Ward
* @author Dave Syer
* @author Mahmoud Ben Hassine
*
*/
@SuppressWarnings("serial")
Expand All @@ -44,19 +45,19 @@ public class StepExecution extends Entity {

private volatile BatchStatus status = BatchStatus.STARTING;

private volatile int readCount = 0;
private volatile long readCount = 0;

private volatile int writeCount = 0;
private volatile long writeCount = 0;

private volatile int commitCount = 0;
private volatile long commitCount = 0;

private volatile int rollbackCount = 0;
private volatile long rollbackCount = 0;

private volatile int readSkipCount = 0;
private volatile long readSkipCount = 0;

private volatile int processSkipCount = 0;
private volatile long processSkipCount = 0;

private volatile int writeSkipCount = 0;
private volatile long writeSkipCount = 0;

private volatile Date startTime = new Date(System.currentTimeMillis());

Expand All @@ -70,7 +71,7 @@ public class StepExecution extends Entity {

private volatile boolean terminateOnly;

private volatile int filterCount;
private volatile long filterCount;

private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -140,7 +141,7 @@ public void setExecutionContext(ExecutionContext executionContext) {
*
* @return the current number of commits
*/
public int getCommitCount() {
public long getCommitCount() {
return commitCount;
}

Expand All @@ -149,7 +150,7 @@ public int getCommitCount() {
*
* @param commitCount the current number of commits
*/
public void setCommitCount(int commitCount) {
public void setCommitCount(long commitCount) {
this.commitCount = commitCount;
}

Expand All @@ -176,7 +177,7 @@ public void setEndTime(Date endTime) {
*
* @return the current number of items read for this execution
*/
public int getReadCount() {
public long getReadCount() {
return readCount;
}

Expand All @@ -185,7 +186,7 @@ public int getReadCount() {
*
* @param readCount the current number of read items for this execution
*/
public void setReadCount(int readCount) {
public void setReadCount(long readCount) {
this.readCount = readCount;
}

Expand All @@ -194,7 +195,7 @@ public void setReadCount(int readCount) {
*
* @return the current number of items written for this execution
*/
public int getWriteCount() {
public long getWriteCount() {
return writeCount;
}

Expand All @@ -203,7 +204,7 @@ public int getWriteCount() {
*
* @param writeCount the current number of written items for this execution
*/
public void setWriteCount(int writeCount) {
public void setWriteCount(long writeCount) {
this.writeCount = writeCount;
}

Expand All @@ -212,7 +213,7 @@ public void setWriteCount(int writeCount) {
*
* @return the current number of rollbacks for this execution
*/
public int getRollbackCount() {
public long getRollbackCount() {
return rollbackCount;
}

Expand All @@ -221,7 +222,7 @@ public int getRollbackCount() {
*
* @return the current number of items filtered out of this execution
*/
public int getFilterCount() {
public long getFilterCount() {
return filterCount;
}

Expand All @@ -230,15 +231,15 @@ public int getFilterCount() {
* @param filterCount the number of items filtered out of this execution to
* set
*/
public void setFilterCount(int filterCount) {
public void setFilterCount(long filterCount) {
this.filterCount = filterCount;
}

/**
* Setter for number of rollbacks for this execution
* @param rollbackCount int the number of rollbacks.
* @param rollbackCount long the number of rollbacks.
*/
public void setRollbackCount(int rollbackCount) {
public void setRollbackCount(long rollbackCount) {
this.rollbackCount = rollbackCount;
}

Expand Down Expand Up @@ -383,7 +384,7 @@ public void setTerminateOnly() {
/**
* @return the total number of items skipped.
*/
public int getSkipCount() {
public long getSkipCount() {
return readSkipCount + processSkipCount + writeSkipCount;
}

Expand All @@ -410,48 +411,48 @@ public JobParameters getJobParameters() {
/**
* @return the number of records skipped on read
*/
public int getReadSkipCount() {
public long getReadSkipCount() {
return readSkipCount;
}

/**
* @return the number of records skipped on write
*/
public int getWriteSkipCount() {
public long getWriteSkipCount() {
return writeSkipCount;
}

/**
* Set the number of records skipped on read
*
* @param readSkipCount int containing read skip count to be used for the step execution.
* @param readSkipCount long containing read skip count to be used for the step execution.
*/
public void setReadSkipCount(int readSkipCount) {
public void setReadSkipCount(long readSkipCount) {
this.readSkipCount = readSkipCount;
}

/**
* Set the number of records skipped on write
*
* @param writeSkipCount int containing write skip count to be used for the step execution.
* @param writeSkipCount long containing write skip count to be used for the step execution.
*/
public void setWriteSkipCount(int writeSkipCount) {
public void setWriteSkipCount(long writeSkipCount) {
this.writeSkipCount = writeSkipCount;
}

/**
* @return the number of records skipped during processing
*/
public int getProcessSkipCount() {
public long getProcessSkipCount() {
return processSkipCount;
}

/**
* Set the number of records skipped during processing.
*
* @param processSkipCount int containing process skip count to be used for the step execution.
* @param processSkipCount long containing process skip count to be used for the step execution.
*/
public void setProcessSkipCount(int processSkipCount) {
public void setProcessSkipCount(long processSkipCount) {
this.processSkipCount = processSkipCount;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013 the original author or authors.
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,6 +48,7 @@
*
* @author Michael Minella
* @author Chris Schaefer
* @author Mahmoud Ben Hassine
*
* @param <I> input type for the step
* @param <O> output type for the step
Expand Down Expand Up @@ -193,7 +194,7 @@ public I recover(RetryContext context) throws Exception {
* @param e the cause of the skip
* @param skipCount the current skip count
*/
private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
private boolean shouldSkip(SkipPolicy policy, Throwable e, long skipCount) {
try {
return policy.shouldSkip(e, skipCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,9 @@ private List<Object[]> buildStepExecutionParameters(StepExecution stepExecution)
stepExecution.getWriteSkipCount(), stepExecution.getProcessSkipCount(),
stepExecution.getRollbackCount(), stepExecution.getLastUpdated() };
Integer[] parameterTypes = new Integer[] { Types.BIGINT, Types.INTEGER, Types.VARCHAR, Types.BIGINT,
Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER,
Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER,
Types.INTEGER, Types.TIMESTAMP };
Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.TIMESTAMP };
parameters.add(0, Arrays.copyOf(parameterValues,parameterValues.length));
parameters.add(1, Arrays.copyOf(parameterTypes,parameterTypes.length));
return parameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ private void callProcessSkipListener(I item, Throwable e) {
* @param e the cause of the skip
* @param skipCount the current skip count
*/
private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
private boolean shouldSkip(SkipPolicy policy, Throwable e, long skipCount) {
try {
return policy.shouldSkip(e, skipCount);
}
Expand Down
Loading