Compute New Seek Position From Current Offset #3078
Comments
@eblocha, I want to understand this use case a bit more. The …
When you are setting …
@sobychacko Thanks for the response! With … For a bit more context, the topic contains change-data-capture records from a database, and certain events mark when a snapshot was completed for a table (the snapshot data itself is not sent to the topic). The producer is paused during the snapshot and, before restarting, saves metadata containing the last offset/partition of the table data. I can initialize data or recover from failures by consuming the most recent snapshot, then seeking the consumer back to the position at which it was taken. If I am doing this for multiple tables whose snapshots may have been taken at different times/offsets, I don't want to jump forward in the partition after consuming another snapshot for a different table.
Rough diagram above. If a consumer reads the snapshot for table_a and then the snapshot for table_b, I don't want to seek forward to the position at which table_b's snapshot was taken, because I might skip changes made to table_a after its snapshot.
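The ordering problem can be illustrated with a small simulation. This is a sketch under stated assumptions: `SnapshotReplay`, `SnapshotMarker`, the table names, and the offsets are hypothetical, the partition position is modelled as a plain `long` rather than a real consumer, and the record requires Java 16+.

```java
import java.util.List;

class SnapshotReplay {

    // Hypothetical marker: the snapshot of `table` was taken when the last
    // offset written for the table data was `offset`.
    record SnapshotMarker(String table, long offset) {}

    // Naive strategy: always seek to the recorded offset, even if that moves forward.
    static long seekUnconditionally(long position, List<SnapshotMarker> markers) {
        for (SnapshotMarker marker : markers) {
            position = marker.offset();
        }
        return position;
    }

    // Backward-only strategy: seek only when the recorded offset is behind the position.
    static long seekBackwardOnly(long position, List<SnapshotMarker> markers) {
        for (SnapshotMarker marker : markers) {
            position = Math.min(position, marker.offset());
        }
        return position;
    }

    public static void main(String[] args) {
        // Snapshots consumed in order: table_a (taken at 100), then table_b (taken at 150).
        List<SnapshotMarker> markers = List.of(
                new SnapshotMarker("table_a", 100L),
                new SnapshotMarker("table_b", 150L));
        long start = 200L; // hypothetical position before the first seek
        System.out.println(seekUnconditionally(start, markers)); // 150: records 100..149, which may hold table_a changes, are skipped
        System.out.println(seekBackwardOnly(start, markers));    // 100
    }
}
```

The backward-only rule is simply "new position = min(current, recorded)", so the order in which snapshots are consumed no longer matters.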
If I understand correctly, you don't know the relative offset to seek to, only an absolute offset (possibly from metadata you save?). How is this absolute offset different from the target offset? I think we should explore this further. If we can provide a new API, something like …
Correct, I only have access to the absolute offset I should seek to, with the caveat that I should not seek if I'm currently behind that offset. I think that API would work. If the function computes the relative offset, it would be implemented as follows:

```java
import java.util.function.Function;

// Returns how far to move from the current offset: a negative delta when the
// consumer is ahead of the target, otherwise 0 (do not change position).
public class ComputeRelativeOffset implements Function<Long, Long> {
    private final Long absoluteTargetOffset;

    public ComputeRelativeOffset(Long absoluteTargetOffset) {
        this.absoluteTargetOffset = absoluteTargetOffset;
    }

    @Override
    public Long apply(Long currentOffset) {
        if (currentOffset > absoluteTargetOffset) {
            return absoluteTargetOffset - currentOffset; // seek backward
        } else {
            return 0L; // do not change position
        }
    }
}
```

If the function computes the absolute offset, it would be implemented like this:

```java
import java.util.function.Function;

// Returns the offset to seek to: the target when the consumer is ahead of it,
// otherwise the current offset (never seek forward).
public class ComputeNewOffset implements Function<Long, Long> {
    private final Long absoluteTargetOffset;

    public ComputeNewOffset(Long absoluteTargetOffset) {
        this.absoluteTargetOffset = absoluteTargetOffset;
    }

    @Override
    public Long apply(Long currentOffset) {
        if (currentOffset > absoluteTargetOffset) {
            return absoluteTargetOffset; // seek back to the target
        } else {
            return currentOffset; // stay where we are
        }
    }
}
```
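The two formulations are interchangeable: adding the relative result to the current offset yields the same position as the absolute result. The sketch below checks this with lambdas; `OffsetFunctions`, `relativeFor`, and `absoluteFor` are hypothetical names, and `Math.min` is just a compact way of writing the absolute version's if/else.

```java
import java.util.function.Function;

class OffsetFunctions {

    // Relative form: how far to move from the current offset (0 or negative).
    static Function<Long, Long> relativeFor(long target) {
        return current -> current > target ? target - current : 0L;
    }

    // Absolute form: the offset to seek to; never moves forward.
    static Function<Long, Long> absoluteFor(long target) {
        return current -> Math.min(current, target);
    }

    public static void main(String[] args) {
        long target = 100L;
        for (long current : new long[] {50L, 100L, 250L}) {
            long viaRelative = current + relativeFor(target).apply(current);
            long viaAbsolute = absoluteFor(target).apply(current);
            System.out.println(current + " -> " + viaRelative + " / " + viaAbsolute);
        }
    }
}
```

With a target of 100 and current offsets of 50, 100, and 250, both forms yield 50, 100, and 100. The absolute form needs no extra arithmetic on the caller's side, which is one reason it reads more simply.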
That makes sense. Thanks for that clarification. I think going with the absolute offset (your second approach) might be the better solution. I don't see anything that explicitly requires a relative calculation here. We can possibly look into it next week, unless you want to contribute a fix. Thanks!
Fixes: #3078
* Provide a new API method in `ConsumerSeekCallback` to seek to an offset based on the current offset. This is done through a user-defined function that receives the current offset as its input and decides which offset to seek to.
* Add tests and docs.
* Address PR review.
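The contract described in this commit (the caller passes a function that receives the current offset and returns the offset to seek to) can be sketched standalone. This is not Spring for Apache Kafka's code: `FunctionSeekDemo`, `OffsetSeekCallback`, `InMemoryPartitions`, `onSnapshotMarker`, and the topic name `cdc-events` are hypothetical, and positions live in a plain map instead of a real `Consumer`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class FunctionSeekDemo {

    // Hypothetical callback mirroring the described contract: the function
    // receives the current offset and returns the offset to seek to.
    interface OffsetSeekCallback {
        void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
    }

    // Hypothetical in-memory stand-in for a consumer's partition positions.
    static class InMemoryPartitions implements OffsetSeekCallback {
        final Map<String, Long> positions = new HashMap<>();

        @Override
        public void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction) {
            String key = topic + "-" + partition;
            long current = positions.getOrDefault(key, 0L);
            positions.put(key, offsetComputeFunction.apply(current));
        }
    }

    // Listener-side logic: on a snapshot marker, move back to the snapshot
    // offset but never forward.
    static void onSnapshotMarker(OffsetSeekCallback callback, String topic, int partition, long snapshotOffset) {
        callback.seek(topic, partition, current -> Math.min(current, snapshotOffset));
    }

    public static void main(String[] args) {
        InMemoryPartitions partitions = new InMemoryPartitions();
        partitions.positions.put("cdc-events-0", 200L);
        onSnapshotMarker(partitions, "cdc-events", 0, 100L); // moves back to 100
        onSnapshotMarker(partitions, "cdc-events", 0, 150L); // stays at 100
        System.out.println(partitions.positions.get("cdc-events-0"));
    }
}
```

Because the decision lives inside the function, the listener never needs to read the position itself; it only states the rule.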
@eblocha We added this feature on the …
Hi @sobychacko, thanks for the quick turnaround! I tried out the feature, but I am getting the following error:
I think this is happening because the implementation reads the current offset when the seek method is called, instead of when the seek is actually applied. The existing seek methods seem to defer accessing the current offset until the seeks are performed.
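The deferral described here can be sketched as follows: callers on any thread only enqueue a seek request, and the consumer thread evaluates the function against the position it sees when draining the queue. This is a simplified, single-threaded model under stated assumptions, not the framework's implementation: `DeferredSeeks`, `SeekRequest`, and `applyPendingSeeks` are hypothetical, and a map stands in for the consumer (Kafka's Java `KafkaConsumer` is not safe for multi-threaded access, which is why the read must happen on its own thread).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

class DeferredSeeks {

    // A pending seek: the function is stored, not evaluated yet.
    record SeekRequest(String key, Function<Long, Long> computeOffset) {}

    // Thread-safe queue: any thread may add requests.
    private final Queue<SeekRequest> pending = new ConcurrentLinkedQueue<>();

    // Meant to be touched only by the consumer thread (stand-in for consumer positions).
    final Map<String, Long> positions = new HashMap<>();

    // Called from any thread: only records the request.
    void seek(String topic, int partition, Function<Long, Long> computeOffset) {
        pending.add(new SeekRequest(topic + "-" + partition, computeOffset));
    }

    // Called on the consumer thread (e.g. before each poll): the current
    // position is read at the moment the seek is applied.
    void applyPendingSeeks() {
        SeekRequest request;
        while ((request = pending.poll()) != null) {
            long current = positions.getOrDefault(request.key(), 0L);
            positions.put(request.key(), request.computeOffset().apply(current));
        }
    }

    public static void main(String[] args) {
        DeferredSeeks container = new DeferredSeeks();
        container.positions.put("cdc-events-0", 100L);
        container.seek("cdc-events", 0, current -> Math.min(current, 150L));
        // The consumer keeps reading before the queue is drained.
        container.positions.put("cdc-events-0", 300L);
        container.applyPendingSeeks();
        System.out.println(container.positions.get("cdc-events-0")); // 150, computed from 300
    }
}
```

Had the function been evaluated at call time, it would have seen 100 and left the consumer at a stale position; evaluating it at apply time uses 300 and correctly seeks back to 150.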
@eblocha, thanks for that feedback. We will take a look at this soon.
Fixes: #3108
* When using the seek API that computes the offset to seek to with a user-provided function, there is a concurrency issue in which the Kafka consumer is used unsafely. See this for details: #3078 (comment)
* Fix Checkstyle violation.
@sobychacko It's working beautifully, thank you!
Expected Behavior
I have a use case where I only want to seek to a specified offset if it is before the consumer's current offset. Ideally, I would like to have a method on `ConsumerSeekCallback` where I would provide a `Function<Long, Long>` that computes the new offset given the current one.
Current Behavior
Currently, the framework only provides a way to seek to a specific offset unconditionally, or to an offset relative to the current one. There is no way to access the current offset when performing a seek.
Context
For my use case, certain events require re-processing records that were already processed. However, I should not seek forward in the Kafka partition, because the partition may contain other records that are unrelated to the event and shouldn't be skipped (sorry if this is a bit vague).
So I need to know the current partition offset before deciding whether to seek in response to the event.
To accomplish this, I have the Kafka listener extend `AbstractConsumerSeekAware`, then use its methods to perform the seek when the event occurs.
I have considered using the `onIdleContainer` method to store the current offsets, but from reading the source code it seems this is only called after a certain time has elapsed, so these offsets are not guaranteed to be up to date by the time I decide to seek.
I have also considered using the `KafkaListenerEndpointRegistry`; however, this only allows pausing, resuming, and stopping containers, and does not provide access to the underlying `Consumer`.