-
Notifications
You must be signed in to change notification settings - Fork 2k
add workqueue implementation for kube-style controller #525
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* The default delaying queue implementation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add more documentation about what this is and how it is used. We can't assume people are familiar with the client go approach.
private DelayQueue delayQueue = new DelayQueue(); | ||
private ExecutorService waitingWorker = Executors.newSingleThreadExecutor(); | ||
|
||
public DefaultDelayingQueue() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably worth making the ExecutorService
an optional parameter, for testing and other reasons.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
/** | ||
* The default delaying queue implementation. | ||
*/ | ||
public class DefaultDelayingQueue extends DefaultWorkQueue implements DelayingQueue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use generic types instead of just passing Object
} | ||
} | ||
|
||
private class WaitForEntry implements Delayed { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this class static. (alternately, just use an anonymous class above)
I think I prefer the anonymous class approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just use static inner class.
/** | ||
* The default rate limiting queue implementation. | ||
*/ | ||
public class DefaultRateLimitingQueue extends DefaultDelayingQueue implements RateLimitingQueue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above wrt generics.
private final Condition notEmpty = lock.newCondition(); | ||
|
||
public DefaultWorkQueue() { | ||
this.queue = new LinkedList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lists.newLinkedList() (or don't use Sets below...)
|
||
private AtomicBoolean shuttingDown = new AtomicBoolean(false); | ||
|
||
private final ReentrantLock lock = new ReentrantLock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get rid of this lock and just used synchronized
on the methods.
lock.lock(); | ||
try { | ||
while (queue.size() == 0 && !shuttingDown.get()) { | ||
notEmpty.await(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use this.wait()
and this.notify()
instead of condition variable.
* Get blocks until it can return an item to be processed. If shutdown = true, | ||
* the caller should end their process. | ||
* | ||
* @return the object |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which object?
/** | ||
* The workqueue interface defines the queue behavior. | ||
*/ | ||
public interface WorkQueue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this a generic instead of using Object
|
||
import java.time.Duration; | ||
|
||
public interface RateLimiter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are lots of other retry libraries out there (e.g. https://github.com/jhalterman/failsafe) do we need to implement this ourselves?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
didn't notice this before, will take a deep look
this.processing.add(obj); | ||
this.dirty.remove(obj); | ||
return obj; | ||
} catch (Throwable t) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you want to catch here? this seems like a bad idea... (at the very least we should log)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this depends on how to handle exception, throw exception out here seems better
|
||
@Override | ||
public void shutDown() { | ||
this.shuttingDown.compareAndSet(false, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you need to notify()
here too...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, should use notifyAll()
here
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
/** | ||
* The default workqueue implementation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should document how you expect this to be used.
I have three high level comments:
Other comments: More tests are definitely required, some sample code would be great. |
thx so much for help w/ this @adohe ! do you think about splitting this pull into two parts? one for initializing the new module, another for the implementation so that it won't be the blocker for other tasks. thx!
+1 |
@adohe seems recently too swamped to bump the thread. let me take over it. we'll finally get a java controller example into this repo. |
c2563f2
to
5fc35cd
Compare
comments addressed, I am working on much better usage documentation and test cases. cc @brendandburns @yue9944882 |
ci looks not happy |
18f92b2
to
bd4ada9
Compare
Any update on this PR? I really want to use this feature in client-java :) |
@cizezsy the guy is recently too busy to push the progress. and also, the pull is a bit more complex so it might take time. welcome to split an another pass if you want to contribute. |
package io.kubernetes.client.extended.function; | ||
|
||
@FunctionalInterface | ||
public interface ConditionFunc { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove it, use functional.Supplier as replacement
a6c2951
to
b862d68
Compare
/lgtm lazy consensus if @brendandburns would like another rounds of review |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: adohe, yue9944882 The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
/hold cancel |
ref #523.
Need more tests.