Compute New Seek Position From Current Offset #3078

Closed
eblocha opened this issue Feb 28, 2024 · 10 comments · Fixed by #3099

Comments

@eblocha

eblocha commented Feb 28, 2024

Expected Behavior

I have a use-case where I only want to seek to a specified offset if it is before the consumer's current offset. Ideally I would like to have a method on ConsumerSeekCallback where I would provide a Function<Long, Long> that computes the new offset given the current one.

Current Behavior

Currently the framework only provides a way to seek to a specific offset unconditionally, or to seek by a relative offset from the current one. There is no way to read the current offset value when performing a seek.

Context

For my use-case, there are certain events that require re-processing records that were already processed before. However, I should not seek forward in the Kafka partition, because the partition may contain other records that are unrelated to the event and shouldn't be skipped (sorry if this is a bit vague).

So, I need to know what the current partition offset is before I decide to actually seek from the event.

In order to try to accomplish this, I have the Kafka listener extend AbstractConsumerSeekAware, then use the methods from that to perform the seek when the event occurs.

I have considered using the onIdleContainer method to store the current offsets, but reading the source code it seems that this is only called after a certain time has elapsed, so these offsets are not guaranteed to be up-to-date by the time I decide to seek.

I have also considered using the KafkaListenerEndpointRegistry, however this only allows pausing/resuming/stopping containers, and does not provide access to the underlying Consumer.

@sobychacko
Contributor

sobychacko commented Feb 29, 2024

@eblocha, I want to understand this use case a bit more. The seekRelative method allows you to seek relatively from the current offset by setting toCurrent to be true. See the javadocs.

/**
 * Perform a seek relative to the start, end, or current position. When called 
 * from {@link ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekCallback)}
 * or from {@link ConsumerSeekAware#onIdleContainer(Map, ConsumerSeekCallback)}
 * perform the seek immediately on the consumer. When called from elsewhere, queue
 * the seek operation. The queued seek will occur after any pending offset
 * commits. The consumer must be currently assigned the specified partition.
 * @param topic the topic.
 * @param partition the partition.
 * @param offset the offset; positive values are relative to the start, negative
 * values are relative to the end, unless toCurrent is true.
 * @param toCurrent true for the offset to be relative to the current position
 * rather than the beginning or end.
 * @since 2.3
 */
void seekRelative(String topic, int partition, long offset, boolean toCurrent);

When you set toCurrent to true, you are, in effect, using the current offset: the application does not see it directly, but the framework reads the current position internally and then seeks relative to it. Since the framework already does that, what would the application gain from having access to the current offset? Please elaborate if possible. We are open to making API changes if there is room for improvement or a need. Thanks!
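
For readers following along, here is a minimal sketch of how the javadoc above maps the `offset` and `toCurrent` arguments to an absolute position. It is an illustration only, not the framework's code: the class name, the `resolve` helper, and the `begin`, `end`, and `current` inputs are hypothetical, and treating zero as start-relative is an assumption.

```java
final class RelativeSeekSketch {

    /**
     * Resolves seekRelative-style arguments into an absolute position.
     * begin, end, and current are hypothetical inputs standing in for the
     * partition's first offset, its end offset, and the consumer's position.
     */
    static long resolve(long offset, boolean toCurrent, long begin, long end, long current) {
        if (toCurrent) {
            // Relative to the current position, in either direction.
            return current + offset;
        }
        // Assumption: zero is handled like a positive value (relative to the start).
        return offset >= 0 ? begin + offset : end + offset;
    }

    public static void main(String[] args) {
        System.out.println(resolve(-20, true, 0, 500, 120));  // prints 100
        System.out.println(resolve(10, false, 0, 500, 120));  // prints 10
        System.out.println(resolve(-10, false, 0, 500, 120)); // prints 490
    }
}
```

The first call shows the point at issue in this thread: to land on absolute offset 100, the caller has to know that the current position is 120.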

@eblocha
Author

eblocha commented Mar 1, 2024

@sobychacko Thanks for the response! seekRelative requires the application to know the relative offset, but I only know the absolute offset. To compute a relative one, I'd need access to the current offset. I also only want to seek if the current offset is ahead of the target offset.

For a bit more context, the topic contains change-data-capture records from a database, and the events mark when a snapshot was completed for a table (the snapshot data is not sent to the topic). The producer is paused during the snapshot, and saves metadata containing the last offset/partition for the table data before restarting the producer.

I can initialize data or recover from failures by consuming the last taken snapshot, then seeking the consumer back to when it was taken. If I am doing this for multiple tables whose snapshots may have been taken at different times/offsets, I don't want to jump forward in the partition after consuming another snapshot for a different table.

topic1, partition1 ---------|-------------------------|------
                            ^                         ^
                      snapshot, table_a         snapshot, table_b

Rough diagram above. If a consumer reads snapshot for table_a, then snapshot for table_b, I don't want to seek forward to the position table_b was taken on, because I may skip changes that happened to table_a after the snapshot.
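
The scenario in the diagram can be sketched as a small simulation. The offsets (100 for table_a, 250 for table_b, a starting position of 300) and the names `SnapshotSeekScenario` and `seekBackOnly` are made up for illustration; only the rule "seek back, never forward" comes from the description above.

```java
import java.util.function.Function;

final class SnapshotSeekScenario {

    /** Seek back to the snapshot offset only when the consumer is already past it. */
    static Function<Long, Long> seekBackOnly(long snapshotOffset) {
        return current -> current > snapshotOffset ? snapshotOffset : current;
    }

    public static void main(String[] args) {
        long tableA = 100L;   // hypothetical offset of the table_a snapshot
        long tableB = 250L;   // hypothetical offset of the table_b snapshot
        long position = 300L; // hypothetical current consumer position

        // Consuming the table_a snapshot rewinds the consumer to offset 100.
        position = seekBackOnly(tableA).apply(position);
        System.out.println("after table_a snapshot: " + position); // prints 100

        // Consuming the table_b snapshot must not jump forward to 250,
        // or table_a changes between 100 and 250 would be skipped.
        position = seekBackOnly(tableB).apply(position);
        System.out.println("after table_b snapshot: " + position); // prints 100
    }
}
```

An unconditional seek to 250 in the second step would skip every record between offsets 100 and 249.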

@sobychacko
Contributor

If I understand correctly, you don't know the relative offset to seek to, only an absolute offset (possibly from the metadata you save?). How is this absolute offset different from the target offset? I think we should explore this more. If we provide a new API, something like seekRelative(String topic, int partition, Function<Long, Long> offset), would that work? Could you tell me how you would implement the function you provide? In this case, we expect the function to calculate the offset to seek to, relative to the current offset. But if you don't know that offset to begin with, how would you calculate it in the function?

@eblocha
Author

eblocha commented Mar 1, 2024

Correct, I only have access to the absolute offset I should seek to, with the caveat that I should not seek if I'm currently behind that offset.

I think that API would work. If the function is computing the relative offset, it would be implemented as follows:

import java.util.function.Function;

// Returns the offset to seek by, relative to the current position.
public class ComputeRelativeOffset implements Function<Long, Long> {
    private final long absoluteTargetOffset;

    public ComputeRelativeOffset(long absoluteTargetOffset) {
        this.absoluteTargetOffset = absoluteTargetOffset;
    }

    @Override
    public Long apply(Long currentOffset) {
        if (currentOffset > absoluteTargetOffset) {
            // Negative result: move back to the target.
            return absoluteTargetOffset - currentOffset;
        } else {
            return 0L; // do not change position
        }
    }
}

If the function is computing the absolute offset, it would be implemented like this:

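import java.util.function.Function;

// Returns the absolute offset to seek to, given the current position.
public class ComputeNewOffset implements Function<Long, Long> {
    private final long absoluteTargetOffset;

    public ComputeNewOffset(long absoluteTargetOffset) {
        this.absoluteTargetOffset = absoluteTargetOffset;
    }

    @Override
    public Long apply(Long currentOffset) {
        if (currentOffset > absoluteTargetOffset) {
            // Already past the target: rewind to it.
            return absoluteTargetOffset;
        } else {
            return currentOffset; // do not change position
        }
    }
}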
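
As a quick sanity check, the two variants should always land on the same position: the current offset plus the relative result equals the absolute result. The sketch below restates both as lambdas (the absolute one reduces to `Math.min`) and compares them; the class and method names are hypothetical, chosen for illustration.

```java
import java.util.function.Function;

final class OffsetVariantsCheck {

    /** Relative variant: how far to move from the current offset. */
    static Function<Long, Long> relative(long target) {
        return current -> current > target ? target - current : 0L;
    }

    /** Absolute variant: the offset to seek to; never ahead of the current one. */
    static Function<Long, Long> absolute(long target) {
        return current -> Math.min(current, target);
    }

    /** True when both variants land on the same final position. */
    static boolean agree(long target, long current) {
        return current + relative(target).apply(current) == absolute(target).apply(current);
    }

    public static void main(String[] args) {
        System.out.println(agree(100L, 300L)); // prints true (both rewind to 100)
        System.out.println(agree(100L, 40L));  // prints true (both stay at 40)
    }
}
```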

@sobychacko
Contributor

That makes sense. Thanks for that clarification. I think going with the absolute offset (your second approach) might be the better solution. I don't see anything that explicitly requires a relative calculation here. We can look into it next week possibly, unless you want to contribute a fix. Thanks!

sobychacko added a commit to sobychacko/spring-kafka that referenced this issue Mar 5, 2024
Fixes: spring-projects#3078

 * Provide a new API method in `ConsumerSeekCallback` to seek to an offset
   based on the current offset. This is accomplished by a user-defined function
   where the user can make a decision on the offset to seek to based on the current
   offset which is available via the function's input.
 * Adding tests, docs.
@artembilan artembilan added this to the 3.2.0-M2 milestone Mar 5, 2024
artembilan pushed a commit that referenced this issue Mar 5, 2024
Fixes: #3078

 * Provide a new API method in `ConsumerSeekCallback` to seek to an offset
   based on the current offset. This is accomplished by a user-defined function
   where the user can make a decision on the offset to seek to based on the current
   offset which is available via the function's input.
 * Adding tests, docs.

* Addressing PR review
@sobychacko
Contributor

@eblocha We added this feature on the main branch. Please take a look at the commits above. Please try it on the latest snapshot (3.2.0-SNAPSHOT) and give us any feedback.

@eblocha
Author

eblocha commented Mar 7, 2024

> @eblocha We added this feature on the main branch. Please take a look at the commits above. Please try it on the latest snapshot (3.2.0-SNAPSHOT) and give us any feedback.

Hi @sobychacko, thanks for the quick turnaround! I tried out the feature, but I am getting the following error:

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2491)
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2475)
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1747)
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seek(KafkaMessageListenerContainer.java:3330)

I think this is happening because the implementation reads the current offset when the seek function is called, instead of when it is actually applied. The existing seek methods seem to defer accessing the current offset until the seeks are performed.
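
The deferral described above can be sketched with a toy model. This is not the framework's implementation: `ToyConsumer`, `requestSeek`, and `applyPending` are hypothetical names, and the toy consumer only imitates the single-thread ownership check that produces the exception in the stack trace. The idea is that the calling thread records the function, and the current position is read only when the consumer's own thread applies it.

```java
import java.util.ConcurrentModificationException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

final class DeferredSeekSketch {

    /** Toy stand-in for a consumer that only its owner thread may touch. */
    static final class ToyConsumer {
        private final Thread owner;
        private long position;

        ToyConsumer(Thread owner, long position) {
            this.owner = owner;
            this.position = position;
        }

        private void checkOwner() {
            if (Thread.currentThread() != owner) {
                throw new ConcurrentModificationException("not safe for multi-threaded access");
            }
        }

        long position() { checkOwner(); return position; }

        void seek(long offset) { checkOwner(); position = offset; }
    }

    private final Queue<Function<Long, Long>> pending = new ConcurrentLinkedQueue<>();

    /** Safe from any thread: records the function and never touches the consumer. */
    void requestSeek(Function<Long, Long> computeOffset) {
        pending.add(computeOffset);
    }

    /** Must run on the consumer's owner thread, for example between polls. */
    void applyPending(ToyConsumer consumer) {
        Function<Long, Long> next;
        while ((next = pending.poll()) != null) {
            // The current position is read here, at apply time, not at request time.
            consumer.seek(next.apply(consumer.position()));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyConsumer consumer = new ToyConsumer(Thread.currentThread(), 300L);
        DeferredSeekSketch seeks = new DeferredSeekSketch();

        // Another thread asks for a seek; it only enqueues the function.
        Thread other = new Thread(() -> seeks.requestSeek(current -> Math.min(current, 100L)));
        other.start();
        other.join();

        seeks.applyPending(consumer); // runs on the owner thread
        System.out.println(consumer.position()); // prints 100
    }
}
```

Calling `consumer.position()` directly from the other thread would throw, which mirrors the failure reported above.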

@sobychacko
Contributor

@eblocha, thanks for that feedback. We will take a look at this soon.

sobychacko added a commit to sobychacko/spring-kafka that referenced this issue Mar 8, 2024
Fixes: spring-projects#3108

* When using the seek API that takes a user-provided function to compute the offset to seek to,
  there is a concurrency issue in which the Kafka consumer is used unsafely.

See this for details: spring-projects#3078 (comment)
artembilan pushed a commit that referenced this issue Mar 8, 2024
Fixes: #3108

* When using the seek API that takes a user-provided function to compute the offset to seek to,
  there is a concurrency issue in which the Kafka consumer is used unsafely.

See this for details: #3078 (comment)

* Fix Checkstyle violation
@sobychacko
Contributor

@eblocha The concurrency issue is addressed via #3108. Could you try it on the latest snapshot and let us know about any further issues?

@eblocha
Author

eblocha commented Mar 8, 2024

@sobychacko It's working beautifully, thank you!
