 	"strings"

 	csi "github.com/container-storage-interface/spec/lib/go/csi"
+	fsnotify "github.com/fsnotify/fsnotify"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -66,7 +67,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
 		// Clean up Volume Group before adding the PD
 		reduceVolumeGroup(volumeGroupName, true)
 	} else {
-		err := createVg(volumeGroupName, devicePath, raidedLocalSsdPath)
+		err := createVg(volumeGroupName, raidedLocalSsdPath)
 		if err != nil {
 			return mainDevicePath, err
 		}
@@ -430,7 +431,7 @@ func getLvName(suffix string, volumeId string) string {
 	return fmt.Sprintf("%s-%s", suffix, pvcName)
 }

-func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) error {
+func createVg(volumeGroupName string, raidedLocalSsds string) error {
 	args := []string{
 		"--zero",
 		"y",
@@ -547,3 +548,97 @@ func fetchChunkSizeKiB(cacheSize string) (string, error) {
 	// default chunk size unit KiB
 	return strconv.FormatInt(int64(chunkSize), 10) + "KiB", nil
 }
+
+func InitializeDataCacheNode(nodeId string) error {
+	raidedLocalSsdPath, err := fetchRAIDedLocalSsdPath()
+	if err != nil {
+		return err
+	}
+	volumeGroupName := getVolumeGroupName(nodeId)
+
+	// Check if the required volume group already exists
+	vgExists := checkVgExists(volumeGroupName)
+	if vgExists {
+		// Clean up Volume Group before adding the PD
+		reduceVolumeGroup(volumeGroupName, true)
+
+		// Validate that the RAIDed local SSD is part of the VG
+		err = validateRaidedLSSDinVG(volumeGroupName, raidedLocalSsdPath)
+		if err != nil {
+			return fmt.Errorf("failed to validate local ssd in vg %v: %v", volumeGroupName, err)
+		}
+	} else {
+		err := createVg(volumeGroupName, raidedLocalSsdPath)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func StartWatcher(nodeName string) {
+	dirToWatch := "/dev/"
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		klog.V(2).ErrorS(err, "errored while creating watcher")
+		// Without a watcher there is nothing to monitor or close, so bail out.
+		return
+	}
+	klog.V(2).Infof("Watcher started for directory %v", dirToWatch)
+	defer watcher.Close()
+
+	// Out of the box, fsnotify can watch a single file or a single directory.
+	if err := watcher.Add(dirToWatch); err != nil {
+		klog.V(2).ErrorS(err, "errored while adding watcher directory")
+	}
+	errorCh := make(chan error, 1)
+	// Handle the error received from the watcher goroutine
+	go watchDiskDetaches(watcher, nodeName, errorCh)
+
+	select {
+	case err := <-errorCh:
+		klog.Errorf("watcher encountered an error: %v", err)
+	}
+}
+
+func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
+	for {
+		select {
+		// watch for errors
+		case err := <-watcher.Errors:
+			errorCh <- fmt.Errorf("disk update event errored: %v", err)
+		// watch for events
+		case event := <-watcher.Events:
+			// In case of an event, i.e. creation or deletion of any new PV, we update the VG metadata.
+			// This might include some non-LVM changes; there is no harm in updating metadata multiple times.
+			reduceVolumeGroup(getVolumeGroupName(nodeName), true)
+			klog.V(2).Infof("disk attach/detach event %#v\n", event)
+		}
+	}
+}
+
+func validateRaidedLSSDinVG(vgName string, lssdPath string) error {
+	args := []string{
+		"--noheadings",
+		"-o",
+		"pv_name",
+		"--select",
+		"vg_name=" + vgName,
+	}
+	info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "pvs", args...)
+	if err != nil {
+		// On error, info contains the error message, which we cannot use for further steps.
+		return fmt.Errorf("errored while checking physical volume details %v: %s", err, info)
+	}
+
+	if !strings.Contains(string(info), lssdPath) {
+		return addRaidedLSSDToVg(vgName, lssdPath)
+	}
+	return nil
+}
+
+func addRaidedLSSDToVg(vgName, lssdPath string) error {
+	info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgextend", []string{vgName, lssdPath}...)
+	if err != nil {
+		return fmt.Errorf("errored while extending VGs %v: %s", err, info)
+	}
+	return nil
+}
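For context, a minimal sketch of how the two new entry points added in this diff could be wired into node startup. The wrapper function, the enableDataCache flag, and the goroutine structure below are illustrative assumptions and are not part of this change; only InitializeDataCacheNode and StartWatcher come from the diff.

// Hypothetical call site (not part of this PR): set up the data cache VG on the
// node and keep watching /dev so the VG metadata stays in sync with disk
// attach/detach events.
func maybeSetupDataCache(enableDataCache bool, nodeName string) error {
	if !enableDataCache {
		return nil
	}
	// Create, or repair, the LVM volume group backed by the RAIDed local SSDs.
	if err := InitializeDataCacheNode(nodeName); err != nil {
		return fmt.Errorf("data cache initialization failed on node %s: %v", nodeName, err)
	}
	// Run the fsnotify-based watcher in the background for the node's lifetime.
	go StartWatcher(nodeName)
	return nil
}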