Implementing watcher & reboot stability for data cache to master branch. #1946


Merged
merged 1 commit on Feb 27, 2025
23 changes: 14 additions & 9 deletions cmd/gce-pd-csi-driver/main.go
@@ -254,14 +254,14 @@ func handle() {
if *maxConcurrentFormatAndMount > 0 {
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
}
}

if *enableDataCacheFlag {
if nodeName == nil || *nodeName == "" {
klog.Errorf("Data Cache enabled, but --node-name not passed")
}
if err := setupDataCache(ctx, *nodeName); err != nil {
klog.Errorf("Data Cache setup failed: %v", err)
if *enableDataCacheFlag {
if nodeName == nil || *nodeName == "" {
klog.Errorf("Data Cache enabled, but --node-name not passed")
}
if err := setupDataCache(ctx, *nodeName, nodeServer.MetadataService.GetName()); err != nil {
klog.Errorf("DataCache setup failed: %v", err)
}
go driver.StartWatcher(*nodeName)
}
}

@@ -385,7 +385,7 @@ func fetchLssdsForRaiding(lssdCount int) ([]string, error) {
return availableLssds, nil
}

func setupDataCache(ctx context.Context, nodeName string) error {
func setupDataCache(ctx context.Context, nodeName string, nodeId string) error {
isAlreadyRaided, err := driver.IsRaided()
if err != nil {
klog.V(4).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err)
@@ -415,6 +415,11 @@ func setupDataCache(ctx context.Context, nodeName string) error {
return fmt.Errorf("Failed to Raid local SSDs, unable to setup Data Cache, got error %v", err)
}

// Initializing data cache node (VG checks w/ raided lssd)
if err := driver.InitializeDataCacheNode(nodeId); err != nil {
return err
}

klog.V(4).Infof("LSSD caching is setup for the Data Cache enabled node %s", nodeName)
return nil
}
2 changes: 1 addition & 1 deletion go.mod
@@ -58,7 +58,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
Contributor
Is this updated manually, or is it updated via go mod vendor and go mod tidy?

Contributor Author
it was from "go get github.com/fsnotify/fsnotify"

github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -1032,6 +1032,8 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/fsouza/fake-gcs-server v0.0.0-20180612165233-e85be23bdaa8/go.mod h1:1/HufuJ+eaDf4KTnYdS6HJMGvMRU8d4cYTuu/1QaBbI=
github.com/fsouza/fake-gcs-server v1.19.4/go.mod h1:I0/88nHCASqJJ5M7zVF0zKODkYTcuXFW5J5yajsNJnE=
github.com/fvbommel/sortorder v1.0.1/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
99 changes: 97 additions & 2 deletions pkg/gce-pd-csi-driver/cache.go
@@ -9,6 +9,7 @@ import (
"strings"

csi "github.com/container-storage-interface/spec/lib/go/csi"
fsnotify "github.com/fsnotify/fsnotify"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -66,7 +67,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
// Clean up Volume Group before adding the PD
reduceVolumeGroup(volumeGroupName, true)
} else {
err := createVg(volumeGroupName, devicePath, raidedLocalSsdPath)
err := createVg(volumeGroupName, raidedLocalSsdPath)
if err != nil {
return mainDevicePath, err
}
@@ -430,7 +431,7 @@ func getLvName(suffix string, volumeId string) string {
return fmt.Sprintf("%s-%s", suffix, pvcName)
}

func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) error {
func createVg(volumeGroupName string, raidedLocalSsds string) error {
args := []string{
"--zero",
"y",
@@ -547,3 +548,97 @@ func fetchChunkSizeKiB(cacheSize string) (string, error) {
// default chunk size unit KiB
return strconv.FormatInt(int64(chunkSize), 10) + "KiB", nil
}

func InitializeDataCacheNode(nodeId string) error {
raidedLocalSsdPath, err := fetchRAIDedLocalSsdPath()
if err != nil {
return err
}
volumeGroupName := getVolumeGroupName(nodeId)

vgExists := checkVgExists(volumeGroupName)
// Check if the required volume group already exists
if vgExists {
// Clean up Volume Group before adding the PD
reduceVolumeGroup(volumeGroupName, true)

// validate that raidedLSSD is part of VG
err = validateRaidedLSSDinVG(volumeGroupName, raidedLocalSsdPath)
if err != nil {
return fmt.Errorf("failed validate local ssd in vg %v: %v", volumeGroupName, err)
}
} else {
err := createVg(volumeGroupName, raidedLocalSsdPath)
if err != nil {
return err
}
}
return nil
}

func StartWatcher(nodeName string) {
dirToWatch := "/dev/"
Contributor

Maybe add some logs to indicate the start of a watcher.

Contributor Author
Added a log.

watcher, err := fsnotify.NewWatcher()
if err != nil {
klog.V(2).ErrorS(err, "errored while creating watcher")
}
klog.V(2).Infof("Watcher started for directory %v", dirToWatch)
defer watcher.Close()

// out of the box fsnotify can watch a single file, or a single directory
if err := watcher.Add(dirToWatch); err != nil {
klog.V(2).ErrorS(err, "errored while adding watcher directory")
}
errorCh := make(chan error, 1)
// Handle the error received from the watcher goroutine
go watchDiskDetaches(watcher, nodeName, errorCh)

select {
case err := <-errorCh:
klog.Errorf("watcher encountered an error: %v", err)
}
}

func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
for {
select {
// watch for errors
case err := <-watcher.Errors:
errorCh <- fmt.Errorf("disk update event errored: %v", err)
// watch for events
case event := <-watcher.Events:
// In case of an event i.e. creation or deletion of any new PV, we update the VG metadata.
// This might include some non-LVM changes, no harm in updating metadata multiple times.
reduceVolumeGroup(getVolumeGroupName(nodeName), true)
klog.V(2).Infof("disk attach/detach event %#v\n", event)
}
}
}

func validateRaidedLSSDinVG(vgName string, lssdPath string) error {
args := []string{
"--noheadings",
"-o",
"pv_name",
"--select",
"vg_name=" + vgName,
}
info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "pvs", args...)
if err != nil {
return fmt.Errorf("errored while checking physical volume details %v: %s", err, info)
// On error info contains the error message which we cannot use for further steps
}

if !strings.Contains(string(info), lssdPath) {
return addRaidedLSSDToVg(vgName, lssdPath)
}
return nil
}

func addRaidedLSSDToVg(vgName, lssdPath string) error {
info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgextend", []string{vgName, lssdPath}...)
if err != nil {
return fmt.Errorf("errored while extending VGs %v: %s", err, info)
}
return nil
}
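
For context on the watcher code above, here is a minimal, self-contained sketch of the fsnotify watch-loop pattern that `StartWatcher`/`watchDiskDetaches` follow (assumes fsnotify v1.8.0 as vendored in this PR; the directory and the event handling below are illustrative placeholders, not the driver's code). It also shows the idiomatic `ok` check on the channels, which stops the loop once the watcher is closed:

```go
package main

import (
	"log"

	"github.com/fsnotify/fsnotify"
)

func main() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatalf("creating watcher: %v", err)
	}
	defer watcher.Close()

	// fsnotify watches a single file or a single (non-recursive) directory per Add call.
	if err := watcher.Add("/dev/"); err != nil {
		log.Fatalf("adding watch on /dev/: %v", err)
	}

	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok { // Events channel closed: the watcher was shut down
				return
			}
			// A create or remove under /dev/ corresponds to a device appearing or
			// disappearing; this is the point where the driver refreshes VG metadata.
			if event.Has(fsnotify.Create) || event.Has(fsnotify.Remove) {
				log.Printf("device event: %s %s", event.Op, event.Name)
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				return
			}
			log.Printf("watcher error: %v", err)
		}
	}
}
```

In the driver itself, the reaction to an event is `reduceVolumeGroup(getVolumeGroupName(nodeName), true)`, which keeps the cache volume group consistent after a disk attach or detach.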
14 changes: 14 additions & 0 deletions vendor/github.com/fsnotify/fsnotify/.cirrus.yml

10 changes: 10 additions & 0 deletions vendor/github.com/fsnotify/fsnotify/.gitignore

2 changes: 2 additions & 0 deletions vendor/github.com/fsnotify/fsnotify/.mailmap
