Skip to content

Leader Election #523 #538

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.kubernetes.client.examples;

import io.kubernetes.client.leaderelection.LeaderCallbacks;
import io.kubernetes.client.leaderelection.LeaderElectionConfig;
import io.kubernetes.client.leaderelection.LeaderElector;
import io.kubernetes.client.leaderelection.resourcelock.ConfigMapLock;
import io.kubernetes.client.leaderelection.resourcelock.ResourceLockConfig;
import io.kubernetes.client.util.Config;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* A simple example of how to use the LeaderElection Java API
*
* @author yu
*/
public class LeaderElectionExample {
public static void main(String[] args) throws IOException {
final String name = "my-cm-lock";
final String namespace = "kube-system";

ResourceLockConfig lockConfig = new ResourceLockConfig();
lockConfig.setIdentity(String.format("%s-%s", name, System.currentTimeMillis()));
ConfigMapLock lock = new ConfigMapLock(name, namespace, Config.defaultClient(), lockConfig);

LeaderCallbacks callbacks = new LeaderCallbacks() {
@Override
public void onStartedLeading() {
System.out.println("onStartedLeading");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(86400));
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public void onStopLeading() {
System.out.println("onStopLeading");
}

@Override
public void onNewLeader(String identity) {
System.out.println("onNewLeader identity: " + identity);
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(86400));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
LeaderElectionConfig electionConfig = new LeaderElectionConfig(
lock,
TimeUnit.SECONDS.toMillis(30),
TimeUnit.SECONDS.toMillis(15),
TimeUnit.SECONDS.toMillis(5),
callbacks,
true,
name);
LeaderElector leaderElector = new LeaderElector(electionConfig);
try {
leaderElector.run();
} finally {
leaderElector.cancel();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry to be dense. what's the point of cancelling if the elector.run already returned?

Copy link
Author

@atttx123 atttx123 Mar 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i just fellow the logic in leaderelection.go, the Run function behave like this:

  • block the call thread
  • if itself get the leader lock, run a goroutine for OnStartedLeading
  • if others get the leader lock, this lock will fail into a while sleep loop, unit itself get the leader lock
    so, the leaderElector.run() will block the thread and will not return, so the cancel() function is designed for elegant exit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancel here seems supposed to called in another thread. what'll read the boolean if run already exit?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes,cancel() set an AtomicBoolean like a stop channel in golang

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while the example suggests that "we're supposed to call cancel after the elector exited". i'm confused about the order.

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.kubernetes.client.leaderelection;

/**
* LeaderCallbacks are callbacks that are triggered during certain
* lifecycle events of the LeaderElector. These are invoked asynchronously.
*
* possible future callbacks:
* * OnChallenge()
*/
public interface LeaderCallbacks {
/**
* OnStartedLeading is called when a LeaderElector client starts leading
*/
void onStartedLeading();

/**
* OnStoppedLeading is called when a LeaderElector client stops leading
*/
void onStopLeading();

/**
* OnNewLeader is called when the client observes a leader that is
* not the previously observed leader. This includes the first observed
* leader when the client starts.
*/
void onNewLeader(String identity);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package io.kubernetes.client.leaderelection;

import io.kubernetes.client.leaderelection.resourcelock.Lock;

/**
* Created by IDEA on 2019-03-24 16:24
*
* @author yu
*/
public class LeaderElectionConfig {
/**
* Lock is the resource that will be used for locking
*/
private Lock lock;
/**
* LeaseDuration is the duration that non-leader candidates will
* wait to force acquire leadership. This is measured against time of
* last observed ack.
*/
private long leaseDuration;
/**
* RenewDeadline is the duration that the acting master will retry
* refreshing leadership before giving up.
*/
private long renewDeadline;
/**
* RetryPeriod is the duration the LeaderElector clients should wait
* between tries of actions.
*/
private long retryPeriod;
/**
* Callbacks are callbacks that are triggered during certain lifecycle
* events of the LeaderElector
*/
private LeaderCallbacks callbacks;
/**
* ReleaseOnCancel should be set true if the lock should be released
* when the run context is cancelled. If you set this to true, you must
* ensure all code guarded by this lease has successfully completed
* prior to cancelling the context, or you may have two processes
* simultaneously acting on the critical path.
*/
private boolean releaseOnCancel;
/**
* Name is the name of the resource lock for debugging
*/
private String name;

public LeaderElectionConfig(
Lock lock, long leaseDuration, long renewDeadline, long retryPeriod, LeaderCallbacks callbacks,
boolean releaseOnCancel, String name) {
this.lock = lock;
this.leaseDuration = leaseDuration;
this.renewDeadline = renewDeadline;
this.retryPeriod = retryPeriod;
this.callbacks = callbacks;
this.releaseOnCancel = releaseOnCancel;
this.name = name;
}

/**
* Lock is the resource that will be used for locking
*/
public Lock getLock() {
return lock;
}

public void setLock(Lock lock) {
this.lock = lock;
}

/**
* LeaseDuration is the duration that non-leader candidates will
* wait to force acquire leadership. This is measured against time of
* last observed ack.
*/
public long getLeaseDuration() {
return leaseDuration;
}

public void setLeaseDuration(long leaseDuration) {
this.leaseDuration = leaseDuration;
}

/**
* RenewDeadline is the duration that the acting master will retry
* refreshing leadership before giving up.
*/
public long getRenewDeadline() {
return renewDeadline;
}

public void setRenewDeadline(long renewDeadline) {
this.renewDeadline = renewDeadline;
}

/**
* RetryPeriod is the duration the LeaderElector clients should wait
* between tries of actions.
*/
public long getRetryPeriod() {
return retryPeriod;
}

public void setRetryPeriod(long retryPeriod) {
this.retryPeriod = retryPeriod;
}

/**
* ReleaseOnCancel should be set true if the lock should be released
* when the run context is cancelled. If you set this to true, you must
* ensure all code guarded by this lease has successfully completed
* prior to cancelling the context, or you may have two processes
* simultaneously acting on the critical path.
*/
public boolean isReleaseOnCancel() {
return releaseOnCancel;
}

public void setReleaseOnCancel(boolean releaseOnCancel) {
this.releaseOnCancel = releaseOnCancel;
}

/**
* Name is the name of the resource lock for debugging
*/
public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

/**
* Callbacks are callbacks that are triggered during certain lifecycle
* events of the LeaderElector
*/
public LeaderCallbacks getCallbacks() {
return callbacks;
}

public void setCallbacks(LeaderCallbacks callbacks) {
this.callbacks = callbacks;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.kubernetes.client.leaderelection;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

/**
* LeaderElectionRecord is the record that is stored in the leader election annotation.
* This information should be used for observational purposes only and could be replaced
* with a random string (e.g. UUID) with only slight modification of this code.
*/
public class LeaderElectionRecord {
private static SimpleDateFormat dateFormat = new SimpleDateFormat("YYYY-MM-dd'T'HH:mm:ss'Z'");

/**
* HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and
* all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not
* attempt to acquire leases with empty identities and will wait for the full lease
* interval to expire before attempting to reacquire. This value is set to empty when
* a client voluntarily steps down.
*/
private String holderIdentity;
private int leaseDurationSeconds;
private String acquireTime;
private String renewTime;
private int leaderTransitions = 0;

public LeaderElectionRecord() {}

public LeaderElectionRecord(String holderIdentity, int leaseDurationSeconds, String acquireTime, String renewTime) {
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

this.holderIdentity = holderIdentity;
this.leaseDurationSeconds = leaseDurationSeconds;
this.acquireTime = acquireTime;
this.renewTime = renewTime;
}

public LeaderElectionRecord(String holderIdentity, int leaseDurationSeconds, long acquireTime, long renewTime) {
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

this.holderIdentity = holderIdentity;
this.leaseDurationSeconds = leaseDurationSeconds;
setAcquireTime(acquireTime);
setRenewTime(renewTime);
}

@Override
public String toString() {
return String.format("LeaderElectionRecord<HolderIdentity=%s,LeaseDurationSeconds=%s,AcquireTime=%s,RenewTime=%s>",
holderIdentity, leaseDurationSeconds, acquireTime, renewTime);
}

@Override
public int hashCode() {
return toString().hashCode();
}

@Override
public boolean equals(Object obj) {
return obj instanceof LeaderElectionRecord && obj.hashCode() == hashCode();
}

/**
* HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and
* all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not
* attempt to acquire leases with empty identities and will wait for the full lease
* interval to expire before attempting to reacquire. This value is set to empty when
* a client voluntarily steps down.
*/
public String getHolderIdentity() {
return holderIdentity;
}

public void setHolderIdentity(String holderIdentity) {
this.holderIdentity = holderIdentity;
}

public int getLeaseDurationSeconds() {
return leaseDurationSeconds;
}

public void setLeaseDurationSeconds(int leaseDurationSeconds) {
this.leaseDurationSeconds = leaseDurationSeconds;
}

public String getAcquireTime() {
return acquireTime;
}

public void setAcquireTime(String acquireTime) {
this.acquireTime = acquireTime;
}

public void setAcquireTime(long acquireTime) {
this.acquireTime = dateFormat.format(new Date(acquireTime));
}

public String getRenewTime() {
return renewTime;
}

public void setRenewTime(String renewTime) {
this.renewTime = renewTime;
}

public void setRenewTime(long renewTime) {
this.renewTime = dateFormat.format(new Date(renewTime));
}

public int getLeaderTransitions() {
return leaderTransitions;
}

public void setLeaderTransitions(int leaderTransitions) {
this.leaderTransitions = leaderTransitions;
}
}
Loading