
ItemStream is not recognized on scoped steps #4026

Closed
Sam-Kruglov opened this issue Nov 19, 2021 · 12 comments
Labels
status: declined (Features that we don't intend to implement or Bug reports that are invalid or missing enough details)

Comments

@Sam-Kruglov

Sam-Kruglov commented Nov 19, 2021

Bug description
If I declare a @StepScope @Bean method with return type JsonItemReader, it will get wrapped in a lazy proxy. In this case, instanceof ItemStream returns true and open() is called.

If I declare a @StepScope @Bean method with return type ItemReader, it will get wrapped in a lazy proxy. In this case, instanceof ItemStream returns false and open() is not called.

Code reference:

protected void registerAsStreamsAndListeners(ItemReader<? extends I> itemReader,
        ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
    for (Object itemHandler : new Object[] { itemReader, itemWriter, itemProcessor }) {
        if (itemHandler instanceof ItemStream) {
            stream((ItemStream) itemHandler);
        }
        if (StepListenerFactoryBean.isListener(itemHandler)) {
            // ...

The proxy is created because of the @StepScope annotation: the whole bean method is turned into a lazy object that is only evaluated during the job run, so that job parameters can be injected into it. So there is no way for the job to know the exact type of the final object in advance. The solution would be to inspect each step's components at job runtime and check whether they are instances of ItemStream.

Environment
Spring Batch 4.3.3
Spring Boot 2.5.0
Java 11

Minimal Complete Reproducible example

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;

import java.util.List;
import java.util.Map;

@Configuration
@EnableBatchProcessing
public class Example {

    public static final String SRC_ENTITIES_JSON = "srcEntitiesJson";

    @Autowired private JobBuilderFactory jobs;
    @Autowired private StepBuilderFactory steps;

    @Bean
    public Job listenerPropagationJob(TaskletStep step) {
        return jobs.get("job")
                .start(step)
                .build();
    }

    @Bean
    public TaskletStep step(ItemReader<Map<String, Object>> srcEntityReader) {
        return steps.get("step")
                .<Map<String, Object>, Map<String, Object>>chunk(1)
                .reader(srcEntityReader)
                .writer(srcEntities -> srcEntities.forEach(srcEntity -> System.out.println("writing " + srcEntity)))
                .build();
    }

    @StepScope
    @Bean
    @SuppressWarnings("unchecked")
    // fixme this will work correctly only if return type is JsonItemReader<Map<String, Object>>
    // see org.springframework.batch.core.step.builder.SimpleStepBuilder.registerAsStreamsAndListeners
    public ItemReader<Map<String, Object>> srcEntityReader(
            ObjectMapper mapper,
            @Value("#{jobParameters[" + SRC_ENTITIES_JSON + "]}") String json
    ) {
        return new JsonItemReaderBuilder<Map<String, Object>>()
                .jsonObjectReader(new JacksonJsonObjectReader<>(mapper, (Class<Map<String, Object>>) (Class<?>) Map.class))
                .resource(new ByteArrayResource(json.getBytes()))
                .name("entityJsonReader")
                .build();
    }

    public static void main(String[] args) throws Exception {
        var context = new AnnotationConfigApplicationContext(Example.class, JacksonAutoConfiguration.class);
        var mapper = context.getBean(ObjectMapper.class);
        var launcher = context.getBean(JobLauncher.class);
        var json = mapper.writeValueAsString(List.of(Map.of("field", "value")));
        var parameters = new JobParametersBuilder()
                .addString(SRC_ENTITIES_JSON, json, false)
                .toJobParameters();
        launcher.run(context.getBean(Job.class), parameters);
    }
}
@Sam-Kruglov added status: waiting-for-triage and type: bug labels Nov 19, 2021
@Sam-Kruglov changed the title from "ItemStream is not recognized if wrapped in JDK proxy" to "ItemStream is not recognized on scoped beans" Nov 19, 2021
@Sam-Kruglov changed the title from "ItemStream is not recognized on scoped beans" to "ItemStream is not recognized on scoped steps" Nov 19, 2021
@Sam-Kruglov
Author

My use case:

  1. I poll a bunch of messages. I couldn't find a way to poll from Spring Batch, so I poll via Quartz, and on each poll I start/restart the same job instance (I'm polling multiple sources with different clients; each poll has its own job instance).
  2. I want to make the processing robust, so I serialize the messages back to JSON and persist them into the job parameters as a String (I changed the SQL schema to hold big text there instead of just varchar(255)).
  3. But I don't want every step of the job to deserialize it all over again, so I thought of creating a ThreadLocal<List<Map<String, Object>>> to hold all the messages in deserialized form as a sort of deserialization cache.
  4. I need to conditionally supply a JsonItemReader or a ListItemReader depending on the thread-local variable. So the nearest common supertype is ItemReader, which is what I return from the @Bean method.

UPD: So far I actually only have one step, so, for now, I decided to just drop the thread-local; maybe it's not such a big deal since I'm reading the string from memory. But it might be useful if I get hundreds of messages and multiple steps. For now, I'm just returning JsonItemReader.

@mdeinum
Contributor

mdeinum commented Nov 23, 2021

The problem is that the proxy is created very early in the process by Spring (not Spring Batch), and to figure out the actual type you would need to instantiate the bean, which is exactly what you want to prevent. Especially in your case, where you want to switch readers, this might lead to instantiating multiple beans that you don't want.

It also isn't possible because the Step is a singleton that only has a reference to the proxy at configuration time, and that is the moment it needs to register the callbacks.

What you could do is create an ItemStreamReader implementation that delegates to whichever instance you want and checks whether the delegate is an ItemStream: if so, propagate the call; if not, make it a no-op. That way the correct proxy is created and the switching logic lives in that specific implementation.
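
For illustration, a minimal sketch of such a delegating reader could look like the following (the class name DelegatingItemStreamReader and the Supplier-based constructor are made up for this example; only ItemReader, ItemStream and ItemStreamReader come from Spring Batch):

import java.util.function.Supplier;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamReader;

public class DelegatingItemStreamReader<T> implements ItemStreamReader<T> {

    private final Supplier<? extends ItemReader<T>> delegateSupplier;
    private ItemReader<T> delegate;

    public DelegatingItemStreamReader(Supplier<? extends ItemReader<T>> delegateSupplier) {
        this.delegateSupplier = delegateSupplier;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // Pick the concrete reader at open time and propagate the stream
        // callback only if the chosen delegate is itself an ItemStream.
        this.delegate = delegateSupplier.get();
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).open(executionContext);
        }
    }

    @Override
    public T read() throws Exception {
        return delegate.read();
    }

    @Override
    public void update(ExecutionContext executionContext) {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).update(executionContext);
        }
    }

    @Override
    public void close() {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).close();
        }
    }
}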

Or find another way to kick off the batch job, as I'm not sure using a ThreadLocal or JobParameters is the best way to handle this. You might want to have a look at Spring Integration, which could do the polling and put messages on an (internal) channel that you could then read with an ItemReader (you could use JMS, Kafka, RabbitMQ, etc.).

@Sam-Kruglov
Author

Sam-Kruglov commented Nov 24, 2021

Thanks for the suggestions, I'll try them out for sure! The no-op idea is something I even thought of myself. I'll take a look at whether Spring Integration can persist incoming messages.

I figured out how to resolve the real type. With this code, the example works:

@Bean
public TaskletStep step(
        ItemReader<Map<String, Object>> srcEntityReader,
        ObjectProvider<TaskletStep> stepProvider
) {
    return steps.get("step")
            .<Map<String, Object>, Map<String, Object>>chunk(1)
            .reader(srcEntityReader)
            .writer(srcEntities -> srcEntities.forEach(srcEntity -> System.out.println("writing " + srcEntity)))
            .listener(new StepExecutionListener() {
                @Override
                public void beforeStep(StepExecution stepExecution) {
                    if (srcEntityReader instanceof Advised) {
                        try {
                            var realReader = ((Advised) srcEntityReader).getTargetSource().getTarget();
                            if (realReader instanceof ItemStream) {
                                stepProvider.getObject().registerStream((ItemStream) realReader);
                            }
                        } catch (Exception e) {
                            // getTarget() declares a checked exception
                            throw new IllegalStateException("Could not resolve the scoped reader target", e);
                        }
                    }
                }

                @Override
                public ExitStatus afterStep(StepExecution stepExecution) {
                    return null;
                }
            })
            .build();
}

getTarget will just delegate to the getBean method, so it's fine to call it twice.

@mdeinum
Contributor

mdeinum commented Nov 24, 2021

With one problem: the scoped proxy is initialized early on, which wouldn't allow changing the reader between executions (unless you fully restart the job).

@Sam-Kruglov
Author

I'm fairly new to Spring Batch; could you give me a couple of keywords to search for regarding changing the reader between executions? I can't find anything about it.

@mdeinum
Contributor

mdeinum commented Nov 24, 2021

In your own question you state that you switch the reader based on a ThreadLocal variable, and judging from your code you are using @StepScope, so the how/when of switching is already there. A job configuration is loaded once and your Step is a singleton definition, whereas your reader is @StepScope, so a new one is created per run of the job. So if you keep your app running and launch the job multiple times, this code won't even work (as the actual bean differs between runs), or you could switch from reader A to reader B (as you said yourself) in between runs.

So it doesn't have to be a problem if you reload the configuration for each launch of the job; however, if this configuration is part of a larger application that keeps running, you do have an issue.

@Sam-Kruglov
Author

Sam-Kruglov commented Nov 24, 2021

Right, makes sense. So the step should be @JobScope as well, so that it's different for each run. That should solve it. I already have it scoped in my real code, as I'm using the job parameters inside my processor.
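
For reference, a minimal sketch of that change against the step bean from the reproducer above (assuming the rest of the configuration stays the same and @JobScope is imported from org.springframework.batch.core.configuration.annotation):

// Sketch only: with @JobScope a new TaskletStep bean is created for each job
// execution, so per-run stream registration does not carry over between runs.
@JobScope
@Bean
public TaskletStep step(ItemReader<Map<String, Object>> srcEntityReader) {
    return steps.get("step")
            .<Map<String, Object>, Map<String, Object>>chunk(1)
            .reader(srcEntityReader)
            .writer(srcEntities -> srcEntities.forEach(srcEntity -> System.out.println("writing " + srcEntity)))
            .build();
}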

@Sam-Kruglov
Author

Sam-Kruglov commented Nov 24, 2021

In the general case, though, I think we can assume people don't supply dynamic readers, so this listener logic could still help if the user declared a JsonItemReader as an ItemReader bean. A note could be added to the docs saying that for dynamic readers you must use @JobScope on the step.

It could also help get rid of this log message that I currently see:

o.s.b.c.l.AbstractListenerFactoryBean    : org.springframework.batch.item.ItemReader is an interface. The implementing class will not be queried for annotation based listener configurations. If using @StepScope on a @Bean method, be sure to return the implementing class so listener annotations can be used.

@Sam-Kruglov
Author

Sam-Kruglov commented Nov 25, 2021

Off-topic, about the suggestions.

I actually get data not only by polling an external system but also by accepting an HTTP request from an external system, which hangs until the job is complete and then gets the calculated response. The job should be done in under 3-5 seconds (a third-party backend is waiting, not a human). So I think introducing an internal channel for incoming data would make responding to the HTTP request more complex than just calling JobLauncher.

I already wrote a second step in the job that relies on the data written by the previous step. According to the official docs, I'm supposed to save it into the JobExecutionContext at the end of the first step and then read it from there at the start of the second step. Wouldn't that approach be pretty much the same as what I did by persisting to JobParameters instead? It's just that I call ObjectMapper explicitly to inject the data as a String into JobParameters, while Spring Batch does the same call for the execution context under the hood.
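
For reference, the documented mechanism is roughly: the first step puts a value into its step ExecutionContext, and an ExecutionContextPromotionListener (from org.springframework.batch.core.listener) registered on that step promotes the listed keys to the job ExecutionContext, where the second step can read them. A minimal sketch, where the key name "firstStepResult" is made up for the example:

// Sketch only: promotes selected keys from the step ExecutionContext to the
// job ExecutionContext after the step completes.
@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[] { "firstStepResult" });
    return listener;
}

The listener would then be registered on the first step via .listener(promotionListener()).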

For the sake of consistency, I think it's either passing data through the execution context and job parameters, or introducing two internal channels: one for passing data between steps and one for starting the job. I think not involving channels will keep the code simpler.

@fmbenhassine
Contributor

@Sam-Kruglov Based on your comment here: #4026 (comment), should we consider this issue to be solved?

fixme this will work correctly only if return type is JsonItemReader<Map<String, Object>>

I just wanted to add that the proxy is created based on the return type of the bean definition method, so the return type should be at least an ItemStream. The documentation was updated in #1502, see https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#scoping-item-streams
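
For comparison with the bean definition method in the reproducer above, the documented fix only changes the return type (the body stays the same; JsonItemReader is imported from org.springframework.batch.item.json):

// Sketch of the documented fix: the return type is now JsonItemReader, which
// implements ItemStream, so the step-scoped proxy also implements ItemStream
// and the step registers it as a stream.
@StepScope
@Bean
@SuppressWarnings("unchecked")
public JsonItemReader<Map<String, Object>> srcEntityReader(
        ObjectMapper mapper,
        @Value("#{jobParameters[" + SRC_ENTITIES_JSON + "]}") String json
) {
    return new JsonItemReaderBuilder<Map<String, Object>>()
            .jsonObjectReader(new JacksonJsonObjectReader<>(mapper, (Class<Map<String, Object>>) (Class<?>) Map.class))
            .resource(new ByteArrayResource(json.getBytes()))
            .name("entityJsonReader")
            .build();
}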

Off-topic, about the suggestions.

For support and off-topic discussions, please use StackOverflow. Thank you.

@fmbenhassine added the status: waiting-for-reporter label and removed the status: waiting-for-triage label Jan 17, 2023
@Sam-Kruglov
Author

@fmbenhassine thanks for coming back to this!

  1. Can we add that beforeStep logic to Spring Batch so that the reader is recognized as a stream even if I declare the bean as an ItemReader? If you look at my first message, I return different readers based on whether the thread-local variable is set: if it is, I read from there; if not, I read from the DB (in the example I read from the job parameters, but see point 2 below).
  2. I ended up splitting this into two steps. The first step always reads from the thread-local to save each message into the database. The second step always reads from the database and persists its result there too. Basically, all steps read from the DB and save to the DB except the first one. I have one job bean and I launch it on every poll with a run id in the parameters so it has a unique instance. I think point 1 could be used to pass results between steps via the thread-local to skip querying the database, falling back to it if the thread-local is not set (if we're restarting the job from the last step).

@fmbenhassine added the status: feedback-provided label and removed the status: waiting-for-reporter label Apr 25, 2023
@fmbenhassine
Contributor

Thank you for your feedback.

Can we add that #4026 (comment) to spring batch so that it would be recognized as a stream reader even if I declare the bean as an item reader?

I don't think we need to make Spring Batch extract the advised target from the proxy and register it as a stream just so the user does not have to adjust the return type of the bean definition method as documented. As you mentioned, "For now, just returning JsonItemReader", that is the right thing to do.

I ended up splitting this into 2 steps

Re-reading your use case, I see no need to use ThreadLocals, and using two steps is a better idea IMO. Trying to do everything in one step is what makes things difficult, and I would always recommend splitting the logic into small, well-defined steps that each do one thing and do it well.

Closing this issue since the proxy is registered as expected when the return type of the bean definition method is defined as documented.

@fmbenhassine closed this as not planned Apr 25, 2023
@fmbenhassine added the status: declined label and removed the type: bug and status: feedback-provided labels Apr 25, 2023