Skip to content

Commit 7486c62

Browse files
antoine-gaillardFricounet
authored andcommitted
[local] Add startup taint removal logic
1 parent d643e5c commit 7486c62

File tree

245 files changed

+28690
-3
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

245 files changed

+28690
-3
lines changed

cmd/gce-pd-csi-driver/main.go

+11
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ var (
4848
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
4949
grpcLogCharCap = flag.Int("grpc-log-char-cap", 10000, "The maximum amount of characters logged for every grpc responses")
5050
enableOtelTracing = flag.Bool("enable-otel-tracing", false, "If set, enable opentelemetry tracing for the driver. The tracing is disabled by default. Configure the exporter endpoint with OTEL_EXPORTER_OTLP_ENDPOINT and other env variables, see https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration.")
51+
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster")
5152

5253
errorBackoffInitialDurationMs = flag.Int("backoff-initial-duration-ms", 200, "The amount of ms for the initial duration of the backoff condition for controller publish/unpublish CSI operations. Default is 200.")
5354
errorBackoffMaxDurationMs = flag.Int("backoff-max-duration-ms", 300000, "The amount of ms for the max duration of the backoff condition for controller publish/unpublish CSI operations. Default is 300000 (5m).")
@@ -89,6 +90,8 @@ var (
8990

9091
extraTagsStr = flag.String("extra-tags", "", "Extra tags to attach to each Compute Disk, Image, Snapshot created. It is a comma separated list of parent id, key and value like '<parent_id1>/<tag_key1>/<tag_value1>,...,<parent_idN>/<tag_keyN>/<tag_valueN>'. parent_id is the Organization or the Project ID or Project name where the tag key and the tag value resources exist. A maximum of 50 tags bindings is allowed for a resource. See https://cloud.google.com/resource-manager/docs/tags/tags-overview, https://cloud.google.com/resource-manager/docs/tags/tags-creating-and-managing for details")
9192

93+
removeNotReadyTaint = flag.Bool("remove-not-ready-taint", true, "remove NotReady taint from node when node is ready")
94+
9295
version string
9396
)
9497

@@ -244,6 +247,14 @@ func handle() {
244247
if *maxConcurrentFormatAndMount > 0 {
245248
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
246249
}
250+
251+
if *removeNotReadyTaint {
252+
client, err := common.GetKubeClient(*kubeconfig)
253+
if err != nil {
254+
klog.Errorf("Failed to get kube client: %v. Will not remove startup taint.", err.Error())
255+
}
256+
nodeServer.RemoveStartupTaint(client)
257+
}
247258
}
248259

249260
err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ require (
5656
github.com/cespare/xxhash/v2 v2.3.0 // indirect
5757
github.com/davecgh/go-spew v1.1.1 // indirect
5858
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
59+
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
5960
github.com/felixge/httpsnoop v1.0.4 // indirect
6061
github.com/fsnotify/fsnotify v1.5.4 // indirect
6162
github.com/go-logr/logr v1.4.2 // indirect

go.sum

+1
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,7 @@ github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
10081008
github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
10091009
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
10101010
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
1011+
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
10111012
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
10121013
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d/go.mod h1:ZZMPRZwes7CROmyNKgQzC3XPs6L/G2EJLHddWejkmf4=
10131014
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=

pkg/common/constants.go

+2
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,6 @@ const (
3232

3333
// Label that is set on a disk when it is used by a 'multi-zone' VolumeHandle
3434
MultiZoneLabel = "goog-gke-multi-zone"
35+
36+
PDCSIDriverName = "pd.csi.storage.gke.io"
3537
)

pkg/common/utils.go

+11
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
clientset "k8s.io/client-go/kubernetes"
24+
"k8s.io/client-go/tools/clientcmd"
2325
"net/http"
2426
"regexp"
2527
"slices"
@@ -696,3 +698,12 @@ func NewLimiter(limit, burst int, emptyBucket bool) *rate.Limiter {
696698

697699
return limiter
698700
}
701+
702+
func GetKubeClient(kubeconfig string) (clientset.Interface, error) {
703+
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
704+
if err != nil {
705+
return nil, err
706+
}
707+
708+
return clientset.NewForConfig(config)
709+
}

pkg/gce-pd-csi-driver/node.go

+134-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@ package gceGCEDriver
1616

1717
import (
1818
"context"
19+
"encoding/json"
1920
"errors"
2021
"fmt"
22+
"k8s.io/apimachinery/pkg/util/wait"
23+
"k8s.io/client-go/kubernetes"
2124
"os"
2225
"path/filepath"
2326
"regexp"
@@ -33,6 +36,9 @@ import (
3336
"k8s.io/klog/v2"
3437
"k8s.io/mount-utils"
3538

39+
corev1 "k8s.io/api/core/v1"
40+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
41+
k8stypes "k8s.io/apimachinery/pkg/types"
3642
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
3743
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
3844
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
@@ -81,10 +87,21 @@ const (
8187
fsTypeExt3 = "ext3"
8288

8389
readAheadKBMountFlagRegexPattern = "^read_ahead_kb=(.+)$"
90+
// agentNotReadyNodeTaintKeySuffix contains the suffix of the key of taints to be removed on driver startup
91+
agentNotReadyNodeTaintKeySuffix = "/agent-not-ready"
8492
)
8593

8694
var (
8795
readAheadKBMountFlagRegex = regexp.MustCompile(readAheadKBMountFlagRegexPattern)
96+
97+
// taintRemovalInitialDelay is the initial delay for node taint removal
98+
taintRemovalInitialDelay = 1 * time.Second
99+
// taintRemovalBackoff is the exponential backoff configuration for node taint removal
100+
taintRemovalBackoff = wait.Backoff{
101+
Duration: 500 * time.Millisecond,
102+
Factor: 2,
103+
Steps: 10, // Max delay = 0.5 * 2^9 = ~4 minutes
104+
}
88105
)
89106

90107
func getDefaultFsType() string {
@@ -502,7 +519,7 @@ func (ns *GCENodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRe
502519
Segments: map[string]string{common.TopologyKeyZone: ns.MetadataService.GetZone()},
503520
}
504521

505-
nodeID := common.CreateNodeID(ns.MetadataService.GetProject(), ns.MetadataService.GetZone(), ns.MetadataService.GetName())
522+
nodeID := ns.getNodeID()
506523

507524
volumeLimits, err := ns.GetVolumeLimits()
508525

@@ -514,6 +531,10 @@ func (ns *GCENodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRe
514531
return resp, err
515532
}
516533

534+
func (ns *GCENodeServer) getNodeID() string {
535+
return common.CreateNodeID(ns.MetadataService.GetProject(), ns.MetadataService.GetZone(), ns.MetadataService.GetName())
536+
}
537+
517538
func (ns *GCENodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
518539
if len(req.VolumeId) == 0 {
519540
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")
@@ -673,3 +694,115 @@ func (ns *GCENodeServer) GetVolumeLimits() (int64, error) {
673694
}
674695
return volumeLimitBig, nil
675696
}
697+
698+
func (ns *GCENodeServer) RemoveStartupTaint(kubeClient kubernetes.Interface) {
699+
// Remove taint from node to indicate driver startup success
700+
// This is done at the last possible moment to prevent race conditions or false positive removals
701+
time.AfterFunc(taintRemovalInitialDelay, func() {
702+
removeTaintInBackground(kubeClient, taintRemovalBackoff, removeNotReadyTaint)
703+
})
704+
}
705+
706+
// Struct for JSON patch operations
707+
type JSONPatch struct {
708+
OP string `json:"op,omitempty"`
709+
Path string `json:"path,omitempty"`
710+
Value interface{} `json:"value"`
711+
}
712+
713+
// removeTaintInBackground is a goroutine that retries removeNotReadyTaint with exponential backoff
714+
func removeTaintInBackground(k8sClient kubernetes.Interface, backoff wait.Backoff, removalFunc func(kubernetes.Interface) error) {
715+
backoffErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
716+
err := removalFunc(k8sClient)
717+
if err != nil {
718+
klog.ErrorS(err, "Unexpected failure when attempting to remove node taint(s)")
719+
return false, nil
720+
}
721+
return true, nil
722+
})
723+
724+
if backoffErr != nil {
725+
klog.ErrorS(backoffErr, "Retries exhausted, giving up attempting to remove node taint(s)")
726+
}
727+
}
728+
729+
// removeNotReadyTaint removes the taint pd.csi.storage.gke.io/agent-not-ready from the local node
730+
// This taint can be optionally applied by users to prevent startup race conditions such as
731+
// https://github.com/kubernetes/kubernetes/issues/95911
732+
func removeNotReadyTaint(clientset kubernetes.Interface) error {
733+
nodeName := os.Getenv("CSI_NODE_NAME")
734+
if nodeName == "" {
735+
klog.V(4).InfoS("CSI_NODE_NAME missing, skipping taint removal")
736+
return nil
737+
}
738+
739+
agentNotReadyNodeTaintKey := fmt.Sprintf("%s%s", common.PDCSIDriverName, agentNotReadyNodeTaintKeySuffix)
740+
741+
node, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
742+
if err != nil {
743+
return err
744+
}
745+
746+
err = checkAllocatable(clientset, nodeName)
747+
if err != nil {
748+
return err
749+
}
750+
751+
var taintsToKeep []corev1.Taint
752+
for _, taint := range node.Spec.Taints {
753+
if taint.Key != agentNotReadyNodeTaintKey {
754+
taintsToKeep = append(taintsToKeep, taint)
755+
} else {
756+
klog.V(4).InfoS("Queued taint for removal", "key", taint.Key, "effect", taint.Effect)
757+
}
758+
}
759+
760+
if len(taintsToKeep) == len(node.Spec.Taints) {
761+
klog.V(4).InfoS("No taints to remove on node, skipping taint removal")
762+
return nil
763+
}
764+
765+
patchRemoveTaints := []JSONPatch{
766+
{
767+
OP: "test",
768+
Path: "/spec/taints",
769+
Value: node.Spec.Taints,
770+
},
771+
{
772+
OP: "replace",
773+
Path: "/spec/taints",
774+
Value: taintsToKeep,
775+
},
776+
}
777+
778+
patch, err := json.Marshal(patchRemoveTaints)
779+
if err != nil {
780+
return err
781+
}
782+
783+
_, err = clientset.CoreV1().Nodes().Patch(context.Background(), nodeName, k8stypes.JSONPatchType, patch, metav1.PatchOptions{})
784+
if err != nil {
785+
return err
786+
}
787+
klog.InfoS("Removed taint(s) from local node", "node", nodeName)
788+
return nil
789+
}
790+
791+
func checkAllocatable(clientset kubernetes.Interface, nodeName string) error {
792+
csiNode, err := clientset.StorageV1().CSINodes().Get(context.Background(), nodeName, metav1.GetOptions{})
793+
if err != nil {
794+
return fmt.Errorf("isAllocatableSet: failed to get CSINode for %s: %w", nodeName, err)
795+
}
796+
797+
for _, driver := range csiNode.Spec.Drivers {
798+
if driver.Name == common.PDCSIDriverName {
799+
if driver.Allocatable != nil && driver.Allocatable.Count != nil {
800+
klog.InfoS("CSINode Allocatable value is set", "nodeName", nodeName, "count", *driver.Allocatable.Count)
801+
return nil
802+
}
803+
return fmt.Errorf("isAllocatableSet: allocatable value not set for driver on node %s", nodeName)
804+
}
805+
}
806+
807+
return fmt.Errorf("isAllocatableSet: driver not found on node %s", nodeName)
808+
}

0 commit comments

Comments
 (0)