Skip to content

add workqueue implementation for kube-style controller #525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 19, 2019

Conversation

adohe-zz
Copy link
Contributor

@adohe-zz adohe-zz commented Mar 9, 2019

ref #523.

Need more tests.

@k8s-ci-robot k8s-ci-robot added the cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. label Mar 9, 2019
@k8s-ci-robot k8s-ci-robot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Mar 9, 2019
import java.util.concurrent.TimeUnit;

/**
* The default delaying queue implementation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add more documentation about what this is and how it is used. We can't assume people are familiar with the client go approach.

private DelayQueue delayQueue = new DelayQueue();
private ExecutorService waitingWorker = Executors.newSingleThreadExecutor();

public DefaultDelayingQueue() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably worth making the ExecutorService an optional parameter, for testing and other reasons.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

/**
* The default delaying queue implementation.
*/
public class DefaultDelayingQueue extends DefaultWorkQueue implements DelayingQueue {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use generic types instead of just passing Object

}
}

private class WaitForEntry implements Delayed {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this class static. (alternately, just use an anonymous class above)

I think I prefer the anonymous class approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use static inner class.

/**
* The default rate limiting queue implementation.
*/
public class DefaultRateLimitingQueue extends DefaultDelayingQueue implements RateLimitingQueue {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above wrt generics.

private final Condition notEmpty = lock.newCondition();

public DefaultWorkQueue() {
this.queue = new LinkedList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lists.newLinkedList() (or don't use Sets below...)


private AtomicBoolean shuttingDown = new AtomicBoolean(false);

private final ReentrantLock lock = new ReentrantLock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get rid of this lock and just used synchronized on the methods.

lock.lock();
try {
while (queue.size() == 0 && !shuttingDown.get()) {
notEmpty.await();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use this.wait() and this.notify() instead of condition variable.

* Get blocks until it can return an item to be processed. If shutdown = true,
* the caller should end their process.
*
* @return the object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which object?

/**
* The workqueue interface defines the queue behavior.
*/
public interface WorkQueue {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this a generic instead of using Object


import java.time.Duration;

public interface RateLimiter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are lots of other retry libraries out there (e.g. https://github.com/jhalterman/failsafe) do we need to implement this ourselves?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't notice this before, will take a deep look

this.processing.add(obj);
this.dirty.remove(obj);
return obj;
} catch (Throwable t) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you want to catch here? this seems like a bad idea... (at the very least we should log)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this depends on how to handle exception, throw exception out here seems better


@Override
public void shutDown() {
this.shuttingDown.compareAndSet(false, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to notify() here too...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, should use notifyAll() here

import java.util.concurrent.locks.ReentrantLock;

/**
* The default workqueue implementation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should document how you expect this to be used.

@brendandburns
Copy link
Contributor

I have three high level comments:

  1. Please use generics everywhere.
  2. Please add much better documentation about usage, I don't think anyone reading this library will know how to use it.
  3. If we only ever expect to have one implementation, let's not have interfaces, let's just have the implementation.

Other comments: More tests are definitely required, some sample code would be great.

@yue9944882
Copy link
Member

yue9944882 commented Mar 10, 2019

thx so much for help w/ this @adohe !

do you think about splitting this pull into two parts? one for initializing the new module, another for the implementation so that it won't be the blocker for other tasks. thx!

  1. Please use generics everywhere.

+1

@yue9944882
Copy link
Member

@adohe seems recently too swamped to bump the thread. let me take over it. we'll finally get a java controller example into this repo.

@adohe-zz adohe-zz force-pushed the master branch 2 times, most recently from c2563f2 to 5fc35cd Compare May 1, 2019 07:39
@adohe-zz
Copy link
Contributor Author

adohe-zz commented May 1, 2019

comments addressed, I am working on much better usage documentation and test cases. cc @brendandburns @yue9944882

@yue9944882
Copy link
Member

ci looks not happy

@adohe-zz adohe-zz force-pushed the master branch 2 times, most recently from 18f92b2 to bd4ada9 Compare May 29, 2019 06:30
@k8s-ci-robot k8s-ci-robot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels May 29, 2019
@cizezsy
Copy link
Contributor

cizezsy commented Jun 22, 2019

Any update on this PR? I really want to use this feature in client-java :)

@yue9944882
Copy link
Member

@cizezsy the guy is recently too busy to push the progress. and also, the pull is a bit more complex so it might take time. welcome to split an another pass if you want to contribute.

package io.kubernetes.client.extended.function;

@FunctionalInterface
public interface ConditionFunc {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it, use functional.Supplier as replacement

@cizezsy cizezsy mentioned this pull request Jul 15, 2019
@adohe-zz adohe-zz force-pushed the master branch 3 times, most recently from a6c2951 to b862d68 Compare July 17, 2019 01:06
@yue9944882
Copy link
Member

/lgtm
/approve
/hold

lazy consensus if @brendandburns would like another rounds of review

@k8s-ci-robot k8s-ci-robot added the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Jul 17, 2019
@k8s-ci-robot
Copy link
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: adohe, yue9944882

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@k8s-ci-robot k8s-ci-robot added lgtm "Looks good to me", indicates that a PR is ready to be merged. approved Indicates a PR has been approved by an approver from all required OWNERS files. labels Jul 17, 2019
@yue9944882
Copy link
Member

/hold cancel

@k8s-ci-robot k8s-ci-robot removed the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Jul 19, 2019
@k8s-ci-robot k8s-ci-robot merged commit 40982b5 into kubernetes-client:master Jul 19, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved Indicates a PR has been approved by an approver from all required OWNERS files. cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. lgtm "Looks good to me", indicates that a PR is ready to be merged. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants