diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchObservabilityBeanPostProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchObservabilityBeanPostProcessor.java new file mode 100644 index 0000000000..d269797323 --- /dev/null +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchObservabilityBeanPostProcessor.java @@ -0,0 +1,65 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.configuration.annotation; + +import io.micrometer.observation.ObservationRegistry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.batch.core.job.AbstractJob; +import org.springframework.batch.core.step.AbstractStep; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; + +/** + * Bean post processor that configures observable batch artifacts (jobs and steps) with + * Micrometer's observation registry. + * + * @author Mahmoud Ben Hassine + * @since 5.0 + */ +public class BatchObservabilityBeanPostProcessor implements BeanFactoryPostProcessor, BeanPostProcessor { + + private static final Log LOGGER = LogFactory.getLog(BatchObservabilityBeanPostProcessor.class); + + private ConfigurableListableBeanFactory beanFactory; + + @Override + public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + try { + ObservationRegistry observationRegistry = this.beanFactory.getBean(ObservationRegistry.class); + if (bean instanceof AbstractJob) { + ((AbstractJob) bean).setObservationRegistry(observationRegistry); + } + if (bean instanceof AbstractStep) { + ((AbstractStep) bean).setObservationRegistry(observationRegistry); + } + } + catch (NoSuchBeanDefinitionException e) { + LOGGER.info("No Micrometer observation registry found, defaulting to ObservationRegistry.NOOP"); + } + return bean; + } + +} diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableBatchProcessing.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableBatchProcessing.java index 8259b05159..ac28ad33ba 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableBatchProcessing.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableBatchProcessing.java @@ -151,7 +151,8 @@ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented -@Import({ BatchRegistrar.class, ScopeConfiguration.class, AutomaticJobRegistrarBeanPostProcessor.class }) +@Import({ BatchRegistrar.class, ScopeConfiguration.class, AutomaticJobRegistrarBeanPostProcessor.class, + BatchObservabilityBeanPostProcessor.class }) public @interface EnableBatchProcessing { /** diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java index 4df2422c06..2d69bc9c61 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java @@ -22,8 +22,11 @@ import java.util.stream.Collectors; import io.micrometer.core.instrument.LongTaskTimer; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -87,6 +90,10 @@ public abstract class AbstractJob implements Job, StepLocator, BeanNameAware, In private StepHandler stepHandler; + private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; + + private MeterRegistry meterRegistry = Metrics.globalRegistry; + private BatchJobObservationConvention observationConvention = new DefaultBatchJobObservationConvention(); /** @@ -285,11 +292,13 @@ public final void execute(JobExecution execution) { JobSynchronizationManager.register(execution); String activeJobMeterName = "job.active"; - LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer(activeJobMeterName, "Active jobs", Tag.of( - BatchMetrics.METRICS_PREFIX + activeJobMeterName + ".name", execution.getJobInstance().getJobName())); + LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer(this.meterRegistry, activeJobMeterName, + "Active jobs", Tag.of(BatchMetrics.METRICS_PREFIX + activeJobMeterName + ".name", + execution.getJobInstance().getJobName())); LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start(); Observation observation = BatchMetrics - .createObservation(BatchJobObservation.BATCH_JOB_OBSERVATION.getName(), new BatchJobContext(execution)) + .createObservation(BatchJobObservation.BATCH_JOB_OBSERVATION.getName(), new BatchJobContext(execution), + this.observationRegistry) .contextualName(execution.getJobInstance().getJobName()) .observationConvention(this.observationConvention).start(); try (Observation.Scope scope = observation.openScope()) { @@ -439,6 +448,14 @@ public void setObservationConvention(BatchJobObservationConvention observationCo this.observationConvention = observationConvention; } + public void setObservationRegistry(ObservationRegistry observationRegistry) { + this.observationRegistry = observationRegistry; + } + + public void setMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + } + @Override public String toString() { return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]"; diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/JobBuilderHelper.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/JobBuilderHelper.java index 3a6651a26c..9f200da10a 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/JobBuilderHelper.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/JobBuilderHelper.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2020 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,11 @@ import java.util.List; import java.util.Set; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.observation.ObservationRegistry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.JobParametersIncrementer; @@ -99,6 +102,30 @@ public B repository(JobRepository jobRepository) { return result; } + /** + * Sets the observation registry for the job. + * @param observationRegistry the observation registry (optional) + * @return this to enable fluent chaining + */ + public B observationRegistry(ObservationRegistry observationRegistry) { + properties.observationRegistry = observationRegistry; + @SuppressWarnings("unchecked") + B result = (B) this; + return result; + } + + /** + * Sets the meter registry for the job. + * @param meterRegistry the meter registry (optional) + * @return this to enable fluent chaining + */ + public B meterRegistry(MeterRegistry meterRegistry) { + properties.meterRegistry = meterRegistry; + @SuppressWarnings("unchecked") + B result = (B) this; + return result; + } + /** * Registers objects using the annotation based listener configuration. * @param listener the object that has a method configured with listener annotation @@ -170,6 +197,14 @@ protected void enhance(Job target) { if (jobParametersValidator != null) { job.setJobParametersValidator(jobParametersValidator); } + ObservationRegistry observationRegistry = properties.getObservationRegistry(); + if (observationRegistry != null) { + job.setObservationRegistry(observationRegistry); + } + MeterRegistry meterRegistry = properties.getMeterRegistry(); + if (meterRegistry != null) { + job.setMeterRegistry(meterRegistry); + } Boolean restartable = properties.getRestartable(); if (restartable != null) { @@ -193,6 +228,10 @@ public static class CommonJobProperties { private JobRepository jobRepository; + private ObservationRegistry observationRegistry; + + private MeterRegistry meterRegistry; + private JobParametersIncrementer jobParametersIncrementer; private JobParametersValidator jobParametersValidator; @@ -204,6 +243,8 @@ public CommonJobProperties(CommonJobProperties properties) { this.name = properties.name; this.restartable = properties.restartable; this.jobRepository = properties.jobRepository; + this.observationRegistry = properties.observationRegistry; + this.meterRegistry = properties.meterRegistry; this.jobExecutionListeners = new LinkedHashSet<>(properties.jobExecutionListeners); this.jobParametersIncrementer = properties.jobParametersIncrementer; this.jobParametersValidator = properties.jobParametersValidator; @@ -233,6 +274,22 @@ public void setJobRepository(JobRepository jobRepository) { this.jobRepository = jobRepository; } + public ObservationRegistry getObservationRegistry() { + return observationRegistry; + } + + public void setObservationRegistry(ObservationRegistry observationRegistry) { + this.observationRegistry = observationRegistry; + } + + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + + public void setMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + } + public String getName() { return name; } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java index 93c2a63f4c..f0a22a7393 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import io.micrometer.core.instrument.LongTaskTimer; -import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; @@ -34,8 +34,8 @@ * Central class for batch metrics. It provides: * * @@ -54,32 +54,21 @@ public final class BatchMetrics { public static final String STATUS_FAILURE = "FAILURE"; - /** - * Global {@link ObservationRegistry}. A {@link DefaultMeterObservationHandler} is - * attached to create a {@link Timer} for every finished {@link Observation}. - */ - public static ObservationRegistry observationRegistry; - - static { - observationRegistry = ObservationRegistry.create(); - observationRegistry.observationConfig() - .observationHandler(new DefaultMeterObservationHandler(Metrics.globalRegistry)); - } - private BatchMetrics() { } /** * Create a {@link Timer}. + * @param meterRegistry the meter registry to use * @param name of the timer. Will be prefixed with * {@link BatchMetrics#METRICS_PREFIX}. * @param description of the timer * @param tags of the timer * @return a new timer instance */ - public static Timer createTimer(String name, String description, Tag... tags) { + public static Timer createTimer(MeterRegistry meterRegistry, String name, String description, Tag... tags) { return Timer.builder(METRICS_PREFIX + name).description(description).tags(Arrays.asList(tags)) - .register(Metrics.globalRegistry); + .register(meterRegistry); } /** @@ -94,7 +83,8 @@ public static Timer createTimer(String name, String description, Tag... tags) { * @return a new observation instance * @since 5.0 */ - public static Observation createObservation(String name, BatchJobContext context) { + public static Observation createObservation(String name, BatchJobContext context, + ObservationRegistry observationRegistry) { return Observation.createNotStarted(name, context, observationRegistry); } @@ -110,29 +100,33 @@ public static Observation createObservation(String name, BatchJobContext context * @return a new observation instance * @since 5.0 */ - public static Observation createObservation(String name, BatchStepContext context) { + public static Observation createObservation(String name, BatchStepContext context, + ObservationRegistry observationRegistry) { return Observation.createNotStarted(name, context, observationRegistry); } /** * Create a new {@link Timer.Sample}. + * @param meterRegistry the meter registry to use * @return a new timer sample instance */ - public static Timer.Sample createTimerSample() { - return Timer.start(Metrics.globalRegistry); + public static Timer.Sample createTimerSample(MeterRegistry meterRegistry) { + return Timer.start(meterRegistry); } /** * Create a new {@link LongTaskTimer}. + * @param meterRegistry the meter registry to use * @param name of the long task timer. Will be prefixed with * {@link BatchMetrics#METRICS_PREFIX}. * @param description of the long task timer. * @param tags of the timer * @return a new long task timer instance */ - public static LongTaskTimer createLongTaskTimer(String name, String description, Tag... tags) { + public static LongTaskTimer createLongTaskTimer(MeterRegistry meterRegistry, String name, String description, + Tag... tags) { return LongTaskTimer.builder(METRICS_PREFIX + name).description(description).tags(Arrays.asList(tags)) - .register(Metrics.globalRegistry); + .register(meterRegistry); } /** diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java index 92aecda112..162c1978bc 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java @@ -20,7 +20,10 @@ import java.util.List; import java.util.stream.Collectors; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -75,6 +78,10 @@ public abstract class AbstractStep implements Step, InitializingBean, BeanNameAw private JobRepository jobRepository; + private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; + + private MeterRegistry meterRegistry = Metrics.globalRegistry; + private BatchStepObservationConvention observationConvention = new DefaultBatchStepObservationConvention(); /** @@ -203,7 +210,7 @@ public final void execute(StepExecution stepExecution) stepExecution.setStatus(BatchStatus.STARTED); Observation observation = BatchMetrics .createObservation(BatchStepObservation.BATCH_STEP_OBSERVATION.getName(), - new BatchStepContext(stepExecution)) + new BatchStepContext(stepExecution), this.observationRegistry) .contextualName(stepExecution.getStepName()).observationConvention(this.observationConvention).start(); getJobRepository().update(stepExecution); @@ -424,4 +431,12 @@ public void setObservationConvention(BatchStepObservationConvention observationC this.observationConvention = observationConvention; } + public void setObservationRegistry(ObservationRegistry observationRegistry) { + this.observationRegistry = observationRegistry; + } + + public void setMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java index ab86a7c15c..01b71c9345 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java @@ -21,6 +21,9 @@ import java.util.LinkedHashSet; import java.util.Set; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; + import org.springframework.batch.core.ChunkListener; import org.springframework.batch.core.ItemProcessListener; import org.springframework.batch.core.ItemReadListener; @@ -85,6 +88,8 @@ public class SimpleStepBuilder extends AbstractTaskletStepBuilder parent) { this.processor = parent.processor; this.itemListeners = parent.itemListeners; this.readerTransactionalQueue = parent.readerTransactionalQueue; + this.meterRegistry = parent.meterRegistry; this.transactionManager(parent.getTransactionManager()); } @@ -159,7 +165,9 @@ protected Tasklet createTasklet() { SimpleChunkProvider chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations); SimpleChunkProcessor chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter()); chunkProvider.setListeners(new ArrayList<>(itemListeners)); + chunkProvider.setMeterRegistry(this.meterRegistry); chunkProcessor.setListeners(new ArrayList<>(itemListeners)); + chunkProcessor.setMeterRegistry(this.meterRegistry); ChunkOrientedTasklet tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor); tasklet.setBuffering(!readerTransactionalQueue); return tasklet; diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilderHelper.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilderHelper.java index 1d2c05cdf6..3b0a3bba7a 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilderHelper.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilderHelper.java @@ -15,8 +15,18 @@ */ package org.springframework.batch.core.step.builder; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.observation.ObservationRegistry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.core.annotation.AfterStep; @@ -24,15 +34,7 @@ import org.springframework.batch.core.listener.StepListenerFactoryBean; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.AbstractStep; -import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.support.ReflectionUtils; -import org.springframework.transaction.PlatformTransactionManager; - -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; /** * A base class and utility for other step builders providing access to common properties @@ -68,6 +70,16 @@ public B repository(JobRepository jobRepository) { return self(); } + public B observationRegistry(ObservationRegistry observationRegistry) { + properties.observationRegistry = observationRegistry; + return self(); + } + + public B meterRegistry(MeterRegistry meterRegistry) { + properties.meterRegistry = meterRegistry; + return self(); + } + public B startLimit(int startLimit) { properties.startLimit = startLimit; return self(); @@ -123,6 +135,16 @@ protected void enhance(Step target) { AbstractStep step = (AbstractStep) target; step.setJobRepository(properties.getJobRepository()); + ObservationRegistry observationRegistry = properties.getObservationRegistry(); + if (observationRegistry != null) { + step.setObservationRegistry(observationRegistry); + } + + MeterRegistry meterRegistry = properties.getMeterRegistry(); + if (meterRegistry != null) { + step.setMeterRegistry(meterRegistry); + } + Boolean allowStartIfComplete = properties.allowStartIfComplete; if (allowStartIfComplete != null) { step.setAllowStartIfComplete(allowStartIfComplete); @@ -149,6 +171,10 @@ public static class CommonStepProperties { private JobRepository jobRepository; + private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; + + private MeterRegistry meterRegistry = Metrics.globalRegistry; + public CommonStepProperties() { } @@ -157,6 +183,8 @@ public CommonStepProperties(CommonStepProperties properties) { this.startLimit = properties.startLimit; this.allowStartIfComplete = properties.allowStartIfComplete; this.jobRepository = properties.jobRepository; + this.observationRegistry = properties.observationRegistry; + this.meterRegistry = properties.meterRegistry; this.stepExecutionListeners = new ArrayList<>(properties.stepExecutionListeners); } @@ -168,6 +196,22 @@ public void setJobRepository(JobRepository jobRepository) { this.jobRepository = jobRepository; } + public ObservationRegistry getObservationRegistry() { + return observationRegistry; + } + + public void setObservationRegistry(ObservationRegistry observationRegistry) { + this.observationRegistry = observationRegistry; + } + + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + + public void setMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + } + public String getName() { return name; } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java index 86c984bf81..144dc7c5bb 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java @@ -15,6 +15,7 @@ */ package org.springframework.batch.core.step.factory; +import io.micrometer.observation.ObservationRegistry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.ChunkListener; @@ -84,6 +85,8 @@ public class SimpleStepFactoryBean implements FactoryBean, BeanNameA protected JobRepository jobRepository; + protected ObservationRegistry observationRegistry = ObservationRegistry.NOOP; + private boolean singleton = true; private ItemStream[] streams = new ItemStream[0]; @@ -270,6 +273,15 @@ public void setJobRepository(JobRepository jobRepository) { this.jobRepository = jobRepository; } + /** + * Public setter for {@link ObservationRegistry}. + * @param observationRegistry is an optional dependency (defaults to + * {@link ObservationRegistry#NOOP}). + */ + public void setObservationRegistry(ObservationRegistry observationRegistry) { + this.observationRegistry = observationRegistry; + } + /** * Public setter for the {@link PlatformTransactionManager}. * @param transactionManager the transaction manager to set @@ -469,6 +481,7 @@ protected void applyConfiguration(SimpleStepBuilder builder) { builder.transactionManager(transactionManager); builder.transactionAttribute(getTransactionAttribute()); builder.repository(jobRepository); + builder.observationRegistry(observationRegistry); builder.startLimit(startLimit); builder.allowStartIfComplete(allowStartIfComplete); builder.chunk(commitInterval); diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java index 8214002097..0a77271190 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java @@ -17,7 +17,6 @@ package org.springframework.batch.core.step.item; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -220,7 +219,7 @@ protected Chunk transform(final StepContribution contribution, Chunk input @Override public O doWithRetry(RetryContext context) throws Exception { - Timer.Sample sample = BatchMetrics.createTimerSample(); + Timer.Sample sample = BatchMetrics.createTimerSample(meterRegistry); String status = BatchMetrics.STATUS_SUCCESS; O output = null; try { @@ -336,7 +335,7 @@ public Object doWithRetry(RetryContext context) throws Exception { if (!data.scanning()) { chunkMonitor.setChunkSize(inputs.size()); - Timer.Sample sample = BatchMetrics.createTimerSample(); + Timer.Sample sample = BatchMetrics.createTimerSample(meterRegistry); String status = BatchMetrics.STATUS_SUCCESS; try { doWrite(outputs); diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java index 193cf56810..f8d908bff1 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java @@ -18,6 +18,8 @@ import java.util.List; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; @@ -47,6 +49,8 @@ public class SimpleChunkProcessor implements ChunkProcessor, Initializi private final MulticasterBatchListener listener = new MulticasterBatchListener<>(); + protected MeterRegistry meterRegistry = Metrics.globalRegistry; + /** * Default constructor for ease of configuration. */ @@ -79,6 +83,10 @@ public void setItemWriter(ItemWriter itemWriter) { this.itemWriter = itemWriter; } + public void setMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + } + /** * Check mandatory properties. * @@ -278,7 +286,7 @@ protected Chunk getAdjustedOutputs(Chunk inputs, Chunk outputs) { * @throws Exception if there is a problem */ protected void write(StepContribution contribution, Chunk inputs, Chunk outputs) throws Exception { - Timer.Sample sample = BatchMetrics.createTimerSample(); + Timer.Sample sample = BatchMetrics.createTimerSample(this.meterRegistry); String status = BatchMetrics.STATUS_SUCCESS; try { doWrite(outputs); @@ -303,7 +311,7 @@ protected Chunk transform(StepContribution contribution, Chunk inputs) thr for (Chunk.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { final I item = iterator.next(); O output; - Timer.Sample sample = BatchMetrics.createTimerSample(); + Timer.Sample sample = BatchMetrics.createTimerSample(this.meterRegistry); String status = BatchMetrics.STATUS_SUCCESS; try { output = doProcess(item); @@ -333,7 +341,7 @@ protected Chunk transform(StepContribution contribution, Chunk inputs) thr protected void stopTimer(Timer.Sample sample, StepExecution stepExecution, String metricName, String status, String description) { String fullyQualifiedMetricName = BatchMetrics.METRICS_PREFIX + metricName; - sample.stop(BatchMetrics.createTimer(metricName, description + " duration", + sample.stop(BatchMetrics.createTimer(this.meterRegistry, metricName, description + " duration", Tag.of(fullyQualifiedMetricName + ".job.name", stepExecution.getJobExecution().getJobInstance().getJobName()), Tag.of(fullyQualifiedMetricName + ".step.name", stepExecution.getStepName()), diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java index 8cb63e60c5..8253c27538 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java @@ -18,6 +18,7 @@ import java.util.List; +import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; @@ -55,6 +56,8 @@ public class SimpleChunkProvider implements ChunkProvider { private final RepeatOperations repeatOperations; + private MeterRegistry meterRegistry = Metrics.globalRegistry; + public SimpleChunkProvider(ItemReader itemReader, RepeatOperations repeatOperations) { this.itemReader = itemReader; this.repeatOperations = repeatOperations; @@ -71,6 +74,10 @@ public void setListeners(List listeners) { } } + public void setMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + } + /** * Register a listener for callbacks at the appropriate stages in a process. * @param listener a {@link StepListener} @@ -150,7 +157,7 @@ public RepeatStatus doInIteration(final RepeatContext context) throws Exception private void stopTimer(Timer.Sample sample, StepExecution stepExecution, String status) { String fullyQualifiedMetricName = BatchMetrics.METRICS_PREFIX + "item.read"; - sample.stop(BatchMetrics.createTimer("item.read", "Item reading duration", + sample.stop(BatchMetrics.createTimer(this.meterRegistry, "item.read", "Item reading duration", Tag.of(fullyQualifiedMetricName + ".job.name", stepExecution.getJobExecution().getJobInstance().getJobName()), Tag.of(fullyQualifiedMetricName + ".step.name", stepExecution.getStepName()), diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/job/SimpleJobTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/job/SimpleJobTests.java index c6665b7587..be24fd9be8 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/job/SimpleJobTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/job/SimpleJobTests.java @@ -26,7 +26,9 @@ import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; import io.micrometer.core.tck.MeterRegistryAssert; +import io.micrometer.observation.ObservationRegistry; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -34,7 +36,6 @@ import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobInterruptedException; @@ -49,9 +50,9 @@ import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.core.step.StepSupport; import org.springframework.batch.item.ExecutionContext; -import org.springframework.jdbc.support.JdbcTransactionManager; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.support.JdbcTransactionManager; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -111,6 +112,11 @@ void setUp() throws Exception { job = new SimpleJob(); job.setJobRepository(jobRepository); + ObservationRegistry observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig() + .observationHandler(new DefaultMeterObservationHandler(Metrics.globalRegistry)); + job.setObservationRegistry(observationRegistry); + step1 = new StubStep("TestStep1", jobRepository); step1.setCallback(new Runnable() { @Override diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java index a3b1020454..f90e8cb872 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java @@ -17,7 +17,6 @@ import java.time.Duration; import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.List; @@ -26,6 +25,8 @@ import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.observation.ObservationRegistry; import org.junit.jupiter.api.Test; import org.springframework.batch.core.ExitStatus; @@ -215,46 +216,43 @@ void testBatchMetrics() throws Exception { @Configuration @EnableBatchProcessing - @Import(DataSoourceConfiguration.class) static class MyJobConfiguration { - private PlatformTransactionManager transactionManager; - - public MyJobConfiguration(PlatformTransactionManager transactionManager) { - this.transactionManager = transactionManager; - } - @Bean - public Step step1(JobRepository jobRepository) { + public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) - .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, this.transactionManager).build(); + .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager).build(); } @Bean - public Step step2(JobRepository jobRepository) { - return new StepBuilder("step2", jobRepository).chunk(2, this.transactionManager) + public Step step2(JobRepository jobRepository, PlatformTransactionManager transactionManager) { + return new StepBuilder("step2", jobRepository).chunk(2, transactionManager) .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5))) .writer(items -> items.forEach(System.out::println)).build(); } @Bean - public Step step3(JobRepository jobRepository) { - return new StepBuilder("step3", jobRepository).chunk(2, this.transactionManager) + public Step step3(JobRepository jobRepository, PlatformTransactionManager transactionManager) { + return new StepBuilder("step3", jobRepository).chunk(2, transactionManager) .reader(new ListItemReader<>(Arrays.asList(6, 7, 8, 9, 10))) .writer(items -> items.forEach(System.out::println)).faultTolerant().skip(Exception.class) .skipLimit(3).build(); } @Bean - public Job job(JobRepository jobRepository) { - return new JobBuilder("job", jobRepository).start(step1(jobRepository)).next(step2(jobRepository)) - .next(step3(jobRepository)).build(); + public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) { + return new JobBuilder("job", jobRepository).start(step1(jobRepository, transactionManager)) + .next(step2(jobRepository, transactionManager)).next(step3(jobRepository, transactionManager)) + .build(); } - } - - @Configuration - static class DataSoourceConfiguration { + @Bean + public ObservationRegistry observationRegistry() { + ObservationRegistry observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig() + .observationHandler(new DefaultMeterObservationHandler(Metrics.globalRegistry)); + return observationRegistry; + } @Bean public DataSource dataSource() { diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/NonAbstractStepTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/NonAbstractStepTests.java index 2fb627df56..f8303b23e9 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/NonAbstractStepTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/NonAbstractStepTests.java @@ -21,7 +21,9 @@ import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; import io.micrometer.core.tck.MeterRegistryAssert; +import io.micrometer.observation.ObservationRegistry; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -190,6 +192,12 @@ protected void doExecute(StepExecution stepExecution) throws Exception { @Test void testExecute() throws Exception { tested.setStepExecutionListeners(new StepExecutionListener[] { listener1, listener2 }); + + ObservationRegistry observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig() + .observationHandler(new DefaultMeterObservationHandler(Metrics.globalRegistry)); + tested.setObservationRegistry(observationRegistry); + tested.execute(execution); int i = 0; diff --git a/spring-batch-test/src/test/java/org/springframework/batch/test/observability/ObservabilitySampleStepTests.java b/spring-batch-test/src/test/java/org/springframework/batch/test/observability/ObservabilitySampleStepTests.java index d605fc8323..9bfe596156 100644 --- a/spring-batch-test/src/test/java/org/springframework/batch/test/observability/ObservabilitySampleStepTests.java +++ b/spring-batch-test/src/test/java/org/springframework/batch/test/observability/ObservabilitySampleStepTests.java @@ -15,8 +15,11 @@ */ package org.springframework.batch.test.observability; +import javax.sql.DataSource; + import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; import io.micrometer.core.tck.MeterRegistryAssert; import io.micrometer.observation.ObservationRegistry; import io.micrometer.tracing.test.SampleTestRunner; @@ -28,13 +31,20 @@ import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; -import org.springframework.batch.core.observability.BatchMetrics; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.batch.test.JobLauncherTestUtils; -import org.springframework.batch.test.SpringBatchTestJUnit5Tests; import org.springframework.batch.test.context.SpringBatchTest; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.jdbc.support.JdbcTransactionManager; import static io.micrometer.tracing.test.simple.SpansAssert.assertThat; @@ -44,6 +54,9 @@ class ObservabilitySampleStepTests extends SampleTestRunner { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; + @Autowired + private ObservationRegistry observationRegistry; + ObservabilitySampleStepTests() { super(SampleRunnerConfig.builder().build()); } @@ -55,7 +68,7 @@ protected MeterRegistry createMeterRegistry() { @Override protected ObservationRegistry createObservationRegistry() { - return BatchMetrics.observationRegistry; + return this.observationRegistry; } @BeforeEach @@ -91,9 +104,40 @@ public SampleTestRunnerConsumer yourCode() { } @Configuration(proxyBeanMethods = false) - @Import(SpringBatchTestJUnit5Tests.JobConfiguration.class) + @EnableBatchProcessing static class TestConfig { + @Bean + public ObservationRegistry observationRegistry() { + ObservationRegistry observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig() + .observationHandler(new DefaultMeterObservationHandler(Metrics.globalRegistry)); + return observationRegistry; + } + + @Bean + public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager) { + return new StepBuilder("step", jobRepository) + .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager).build(); + } + + @Bean + public Job job(JobRepository jobRepository, Step step) { + return new JobBuilder("job", jobRepository).start(step).build(); + } + + @Bean + public DataSource dataSource() { + return new EmbeddedDatabaseBuilder().setType(EmbeddedDatabaseType.HSQL) + .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql") + .addScript("/org/springframework/batch/core/schema-hsqldb.sql").build(); + } + + @Bean + public JdbcTransactionManager transactionManager(DataSource dataSource) { + return new JdbcTransactionManager(dataSource); + } + } }