diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchObservabilityBeanPostProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchObservabilityBeanPostProcessor.java
new file mode 100644
index 0000000000..d269797323
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchObservabilityBeanPostProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.configuration.annotation;
+
+import io.micrometer.observation.ObservationRegistry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.springframework.batch.core.job.AbstractJob;
+import org.springframework.batch.core.step.AbstractStep;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+
+/**
+ * Bean post processor that configures observable batch artifacts (jobs and steps) with
+ * Micrometer's observation registry.
+ *
+ * @author Mahmoud Ben Hassine
+ * @since 5.0
+ */
+public class BatchObservabilityBeanPostProcessor implements BeanFactoryPostProcessor, BeanPostProcessor {
+
+ private static final Log LOGGER = LogFactory.getLog(BatchObservabilityBeanPostProcessor.class);
+
+ private ConfigurableListableBeanFactory beanFactory;
+
+ @Override
+ public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
+ this.beanFactory = beanFactory;
+ }
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ try {
+ ObservationRegistry observationRegistry = this.beanFactory.getBean(ObservationRegistry.class);
+ if (bean instanceof AbstractJob) {
+ ((AbstractJob) bean).setObservationRegistry(observationRegistry);
+ }
+ if (bean instanceof AbstractStep) {
+ ((AbstractStep) bean).setObservationRegistry(observationRegistry);
+ }
+ }
+ catch (NoSuchBeanDefinitionException e) {
+ LOGGER.info("No Micrometer observation registry found, defaulting to ObservationRegistry.NOOP");
+ }
+ return bean;
+ }
+
+}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableBatchProcessing.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableBatchProcessing.java
index 8259b05159..ac28ad33ba 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableBatchProcessing.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableBatchProcessing.java
@@ -151,7 +151,8 @@
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
-@Import({ BatchRegistrar.class, ScopeConfiguration.class, AutomaticJobRegistrarBeanPostProcessor.class })
+@Import({ BatchRegistrar.class, ScopeConfiguration.class, AutomaticJobRegistrarBeanPostProcessor.class,
+ BatchObservabilityBeanPostProcessor.class })
public @interface EnableBatchProcessing {
/**
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java
index 4df2422c06..2d69bc9c61 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java
@@ -22,8 +22,11 @@
import java.util.stream.Collectors;
import io.micrometer.core.instrument.LongTaskTimer;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.observation.Observation;
+import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -87,6 +90,10 @@ public abstract class AbstractJob implements Job, StepLocator, BeanNameAware, In
private StepHandler stepHandler;
+ private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
+
+ private MeterRegistry meterRegistry = Metrics.globalRegistry;
+
private BatchJobObservationConvention observationConvention = new DefaultBatchJobObservationConvention();
/**
@@ -285,11 +292,13 @@ public final void execute(JobExecution execution) {
JobSynchronizationManager.register(execution);
String activeJobMeterName = "job.active";
- LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer(activeJobMeterName, "Active jobs", Tag.of(
- BatchMetrics.METRICS_PREFIX + activeJobMeterName + ".name", execution.getJobInstance().getJobName()));
+ LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer(this.meterRegistry, activeJobMeterName,
+ "Active jobs", Tag.of(BatchMetrics.METRICS_PREFIX + activeJobMeterName + ".name",
+ execution.getJobInstance().getJobName()));
LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start();
Observation observation = BatchMetrics
- .createObservation(BatchJobObservation.BATCH_JOB_OBSERVATION.getName(), new BatchJobContext(execution))
+ .createObservation(BatchJobObservation.BATCH_JOB_OBSERVATION.getName(), new BatchJobContext(execution),
+ this.observationRegistry)
.contextualName(execution.getJobInstance().getJobName())
.observationConvention(this.observationConvention).start();
try (Observation.Scope scope = observation.openScope()) {
@@ -439,6 +448,14 @@ public void setObservationConvention(BatchJobObservationConvention observationCo
this.observationConvention = observationConvention;
}
+ public void setObservationRegistry(ObservationRegistry observationRegistry) {
+ this.observationRegistry = observationRegistry;
+ }
+
+ public void setMeterRegistry(MeterRegistry meterRegistry) {
+ this.meterRegistry = meterRegistry;
+ }
+
@Override
public String toString() {
return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]";
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/JobBuilderHelper.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/JobBuilderHelper.java
index 3a6651a26c..9f200da10a 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/JobBuilderHelper.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/JobBuilderHelper.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2006-2020 the original author or authors.
+ * Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,8 +22,11 @@
import java.util.List;
import java.util.Set;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.JobParametersIncrementer;
@@ -99,6 +102,30 @@ public B repository(JobRepository jobRepository) {
return result;
}
+ /**
+ * Sets the observation registry for the job.
+ * @param observationRegistry the observation registry (optional)
+ * @return this to enable fluent chaining
+ */
+ public B observationRegistry(ObservationRegistry observationRegistry) {
+ properties.observationRegistry = observationRegistry;
+ @SuppressWarnings("unchecked")
+ B result = (B) this;
+ return result;
+ }
+
+ /**
+ * Sets the meter registry for the job.
+ * @param meterRegistry the meter registry (optional)
+ * @return this to enable fluent chaining
+ */
+ public B meterRegistry(MeterRegistry meterRegistry) {
+ properties.meterRegistry = meterRegistry;
+ @SuppressWarnings("unchecked")
+ B result = (B) this;
+ return result;
+ }
+
/**
* Registers objects using the annotation based listener configuration.
* @param listener the object that has a method configured with listener annotation
@@ -170,6 +197,14 @@ protected void enhance(Job target) {
if (jobParametersValidator != null) {
job.setJobParametersValidator(jobParametersValidator);
}
+ ObservationRegistry observationRegistry = properties.getObservationRegistry();
+ if (observationRegistry != null) {
+ job.setObservationRegistry(observationRegistry);
+ }
+ MeterRegistry meterRegistry = properties.getMeterRegistry();
+ if (meterRegistry != null) {
+ job.setMeterRegistry(meterRegistry);
+ }
Boolean restartable = properties.getRestartable();
if (restartable != null) {
@@ -193,6 +228,10 @@ public static class CommonJobProperties {
private JobRepository jobRepository;
+ private ObservationRegistry observationRegistry;
+
+ private MeterRegistry meterRegistry;
+
private JobParametersIncrementer jobParametersIncrementer;
private JobParametersValidator jobParametersValidator;
@@ -204,6 +243,8 @@ public CommonJobProperties(CommonJobProperties properties) {
this.name = properties.name;
this.restartable = properties.restartable;
this.jobRepository = properties.jobRepository;
+ this.observationRegistry = properties.observationRegistry;
+ this.meterRegistry = properties.meterRegistry;
this.jobExecutionListeners = new LinkedHashSet<>(properties.jobExecutionListeners);
this.jobParametersIncrementer = properties.jobParametersIncrementer;
this.jobParametersValidator = properties.jobParametersValidator;
@@ -233,6 +274,22 @@ public void setJobRepository(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}
+ public ObservationRegistry getObservationRegistry() {
+ return observationRegistry;
+ }
+
+ public void setObservationRegistry(ObservationRegistry observationRegistry) {
+ this.observationRegistry = observationRegistry;
+ }
+
+ public MeterRegistry getMeterRegistry() {
+ return meterRegistry;
+ }
+
+ public void setMeterRegistry(MeterRegistry meterRegistry) {
+ this.meterRegistry = meterRegistry;
+ }
+
public String getName() {
return name;
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java
index 93c2a63f4c..f0a22a7393 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java
@@ -21,7 +21,7 @@
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.LongTaskTimer;
-import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
@@ -34,8 +34,8 @@
* Central class for batch metrics. It provides:
*
*
- * - the main entry point to interact with Micrometer's {@link Metrics#globalRegistry}
- * with common metrics such as {@link Timer} and {@link LongTaskTimer}.
+ * - the main entry point to interact with Micrometer's API to create common metrics
+ * such as {@link Timer} and {@link LongTaskTimer}.
* - Some utility methods like calculating durations and formatting them in a human
* readable format.
*
@@ -54,32 +54,21 @@ public final class BatchMetrics {
public static final String STATUS_FAILURE = "FAILURE";
- /**
- * Global {@link ObservationRegistry}. A {@link DefaultMeterObservationHandler} is
- * attached to create a {@link Timer} for every finished {@link Observation}.
- */
- public static ObservationRegistry observationRegistry;
-
- static {
- observationRegistry = ObservationRegistry.create();
- observationRegistry.observationConfig()
- .observationHandler(new DefaultMeterObservationHandler(Metrics.globalRegistry));
- }
-
private BatchMetrics() {
}
/**
* Create a {@link Timer}.
+ * @param meterRegistry the meter registry to use
* @param name of the timer. Will be prefixed with
* {@link BatchMetrics#METRICS_PREFIX}.
* @param description of the timer
* @param tags of the timer
* @return a new timer instance
*/
- public static Timer createTimer(String name, String description, Tag... tags) {
+ public static Timer createTimer(MeterRegistry meterRegistry, String name, String description, Tag... tags) {
return Timer.builder(METRICS_PREFIX + name).description(description).tags(Arrays.asList(tags))
- .register(Metrics.globalRegistry);
+ .register(meterRegistry);
}
/**
@@ -94,7 +83,8 @@ public static Timer createTimer(String name, String description, Tag... tags) {
* @return a new observation instance
* @since 5.0
*/
- public static Observation createObservation(String name, BatchJobContext context) {
+ public static Observation createObservation(String name, BatchJobContext context,
+ ObservationRegistry observationRegistry) {
return Observation.createNotStarted(name, context, observationRegistry);
}
@@ -110,29 +100,33 @@ public static Observation createObservation(String name, BatchJobContext context
* @return a new observation instance
* @since 5.0
*/
- public static Observation createObservation(String name, BatchStepContext context) {
+ public static Observation createObservation(String name, BatchStepContext context,
+ ObservationRegistry observationRegistry) {
return Observation.createNotStarted(name, context, observationRegistry);
}
/**
* Create a new {@link Timer.Sample}.
+ * @param meterRegistry the meter registry to use
* @return a new timer sample instance
*/
- public static Timer.Sample createTimerSample() {
- return Timer.start(Metrics.globalRegistry);
+ public static Timer.Sample createTimerSample(MeterRegistry meterRegistry) {
+ return Timer.start(meterRegistry);
}
/**
* Create a new {@link LongTaskTimer}.
+ * @param meterRegistry the meter registry to use
* @param name of the long task timer. Will be prefixed with
* {@link BatchMetrics#METRICS_PREFIX}.
* @param description of the long task timer.
* @param tags of the timer
* @return a new long task timer instance
*/
- public static LongTaskTimer createLongTaskTimer(String name, String description, Tag... tags) {
+ public static LongTaskTimer createLongTaskTimer(MeterRegistry meterRegistry, String name, String description,
+ Tag... tags) {
return LongTaskTimer.builder(METRICS_PREFIX + name).description(description).tags(Arrays.asList(tags))
- .register(Metrics.globalRegistry);
+ .register(meterRegistry);
}
/**
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java
index 92aecda112..162c1978bc 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java
@@ -20,7 +20,10 @@
import java.util.List;
import java.util.stream.Collectors;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
import io.micrometer.observation.Observation;
+import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -75,6 +78,10 @@ public abstract class AbstractStep implements Step, InitializingBean, BeanNameAw
private JobRepository jobRepository;
+ private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
+
+ private MeterRegistry meterRegistry = Metrics.globalRegistry;
+
private BatchStepObservationConvention observationConvention = new DefaultBatchStepObservationConvention();
/**
@@ -203,7 +210,7 @@ public final void execute(StepExecution stepExecution)
stepExecution.setStatus(BatchStatus.STARTED);
Observation observation = BatchMetrics
.createObservation(BatchStepObservation.BATCH_STEP_OBSERVATION.getName(),
- new BatchStepContext(stepExecution))
+ new BatchStepContext(stepExecution), this.observationRegistry)
.contextualName(stepExecution.getStepName()).observationConvention(this.observationConvention).start();
getJobRepository().update(stepExecution);
@@ -424,4 +431,12 @@ public void setObservationConvention(BatchStepObservationConvention observationC
this.observationConvention = observationConvention;
}
+ public void setObservationRegistry(ObservationRegistry observationRegistry) {
+ this.observationRegistry = observationRegistry;
+ }
+
+ public void setMeterRegistry(MeterRegistry meterRegistry) {
+ this.meterRegistry = meterRegistry;
+ }
+
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java
index ab86a7c15c..01b71c9345 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java
@@ -21,6 +21,9 @@
import java.util.LinkedHashSet;
import java.util.Set;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
+
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.ItemReadListener;
@@ -85,6 +88,8 @@ public class SimpleStepBuilder extends AbstractTaskletStepBuilder parent) {
this.processor = parent.processor;
this.itemListeners = parent.itemListeners;
this.readerTransactionalQueue = parent.readerTransactionalQueue;
+ this.meterRegistry = parent.meterRegistry;
this.transactionManager(parent.getTransactionManager());
}
@@ -159,7 +165,9 @@ protected Tasklet createTasklet() {
SimpleChunkProvider chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations);
SimpleChunkProcessor chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter());
chunkProvider.setListeners(new ArrayList<>(itemListeners));
+ chunkProvider.setMeterRegistry(this.meterRegistry);
chunkProcessor.setListeners(new ArrayList<>(itemListeners));
+ chunkProcessor.setMeterRegistry(this.meterRegistry);
ChunkOrientedTasklet tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor);
tasklet.setBuffering(!readerTransactionalQueue);
return tasklet;
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilderHelper.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilderHelper.java
index 1d2c05cdf6..3b0a3bba7a 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilderHelper.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilderHelper.java
@@ -15,8 +15,18 @@
*/
package org.springframework.batch.core.step.builder;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.annotation.AfterStep;
@@ -24,15 +34,7 @@
import org.springframework.batch.core.listener.StepListenerFactoryBean;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.AbstractStep;
-import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.support.ReflectionUtils;
-import org.springframework.transaction.PlatformTransactionManager;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
/**
* A base class and utility for other step builders providing access to common properties
@@ -68,6 +70,16 @@ public B repository(JobRepository jobRepository) {
return self();
}
+ public B observationRegistry(ObservationRegistry observationRegistry) {
+ properties.observationRegistry = observationRegistry;
+ return self();
+ }
+
+ public B meterRegistry(MeterRegistry meterRegistry) {
+ properties.meterRegistry = meterRegistry;
+ return self();
+ }
+
public B startLimit(int startLimit) {
properties.startLimit = startLimit;
return self();
@@ -123,6 +135,16 @@ protected void enhance(Step target) {
AbstractStep step = (AbstractStep) target;
step.setJobRepository(properties.getJobRepository());
+ ObservationRegistry observationRegistry = properties.getObservationRegistry();
+ if (observationRegistry != null) {
+ step.setObservationRegistry(observationRegistry);
+ }
+
+ MeterRegistry meterRegistry = properties.getMeterRegistry();
+ if (meterRegistry != null) {
+ step.setMeterRegistry(meterRegistry);
+ }
+
Boolean allowStartIfComplete = properties.allowStartIfComplete;
if (allowStartIfComplete != null) {
step.setAllowStartIfComplete(allowStartIfComplete);
@@ -149,6 +171,10 @@ public static class CommonStepProperties {
private JobRepository jobRepository;
+ private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
+
+ private MeterRegistry meterRegistry = Metrics.globalRegistry;
+
public CommonStepProperties() {
}
@@ -157,6 +183,8 @@ public CommonStepProperties(CommonStepProperties properties) {
this.startLimit = properties.startLimit;
this.allowStartIfComplete = properties.allowStartIfComplete;
this.jobRepository = properties.jobRepository;
+ this.observationRegistry = properties.observationRegistry;
+ this.meterRegistry = properties.meterRegistry;
this.stepExecutionListeners = new ArrayList<>(properties.stepExecutionListeners);
}
@@ -168,6 +196,22 @@ public void setJobRepository(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}
+ public ObservationRegistry getObservationRegistry() {
+ return observationRegistry;
+ }
+
+ public void setObservationRegistry(ObservationRegistry observationRegistry) {
+ this.observationRegistry = observationRegistry;
+ }
+
+ public MeterRegistry getMeterRegistry() {
+ return meterRegistry;
+ }
+
+ public void setMeterRegistry(MeterRegistry meterRegistry) {
+ this.meterRegistry = meterRegistry;
+ }
+
public String getName() {
return name;
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java
index 86c984bf81..144dc7c5bb 100755
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java
@@ -15,6 +15,7 @@
*/
package org.springframework.batch.core.step.factory;
+import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.ChunkListener;
@@ -84,6 +85,8 @@ public class SimpleStepFactoryBean implements FactoryBean, BeanNameA
protected JobRepository jobRepository;
+ protected ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
+
private boolean singleton = true;
private ItemStream[] streams = new ItemStream[0];
@@ -270,6 +273,15 @@ public void setJobRepository(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}
+ /**
+ * Public setter for {@link ObservationRegistry}.
+ * @param observationRegistry is an optional dependency (defaults to
+ * {@link ObservationRegistry#NOOP}).
+ */
+ public void setObservationRegistry(ObservationRegistry observationRegistry) {
+ this.observationRegistry = observationRegistry;
+ }
+
/**
* Public setter for the {@link PlatformTransactionManager}.
* @param transactionManager the transaction manager to set
@@ -469,6 +481,7 @@ protected void applyConfiguration(SimpleStepBuilder builder) {
builder.transactionManager(transactionManager);
builder.transactionAttribute(getTransactionAttribute());
builder.repository(jobRepository);
+ builder.observationRegistry(observationRegistry);
builder.startLimit(startLimit);
builder.allowStartIfComplete(allowStartIfComplete);
builder.chunk(commitInterval);
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java
index 8214002097..0a77271190 100755
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java
@@ -17,7 +17,6 @@
package org.springframework.batch.core.step.item;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -220,7 +219,7 @@ protected Chunk transform(final StepContribution contribution, Chunk input
@Override
public O doWithRetry(RetryContext context) throws Exception {
- Timer.Sample sample = BatchMetrics.createTimerSample();
+ Timer.Sample sample = BatchMetrics.createTimerSample(meterRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
O output = null;
try {
@@ -336,7 +335,7 @@ public Object doWithRetry(RetryContext context) throws Exception {
if (!data.scanning()) {
chunkMonitor.setChunkSize(inputs.size());
- Timer.Sample sample = BatchMetrics.createTimerSample();
+ Timer.Sample sample = BatchMetrics.createTimerSample(meterRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
try {
doWrite(outputs);
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java
index 193cf56810..f8d908bff1 100755
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java
@@ -18,6 +18,8 @@
import java.util.List;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
@@ -47,6 +49,8 @@ public class SimpleChunkProcessor implements ChunkProcessor, Initializi
private final MulticasterBatchListener listener = new MulticasterBatchListener<>();
+ protected MeterRegistry meterRegistry = Metrics.globalRegistry;
+
/**
* Default constructor for ease of configuration.
*/
@@ -79,6 +83,10 @@ public void setItemWriter(ItemWriter super O> itemWriter) {
this.itemWriter = itemWriter;
}
+ public void setMeterRegistry(MeterRegistry meterRegistry) {
+ this.meterRegistry = meterRegistry;
+ }
+
/**
* Check mandatory properties.
*
@@ -278,7 +286,7 @@ protected Chunk getAdjustedOutputs(Chunk inputs, Chunk outputs) {
* @throws Exception if there is a problem
*/
protected void write(StepContribution contribution, Chunk inputs, Chunk outputs) throws Exception {
- Timer.Sample sample = BatchMetrics.createTimerSample();
+ Timer.Sample sample = BatchMetrics.createTimerSample(this.meterRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
try {
doWrite(outputs);
@@ -303,7 +311,7 @@ protected Chunk transform(StepContribution contribution, Chunk inputs) thr
for (Chunk.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
final I item = iterator.next();
O output;
- Timer.Sample sample = BatchMetrics.createTimerSample();
+ Timer.Sample sample = BatchMetrics.createTimerSample(this.meterRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
try {
output = doProcess(item);
@@ -333,7 +341,7 @@ protected Chunk transform(StepContribution contribution, Chunk inputs) thr
protected void stopTimer(Timer.Sample sample, StepExecution stepExecution, String metricName, String status,
String description) {
String fullyQualifiedMetricName = BatchMetrics.METRICS_PREFIX + metricName;
- sample.stop(BatchMetrics.createTimer(metricName, description + " duration",
+ sample.stop(BatchMetrics.createTimer(this.meterRegistry, metricName, description + " duration",
Tag.of(fullyQualifiedMetricName + ".job.name",
stepExecution.getJobExecution().getJobInstance().getJobName()),
Tag.of(fullyQualifiedMetricName + ".step.name", stepExecution.getStepName()),
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java
index 8cb63e60c5..8253c27538 100755
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java
@@ -18,6 +18,7 @@
import java.util.List;
+import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
@@ -55,6 +56,8 @@ public class SimpleChunkProvider implements ChunkProvider {
private final RepeatOperations repeatOperations;
+ private MeterRegistry meterRegistry = Metrics.globalRegistry;
+
public SimpleChunkProvider(ItemReader extends I> itemReader, RepeatOperations repeatOperations) {
this.itemReader = itemReader;
this.repeatOperations = repeatOperations;
@@ -71,6 +74,10 @@ public void setListeners(List extends StepListener> listeners) {
}
}
+ public void setMeterRegistry(MeterRegistry meterRegistry) {
+ this.meterRegistry = meterRegistry;
+ }
+
/**
* Register a listener for callbacks at the appropriate stages in a process.
* @param listener a {@link StepListener}
@@ -150,7 +157,7 @@ public RepeatStatus doInIteration(final RepeatContext context) throws Exception
private void stopTimer(Timer.Sample sample, StepExecution stepExecution, String status) {
String fullyQualifiedMetricName = BatchMetrics.METRICS_PREFIX + "item.read";
- sample.stop(BatchMetrics.createTimer("item.read", "Item reading duration",
+ sample.stop(BatchMetrics.createTimer(this.meterRegistry, "item.read", "Item reading duration",
Tag.of(fullyQualifiedMetricName + ".job.name",
stepExecution.getJobExecution().getJobInstance().getJobName()),
Tag.of(fullyQualifiedMetricName + ".step.name", stepExecution.getStepName()),
diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/job/SimpleJobTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/job/SimpleJobTests.java
index c6665b7587..be24fd9be8 100644
--- a/spring-batch-core/src/test/java/org/springframework/batch/core/job/SimpleJobTests.java
+++ b/spring-batch-core/src/test/java/org/springframework/batch/core/job/SimpleJobTests.java
@@ -26,7 +26,9 @@
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
+import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
import io.micrometer.core.tck.MeterRegistryAssert;
+import io.micrometer.observation.ObservationRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -34,7 +36,6 @@
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
-import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobInterruptedException;
@@ -49,9 +50,9 @@
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.core.step.StepSupport;
import org.springframework.batch.item.ExecutionContext;
-import org.springframework.jdbc.support.JdbcTransactionManager;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.support.JdbcTransactionManager;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -111,6 +112,11 @@ void setUp() throws Exception {
job = new SimpleJob();
job.setJobRepository(jobRepository);
+ ObservationRegistry observationRegistry = ObservationRegistry.create();
+ observationRegistry.observationConfig()
+ .observationHandler(new DefaultMeterObservationHandler(Metrics.globalRegistry));
+ job.setObservationRegistry(observationRegistry);
+
step1 = new StubStep("TestStep1", jobRepository);
step1.setCallback(new Runnable() {
@Override
diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java
index a3b1020454..f90e8cb872 100644
--- a/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java
+++ b/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java
@@ -17,7 +17,6 @@
import java.time.Duration;
import java.time.LocalDateTime;
-import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
@@ -26,6 +25,8 @@
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
+import io.micrometer.observation.ObservationRegistry;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.ExitStatus;
@@ -215,46 +216,43 @@ void testBatchMetrics() throws Exception {
@Configuration
@EnableBatchProcessing
- @Import(DataSoourceConfiguration.class)
static class MyJobConfiguration {
- private PlatformTransactionManager transactionManager;
-
- public MyJobConfiguration(PlatformTransactionManager transactionManager) {
- this.transactionManager = transactionManager;
- }
-
@Bean
- public Step step1(JobRepository jobRepository) {
+ public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
- .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, this.transactionManager).build();
+ .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager).build();
}
@Bean
- public Step step2(JobRepository jobRepository) {
- return new StepBuilder("step2", jobRepository).chunk(2, this.transactionManager)
+ public Step step2(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
+ return new StepBuilder("step2", jobRepository).chunk(2, transactionManager)
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5)))
.writer(items -> items.forEach(System.out::println)).build();
}
@Bean
- public Step step3(JobRepository jobRepository) {
- return new StepBuilder("step3", jobRepository).chunk(2, this.transactionManager)
+ public Step step3(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
+ return new StepBuilder("step3", jobRepository).chunk(2, transactionManager)
.reader(new ListItemReader<>(Arrays.asList(6, 7, 8, 9, 10)))
.writer(items -> items.forEach(System.out::println)).faultTolerant().skip(Exception.class)
.skipLimit(3).build();
}
@Bean
- public Job job(JobRepository jobRepository) {
- return new JobBuilder("job", jobRepository).start(step1(jobRepository)).next(step2(jobRepository))
- .next(step3(jobRepository)).build();
+ public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
+ return new JobBuilder("job", jobRepository).start(step1(jobRepository, transactionManager))
+ .next(step2(jobRepository, transactionManager)).next(step3(jobRepository, transactionManager))
+ .build();
}
- }
-
- @Configuration
- static class DataSoourceConfiguration {
+ @Bean
+ public ObservationRegistry observationRegistry() {
+ ObservationRegistry observationRegistry = ObservationRegistry.create();
+ observationRegistry.observationConfig()
+ .observationHandler(new DefaultMeterObservationHandler(Metrics.globalRegistry));
+ return observationRegistry;
+ }
@Bean
public DataSource dataSource() {
diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/NonAbstractStepTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/NonAbstractStepTests.java
index 2fb627df56..f8303b23e9 100644
--- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/NonAbstractStepTests.java
+++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/NonAbstractStepTests.java
@@ -21,7 +21,9 @@
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
+import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
import io.micrometer.core.tck.MeterRegistryAssert;
+import io.micrometer.observation.ObservationRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -190,6 +192,12 @@ protected void doExecute(StepExecution stepExecution) throws Exception {
@Test
void testExecute() throws Exception {
tested.setStepExecutionListeners(new StepExecutionListener[] { listener1, listener2 });
+
+ ObservationRegistry observationRegistry = ObservationRegistry.create();
+ observationRegistry.observationConfig()
+ .observationHandler(new DefaultMeterObservationHandler(Metrics.globalRegistry));
+ tested.setObservationRegistry(observationRegistry);
+
tested.execute(execution);
int i = 0;
diff --git a/spring-batch-test/src/test/java/org/springframework/batch/test/observability/ObservabilitySampleStepTests.java b/spring-batch-test/src/test/java/org/springframework/batch/test/observability/ObservabilitySampleStepTests.java
index d605fc8323..9bfe596156 100644
--- a/spring-batch-test/src/test/java/org/springframework/batch/test/observability/ObservabilitySampleStepTests.java
+++ b/spring-batch-test/src/test/java/org/springframework/batch/test/observability/ObservabilitySampleStepTests.java
@@ -15,8 +15,11 @@
*/
package org.springframework.batch.test.observability;
+import javax.sql.DataSource;
+
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
import io.micrometer.core.tck.MeterRegistryAssert;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.tracing.test.SampleTestRunner;
@@ -28,13 +31,20 @@
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
-import org.springframework.batch.core.observability.BatchMetrics;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
+import org.springframework.batch.core.job.builder.JobBuilder;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.step.builder.StepBuilder;
+import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.batch.test.JobLauncherTestUtils;
-import org.springframework.batch.test.SpringBatchTestJUnit5Tests;
import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Import;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+import org.springframework.jdbc.support.JdbcTransactionManager;
import static io.micrometer.tracing.test.simple.SpansAssert.assertThat;
@@ -44,6 +54,9 @@ class ObservabilitySampleStepTests extends SampleTestRunner {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
+ @Autowired
+ private ObservationRegistry observationRegistry;
+
ObservabilitySampleStepTests() {
super(SampleRunnerConfig.builder().build());
}
@@ -55,7 +68,7 @@ protected MeterRegistry createMeterRegistry() {
@Override
protected ObservationRegistry createObservationRegistry() {
- return BatchMetrics.observationRegistry;
+ return this.observationRegistry;
}
@BeforeEach
@@ -91,9 +104,40 @@ public SampleTestRunnerConsumer yourCode() {
}
@Configuration(proxyBeanMethods = false)
- @Import(SpringBatchTestJUnit5Tests.JobConfiguration.class)
+ @EnableBatchProcessing
static class TestConfig {
+ @Bean
+ public ObservationRegistry observationRegistry() {
+ ObservationRegistry observationRegistry = ObservationRegistry.create();
+ observationRegistry.observationConfig()
+ .observationHandler(new DefaultMeterObservationHandler(Metrics.globalRegistry));
+ return observationRegistry;
+ }
+
+ @Bean
+ public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
+ return new StepBuilder("step", jobRepository)
+ .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager).build();
+ }
+
+ @Bean
+ public Job job(JobRepository jobRepository, Step step) {
+ return new JobBuilder("job", jobRepository).start(step).build();
+ }
+
+ @Bean
+ public DataSource dataSource() {
+ return new EmbeddedDatabaseBuilder().setType(EmbeddedDatabaseType.HSQL)
+ .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
+ .addScript("/org/springframework/batch/core/schema-hsqldb.sql").build();
+ }
+
+ @Bean
+ public JdbcTransactionManager transactionManager(DataSource dataSource) {
+ return new JdbcTransactionManager(dataSource);
+ }
+
}
}