diff --git a/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java b/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java new file mode 100644 index 0000000000..b7fc7040a3 --- /dev/null +++ b/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java @@ -0,0 +1,68 @@ +package io.kubernetes.client.examples; + +import io.kubernetes.client.leaderelection.LeaderCallbacks; +import io.kubernetes.client.leaderelection.LeaderElectionConfig; +import io.kubernetes.client.leaderelection.LeaderElector; +import io.kubernetes.client.leaderelection.resourcelock.ConfigMapLock; +import io.kubernetes.client.leaderelection.resourcelock.ResourceLockConfig; +import io.kubernetes.client.util.Config; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * A simple example of how to use the LeaderElection Java API + * + * @author yu + */ +public class LeaderElectionExample { + public static void main(String[] args) throws IOException { + final String name = "my-cm-lock"; + final String namespace = "kube-system"; + + ResourceLockConfig lockConfig = new ResourceLockConfig(); + lockConfig.setIdentity(String.format("%s-%s", name, System.currentTimeMillis())); + ConfigMapLock lock = new ConfigMapLock(name, namespace, Config.defaultClient(), lockConfig); + + LeaderCallbacks callbacks = new LeaderCallbacks() { + @Override + public void onStartedLeading() { + System.out.println("onStartedLeading"); + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(86400)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void onStopLeading() { + System.out.println("onStopLeading"); + } + + @Override + public void onNewLeader(String identity) { + System.out.println("onNewLeader identity: " + identity); + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(86400)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + LeaderElectionConfig electionConfig = new LeaderElectionConfig( + lock, + TimeUnit.SECONDS.toMillis(30), + TimeUnit.SECONDS.toMillis(15), + TimeUnit.SECONDS.toMillis(5), + callbacks, + true, + name); + LeaderElector leaderElector = new LeaderElector(electionConfig); + try { + leaderElector.run(); + } finally { + leaderElector.cancel(); + } + } +} diff --git a/util/src/main/java/io/kubernetes/client/leaderelection/LeaderCallbacks.java b/util/src/main/java/io/kubernetes/client/leaderelection/LeaderCallbacks.java new file mode 100644 index 0000000000..044320c170 --- /dev/null +++ b/util/src/main/java/io/kubernetes/client/leaderelection/LeaderCallbacks.java @@ -0,0 +1,38 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package io.kubernetes.client.leaderelection; + +/** + * LeaderCallbacks are callbacks that are triggered during certain lifecycle events of the + * LeaderElector. These are invoked asynchronously. + * + * possible future callbacks: * OnChallenge() + */ +public interface LeaderCallbacks { + + /** + * OnStartedLeading is called when a LeaderElector client starts leading + */ + void onStartedLeading(); + + /** + * OnStoppedLeading is called when a LeaderElector client stops leading + */ + void onStopLeading(); + + /** + * OnNewLeader is called when the client observes a leader that is not the previously observed + * leader. This includes the first observed leader when the client starts. + */ + void onNewLeader(String identity); +} diff --git a/util/src/main/java/io/kubernetes/client/leaderelection/LeaderElectionConfig.java b/util/src/main/java/io/kubernetes/client/leaderelection/LeaderElectionConfig.java new file mode 100644 index 0000000000..9c62a1c33a --- /dev/null +++ b/util/src/main/java/io/kubernetes/client/leaderelection/LeaderElectionConfig.java @@ -0,0 +1,150 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package io.kubernetes.client.leaderelection; + +import io.kubernetes.client.leaderelection.resourcelock.Lock; + + +public class LeaderElectionConfig { + + /** + * Lock is the resource that will be used for locking + */ + private Lock lock; + /** + * LeaseDuration is the duration that non-leader candidates will wait to force acquire leadership. + * This is measured against time of last observed ack. + */ + private long leaseDuration; + /** + * RenewDeadline is the duration that the acting master will retry refreshing leadership before + * giving up. + */ + private long renewDeadline; + /** + * RetryPeriod is the duration the LeaderElector clients should wait between tries of actions. + */ + private long retryPeriod; + /** + * Callbacks are callbacks that are triggered during certain lifecycle events of the + * LeaderElector + */ + private LeaderCallbacks callbacks; + /** + * ReleaseOnCancel should be set true if the lock should be released when the run context is + * cancelled. If you set this to true, you must ensure all code guarded by this lease has + * successfully completed prior to cancelling the context, or you may have two processes + * simultaneously acting on the critical path. + */ + private boolean releaseOnCancel; + /** + * Name is the name of the resource lock for debugging + */ + private String name; + + public LeaderElectionConfig( + Lock lock, long leaseDuration, long renewDeadline, long retryPeriod, + LeaderCallbacks callbacks, + boolean releaseOnCancel, String name) { + this.lock = lock; + this.leaseDuration = leaseDuration; + this.renewDeadline = renewDeadline; + this.retryPeriod = retryPeriod; + this.callbacks = callbacks; + this.releaseOnCancel = releaseOnCancel; + this.name = name; + } + + /** + * Lock is the resource that will be used for locking + */ + public Lock getLock() { + return lock; + } + + public void setLock(Lock lock) { + this.lock = lock; + } + + /** + * LeaseDuration is the duration that non-leader candidates will wait to force acquire leadership. + * This is measured against time of last observed ack. + */ + public long getLeaseDuration() { + return leaseDuration; + } + + public void setLeaseDuration(long leaseDuration) { + this.leaseDuration = leaseDuration; + } + + /** + * RenewDeadline is the duration that the acting master will retry refreshing leadership before + * giving up. + */ + public long getRenewDeadline() { + return renewDeadline; + } + + public void setRenewDeadline(long renewDeadline) { + this.renewDeadline = renewDeadline; + } + + /** + * RetryPeriod is the duration the LeaderElector clients should wait between tries of actions. + */ + public long getRetryPeriod() { + return retryPeriod; + } + + public void setRetryPeriod(long retryPeriod) { + this.retryPeriod = retryPeriod; + } + + /** + * ReleaseOnCancel should be set true if the lock should be released when the run context is + * cancelled. If you set this to true, you must ensure all code guarded by this lease has + * successfully completed prior to cancelling the context, or you may have two processes + * simultaneously acting on the critical path. + */ + public boolean isReleaseOnCancel() { + return releaseOnCancel; + } + + public void setReleaseOnCancel(boolean releaseOnCancel) { + this.releaseOnCancel = releaseOnCancel; + } + + /** + * Name is the name of the resource lock for debugging + */ + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + /** + * Callbacks are callbacks that are triggered during certain lifecycle events of the + * LeaderElector + */ + public LeaderCallbacks getCallbacks() { + return callbacks; + } + + public void setCallbacks(LeaderCallbacks callbacks) { + this.callbacks = callbacks; + } +} diff --git a/util/src/main/java/io/kubernetes/client/leaderelection/LeaderElectionRecord.java b/util/src/main/java/io/kubernetes/client/leaderelection/LeaderElectionRecord.java new file mode 100644 index 0000000000..30ce5170f5 --- /dev/null +++ b/util/src/main/java/io/kubernetes/client/leaderelection/LeaderElectionRecord.java @@ -0,0 +1,133 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package io.kubernetes.client.leaderelection; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; + +/** + * LeaderElectionRecord is the record that is stored in the leader election annotation. This + * information should be used for observational purposes only and could be replaced with a random + * string (e.g. UUID) with only slight modification of this code. + */ +public class LeaderElectionRecord { + + private static SimpleDateFormat dateFormat = new SimpleDateFormat("YYYY-MM-dd'T'HH:mm:ss'Z'"); + + /** + * HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and all callers + * may acquire. Versions of this library prior to Kubernetes 1.14 will not attempt to acquire + * leases with empty identities and will wait for the full lease interval to expire before + * attempting to reacquire. This value is set to empty when a client voluntarily steps down. + */ + private String holderIdentity; + private int leaseDurationSeconds; + private String acquireTime; + private String renewTime; + private int leaderTransitions = 0; + + public LeaderElectionRecord() { + } + + public LeaderElectionRecord(String holderIdentity, int leaseDurationSeconds, String acquireTime, + String renewTime) { + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + this.holderIdentity = holderIdentity; + this.leaseDurationSeconds = leaseDurationSeconds; + this.acquireTime = acquireTime; + this.renewTime = renewTime; + } + + public LeaderElectionRecord(String holderIdentity, int leaseDurationSeconds, long acquireTime, + long renewTime) { + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + this.holderIdentity = holderIdentity; + this.leaseDurationSeconds = leaseDurationSeconds; + setAcquireTime(acquireTime); + setRenewTime(renewTime); + } + + @Override + public String toString() { + return String.format( + "LeaderElectionRecord", + holderIdentity, leaseDurationSeconds, acquireTime, renewTime); + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof LeaderElectionRecord && obj.hashCode() == hashCode(); + } + + /** + * HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and all callers + * may acquire. Versions of this library prior to Kubernetes 1.14 will not attempt to acquire + * leases with empty identities and will wait for the full lease interval to expire before + * attempting to reacquire. This value is set to empty when a client voluntarily steps down. + */ + public String getHolderIdentity() { + return holderIdentity; + } + + public void setHolderIdentity(String holderIdentity) { + this.holderIdentity = holderIdentity; + } + + public int getLeaseDurationSeconds() { + return leaseDurationSeconds; + } + + public void setLeaseDurationSeconds(int leaseDurationSeconds) { + this.leaseDurationSeconds = leaseDurationSeconds; + } + + public String getAcquireTime() { + return acquireTime; + } + + public void setAcquireTime(String acquireTime) { + this.acquireTime = acquireTime; + } + + public void setAcquireTime(long acquireTime) { + this.acquireTime = dateFormat.format(new Date(acquireTime)); + } + + public String getRenewTime() { + return renewTime; + } + + public void setRenewTime(String renewTime) { + this.renewTime = renewTime; + } + + public void setRenewTime(long renewTime) { + this.renewTime = dateFormat.format(new Date(renewTime)); + } + + public int getLeaderTransitions() { + return leaderTransitions; + } + + public void setLeaderTransitions(int leaderTransitions) { + this.leaderTransitions = leaderTransitions; + } +} diff --git a/util/src/main/java/io/kubernetes/client/leaderelection/LeaderElector.java b/util/src/main/java/io/kubernetes/client/leaderelection/LeaderElector.java new file mode 100644 index 0000000000..e6ea577469 --- /dev/null +++ b/util/src/main/java/io/kubernetes/client/leaderelection/LeaderElector.java @@ -0,0 +1,315 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package io.kubernetes.client.leaderelection; + +import io.kubernetes.client.ApiException; +import io.kubernetes.client.leaderelection.resourcelock.ResourceNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * LeaderElector is a leader election client. + */ +public class LeaderElector implements Runnable { + + private Random random = new Random(); + private Logger logger = LoggerFactory.getLogger(LeaderElector.class); + + private LeaderElectionConfig config; + /** + * internal bookkeeping + */ + private LeaderElectionRecord observedRecord; + private Long observedTime; + /** + * used to implement OnNewLeader(), may lag slightly from the value observedRecord.HolderIdentity + * if the transition has not yet been reported. + */ + private String reportedLeader; + + private AtomicBoolean cancel = new AtomicBoolean(false); + + private ExecutorService executor; + + private static final double JITTER_FACTOR = 1.2; + + public LeaderElector(LeaderElectionConfig config) { + this(config, 10); + } + + public LeaderElector(LeaderElectionConfig config, int corePoolSize) { + String reason = checkLeaderElectionConfig(config); + if (reason != null) { + throw new RuntimeException(reason); + } + this.config = config; + executor = Executors.newScheduledThreadPool(corePoolSize); + } + + public LeaderElector(LeaderElectionConfig config, int corePoolSize, ThreadFactory threadFactory) { + String reason = checkLeaderElectionConfig(config); + if (reason != null) { + throw new RuntimeException(reason); + } + this.config = config; + executor = Executors.newScheduledThreadPool(corePoolSize, threadFactory); + } + + /** + * @param config -- the config to check + * @return null for no problem; otherwise return fail reason + */ + public static String checkLeaderElectionConfig(LeaderElectionConfig config) { + if (config.getLeaseDuration() <= config.getRenewDeadline()) { + return "leaseDuration must be greater than renewDeadline"; + } + if (config.getRenewDeadline() <= JITTER_FACTOR * config.getRetryPeriod()) { + return "renewDeadline must be greater than retryPeriod*JitterFactor"; + } + if (config.getLeaseDuration() < 1) { + return "leaseDuration must be greater than zero"; + } + if (config.getRenewDeadline() < 1) { + return "renewDeadline must be greater than zero"; + } + if (config.getRetryPeriod() < 1) { + return "retryPeriod must be greater than zero"; + } + if (config.getLock() == null) { + return "Lock must not be null."; + } + return null; + } + + /** + * this method will block until lost the lock just like RunOrDie() method in client-go + */ + @Override + public void run() { + try { + if (!acquire(cancel)) { + return; + } + Runnable runnable = () -> config.getCallbacks().onStartedLeading(); + executor.submit(runnable); + renew(cancel); + } finally { + config.getCallbacks().onStopLeading(); + executor.shutdown(); + } + } + + public void cancel() { + cancel.set(false); + } + + /** + * GetLeader returns the identity of the last observed leader or returns the empty string if no + * leader has yet been observed. + */ + public String getLeader() { + return observedRecord.getHolderIdentity(); + } + + /** + * IsLeader returns true if the last observed leader was this client else returns false. + */ + public boolean isLeader() { + return config.getLock().identity().equals(observedRecord.getHolderIdentity()); + } + + /** + * Check will determine if the current lease is expired by more than timeout. + * + * @return true for expired; + */ + public boolean check(long maxTolerableExpiredLease) { + if (isLeader()) { + // If we are more than timeout seconds after the lease duration that is past the timeout + // on the lease renew. Time to start reporting ourselves as unhealthy. We should have + // died but conditions like deadlock can prevent this. (See #70819) + // failed election to renew leadership on lease + return System.currentTimeMillis() - observedTime + > config.getLeaseDuration() + maxTolerableExpiredLease; + } + return false; + } + + /** + * acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew + * succeeds. return false if ctx signals done. + */ + private boolean acquire(AtomicBoolean cancel) { + String desc = config.getLock().describe(); + logger.info("attempting to acquire leader lease {}...", desc); + try { + while (!cancel.get()) { + boolean success = tryAcquireOrRenew(); + maybeReportTransition(); + if (success) { + config.getLock().recordEvent("became leader"); + logger.info("successfully acquired lease {}", desc); + return true; + } else { + logger.debug("failed to acuqire lease {}", desc); + } + // sleep time jittered + Thread.sleep(config.getRetryPeriod() + + (long) (random.nextDouble() * JITTER_FACTOR * config.getRetryPeriod())); + } + } catch (InterruptedException e) { + logger.warn("acquire thread interrupted", e); + } + return false; + } + + /** + * renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or + * ctx signals done. + */ + private void renew(AtomicBoolean cancel) { + try { + while (!cancel.get()) { + String desc = config.getLock().describe(); + try { + Callable callable = () -> { + boolean done = false; + while (!done && !cancel.get()) { + done = tryAcquireOrRenew(); + Thread.sleep(config.getRetryPeriod()); + } + return done; + }; + executor.invokeAll( + Arrays.asList(callable), config.getRenewDeadline(), TimeUnit.MILLISECONDS); + maybeReportTransition(); + logger.debug("successfully renewed lease {}", desc); + } catch (InterruptedException e) { + logger.warn("failed to tryAcquireOrRenew", e); + } catch (Exception e) { + config.getLock().recordEvent("stopped leading"); + logger.info("failed to renew lease {}", desc, e); + } + Thread.sleep(config.getRetryPeriod()); + } + } catch (InterruptedException e) { + logger.warn("renew thread interrupted", e); + } finally { + if (config.isReleaseOnCancel()) { + release(); + } + } + } + + /** + * tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, else it tries + * to renew the lease if it has already been acquired. Returns true on success else returns + * false. + * + * @return return true when acquire (or renew) lock success, otherwise false + */ + private boolean tryAcquireOrRenew() { + final long now = System.currentTimeMillis(); + LeaderElectionRecord leaderElectionRecord = new LeaderElectionRecord( + config.getLock().identity(), + (int) TimeUnit.MILLISECONDS.toSeconds(config.getLeaseDuration()), now, now); + LeaderElectionRecord oldLeaderElectionRecord; + try { + // 1. obtain or create the ElectionRecord + oldLeaderElectionRecord = config.getLock().get(); + } catch (ResourceNotFoundException e) { + try { + config.getLock().create(leaderElectionRecord); + } catch (ApiException e1) { + logger.error("error initially creating leader election record", e1); + return false; + } + observedRecord = leaderElectionRecord; + observedTime = now; + return true; + } catch (ApiException e) { + logger.error("error retrieving resource lock, {}", config.getLock().describe(), e); + return false; + } + + // 2. Record obtained, check the Identity & Time + if (!oldLeaderElectionRecord.equals(observedRecord)) { + observedRecord = oldLeaderElectionRecord; + observedTime = System.currentTimeMillis(); + } + if (observedRecord.getHolderIdentity().length() > 0 + && (observedTime + config.getLeaseDuration() > System.currentTimeMillis()) + && !isLeader()) { + logger.debug("lock is held by {} and has not yet expired", + oldLeaderElectionRecord.getHolderIdentity()); + return false; + } + + // 3. We're going to try to update. The leaderElectionRecord is set to it's default + // here. Let's correct it before updating. + if (isLeader()) { + leaderElectionRecord.setAcquireTime(oldLeaderElectionRecord.getAcquireTime()); + leaderElectionRecord.setLeaderTransitions(oldLeaderElectionRecord.getLeaderTransitions()); + } else { + leaderElectionRecord.setLeaderTransitions(oldLeaderElectionRecord.getLeaderTransitions() + 1); + } + + // update lock itself + try { + config.getLock().update(leaderElectionRecord); + observedRecord = leaderElectionRecord; + observedTime = System.currentTimeMillis(); + return true; + } catch (ApiException e) { + logger.error("Failed to update lock", e); + return false; + } + } + + private void maybeReportTransition() { + if (reportedLeader != null && reportedLeader.equals(observedRecord.getHolderIdentity())) { + return; + } + reportedLeader = observedRecord.getHolderIdentity(); + Callable callable = () -> { + config.getCallbacks().onNewLeader(reportedLeader); + return true; + }; + executor.submit(callable); + } + + /** + * release attempts to release the leader lease if we have acquired it. + */ + private boolean release() { + if (!isLeader()) { + return true; + } + LeaderElectionRecord leaderElectionRecord = new LeaderElectionRecord(); + leaderElectionRecord.setLeaderTransitions(observedRecord.getLeaderTransitions()); + try { + config.getLock().update(leaderElectionRecord); + } catch (ApiException e) { + logger.error("Failed to release lock", e); + return false; + } + observedRecord = leaderElectionRecord; + observedTime = System.currentTimeMillis(); + return true; + } +} diff --git a/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ConfigMapLock.java b/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ConfigMapLock.java new file mode 100644 index 0000000000..6d6457e921 --- /dev/null +++ b/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ConfigMapLock.java @@ -0,0 +1,24 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package io.kubernetes.client.leaderelection.resourcelock; + +import io.kubernetes.client.ApiClient; + + +public class ConfigMapLock extends ResourceLock { + + public ConfigMapLock(String name, String namespace, ApiClient client, + ResourceLockConfig resourceLockConfig) { + super(name, namespace, client, "v1", "ConfigMap", "configmaps", resourceLockConfig); + } +} diff --git a/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/Lock.java b/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/Lock.java new file mode 100644 index 0000000000..217bce511a --- /dev/null +++ b/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/Lock.java @@ -0,0 +1,38 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package io.kubernetes.client.leaderelection.resourcelock; + +import io.kubernetes.client.ApiException; +import io.kubernetes.client.leaderelection.LeaderElectionRecord; + +/** + * Lock offers a common interface for locking on arbitrary resources used in leader election. The + * Interface is used to hide the details on specific implementations in order to allow them to + * change over time. This interface is strictly for use by the leaderelection code. + */ +public interface Lock { + + String LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"; + + LeaderElectionRecord get() throws ApiException; + + void create(LeaderElectionRecord leaderElectionRecord) throws ApiException; + + void update(LeaderElectionRecord leaderElectionRecord) throws ApiException; + + void recordEvent(String s); + + String describe(); + + String identity(); +} diff --git a/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ResourceLock.java b/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ResourceLock.java new file mode 100644 index 0000000000..8867a2183a --- /dev/null +++ b/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ResourceLock.java @@ -0,0 +1,197 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package io.kubernetes.client.leaderelection.resourcelock; + +import com.google.gson.JsonObject; +import com.google.gson.reflect.TypeToken; +import com.squareup.okhttp.Call; +import io.kubernetes.client.ApiClient; +import io.kubernetes.client.ApiException; +import io.kubernetes.client.ApiResponse; +import io.kubernetes.client.leaderelection.LeaderElectionRecord; + +import java.util.HashMap; +import java.util.Map; + +public class ResourceLock implements Lock { + + private static final String METADATA = "metadata"; + private static final String ANNOTATIONS = "annotations"; + + private final String name; + private final String namespace; + + private final ApiClient client; + private final String apiVersion; + private final String kind; + private final String type; + + private final ResourceLockConfig resourceLockConfig; + private JsonObject object; + + public ResourceLock(String name, String namespace, ApiClient client, String apiVersion, + String kind, String type, + ResourceLockConfig resourceLockConfig) { + this.name = name; + this.namespace = namespace; + this.client = client; + this.apiVersion = apiVersion; + this.kind = kind; + this.type = type; + this.resourceLockConfig = resourceLockConfig; + } + + private String getRootUrl() { + return String.format("/api/%s/namespaces/%s/%s", + client.escapeString(apiVersion), + client.escapeString(namespace), + client.escapeString(type)); + } + + private Map getHeaders() { + Map localVarHeaderParams = new HashMap(); + final String[] localVarAccepts = { + "application/json", "application/yaml", "application/vnd.kubernetes.protobuf" + }; + final String localVarAccept = client.selectHeaderAccept(localVarAccepts); + if (localVarAccept != null) { + localVarHeaderParams.put("Accept", localVarAccept); + } + + final String[] localVarContentTypes = { + "*/*" + }; + final String localVarContentType = client.selectHeaderContentType(localVarContentTypes); + localVarHeaderParams.put("Content-Type", localVarContentType); + + return localVarHeaderParams; + } + + private String[] getAuthNames() { + return new String[]{"BearerToken"}; + } + + /** + * Get returns the election record from a ConfigMap Annotation + */ + @Override + public LeaderElectionRecord get() throws ApiException { + LeaderElectionRecord leaderElectionRecord = null; + String url = String.format("%s/%s", getRootUrl(), client.escapeString(name)); + Call call = client.buildCall(url, "GET", null, null, null, + getHeaders(), null, getAuthNames(), null); + try { + ApiResponse response = client.execute(call, new TypeToken() { + }.getType()); + object = response.getData(); + } catch (ApiException e) { + if (e.getCode() == 404) { + throw new ResourceNotFoundException(url); + } + } + if (object.has(METADATA)) { + JsonObject meta = object.getAsJsonObject(METADATA); + if (!meta.has(ANNOTATIONS)) { + JsonObject annotations = new JsonObject(); + meta.add(ANNOTATIONS, annotations); + } + JsonObject annotations = meta.getAsJsonObject(ANNOTATIONS); + if (annotations.has(LeaderElectionRecordAnnotationKey)) { + leaderElectionRecord = client.getJSON().deserialize( + annotations.get(LeaderElectionRecordAnnotationKey).getAsString(), + LeaderElectionRecord.class); + } + } + return leaderElectionRecord; + } + + /** + * build Resource Object with Leader Election annotation + * + * @param json leader election annotation value + * @return JsonObject + */ + private JsonObject getObject(String json) { + JsonObject annotations = new JsonObject(); + annotations.addProperty(LeaderElectionRecordAnnotationKey, json); + JsonObject metadata = new JsonObject(); + if (name != null) { + metadata.addProperty("name", name); + } + metadata.add(ANNOTATIONS, annotations); + JsonObject cm = new JsonObject(); + cm.addProperty("apiVersion", apiVersion); + cm.addProperty("kind", kind); + cm.add(METADATA, metadata); + return cm; + } + + /** + * Create attempts to create a LeaderElectionRecord annotation + */ + @Override + public void create(LeaderElectionRecord leaderElectionRecord) throws ApiException { + String json = client.getJSON().serialize(leaderElectionRecord); + JsonObject cm = getObject(json); + + String url = getRootUrl(); + Call call = client.buildCall(url, "POST", null, null, cm, getHeaders(), + null, getAuthNames(), null); + ApiResponse response = client.execute(call, new TypeToken() { + }.getType()); + object = response.getData(); + } + + /** + * Update will update an existing annotation on a given resource. + */ + @Override + public void update(LeaderElectionRecord leaderElectionRecord) throws ApiException { + if (object == null) { + throw new ApiException("configmap not initialized, call get or create first"); + } + String json = client.getJSON().serialize(leaderElectionRecord); + JsonObject cm = getObject(json); + String url = String.format("%s/%s", getRootUrl(), client.escapeString(name)); + Call call = client.buildCall(url, "PUT", null, null, cm, getHeaders(), + null, getAuthNames(), null); + ApiResponse response = client.execute(call, new TypeToken() { + }.getType()); + object = response.getData(); + } + + /** + * RecordEvent in leader election while adding meta-data + */ + @Override + public void recordEvent(String s) { + // TODO: not implement + return; + } + + /** + * Describe is used to convert details on current resource lock into a string + */ + @Override + public String describe() { + return String.format("%s/%s", namespace, name); + } + + /** + * @return the Identity of the lock + */ + @Override + public String identity() { + return resourceLockConfig.getIdentity(); + } +} diff --git a/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ResourceLockConfig.java b/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ResourceLockConfig.java new file mode 100644 index 0000000000..db7d17cb49 --- /dev/null +++ b/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ResourceLockConfig.java @@ -0,0 +1,37 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package io.kubernetes.client.leaderelection.resourcelock; + +/** + * ResourceLockConfig common data that exists across different resource locks + */ +public class ResourceLockConfig { + + /** + * Identity is the unique string identifying a lease holder across all participants in an + * election. + */ + private String identity; + + /** + * Identity is the unique string identifying a lease holder across all participants in an + * election. + */ + public String getIdentity() { + return identity; + } + + public void setIdentity(String identity) { + this.identity = identity; + } +} diff --git a/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ResourceNotFoundException.java b/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ResourceNotFoundException.java new file mode 100644 index 0000000000..8bc2feed01 --- /dev/null +++ b/util/src/main/java/io/kubernetes/client/leaderelection/resourcelock/ResourceNotFoundException.java @@ -0,0 +1,27 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package io.kubernetes.client.leaderelection.resourcelock; + +import io.kubernetes.client.ApiException; + + +public class ResourceNotFoundException extends ApiException { + + public ResourceNotFoundException() { + + } + + public ResourceNotFoundException(String message) { + super(message); + } +} diff --git a/util/src/test/java/io/kubernetes/client/util/leaderelection/LeaderElectionTest.java b/util/src/test/java/io/kubernetes/client/util/leaderelection/LeaderElectionTest.java new file mode 100644 index 0000000000..da70e38aef --- /dev/null +++ b/util/src/test/java/io/kubernetes/client/util/leaderelection/LeaderElectionTest.java @@ -0,0 +1,79 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package io.kubernetes.client.util.leaderelection; + +import io.kubernetes.client.leaderelection.LeaderCallbacks; +import io.kubernetes.client.leaderelection.LeaderElectionConfig; +import io.kubernetes.client.leaderelection.LeaderElector; +import io.kubernetes.client.leaderelection.resourcelock.ConfigMapLock; +import io.kubernetes.client.leaderelection.resourcelock.ResourceLockConfig; +import io.kubernetes.client.util.Config; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore +public class LeaderElectionTest { + + @Test + public void leaderelection() throws IOException { + final String name = "my-cm-lock"; + final String namespace = "kube-system"; + + ResourceLockConfig lockConfig = new ResourceLockConfig(); + lockConfig.setIdentity(String.format("%s-%s", name, System.currentTimeMillis())); + ConfigMapLock lock = new ConfigMapLock(name, namespace, Config.defaultClient(), lockConfig); + + LeaderCallbacks callbacks = new LeaderCallbacks() { + @Override + public void onStartedLeading() { + System.out.println("onStartedLeading"); + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(86400)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void onStopLeading() { + System.out.println("onStopLeading"); + } + + @Override + public void onNewLeader(String identity) { + System.out.println("onNewLeader identity: " + identity); + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(86400)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + LeaderElectionConfig electionConfig = new LeaderElectionConfig( + lock, + TimeUnit.SECONDS.toMillis(30), + TimeUnit.SECONDS.toMillis(15), + TimeUnit.SECONDS.toMillis(5), + callbacks, + true, + name); + LeaderElector leaderElector = new LeaderElector(electionConfig); + try { + leaderElector.run(); + } finally { + leaderElector.cancel(); + } + } +}