|
8 | 8 | "strings"
|
9 | 9 |
|
10 | 10 | csi "github.com/container-storage-interface/spec/lib/go/csi"
|
| 11 | + fsnotify "github.com/fsnotify/fsnotify" |
11 | 12 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
12 | 13 | "k8s.io/client-go/kubernetes"
|
13 | 14 | "k8s.io/client-go/rest"
|
@@ -57,7 +58,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
|
57 | 58 | // Clean up Volume Group before adding the PD
|
58 | 59 | reduceVolumeGroup(volumeGroupName, true)
|
59 | 60 | } else {
|
60 |
| - err := createVg(volumeGroupName, devicePath, raidedLocalSsdPath) |
| 61 | + err := createVg(volumeGroupName, raidedLocalSsdPath) |
61 | 62 | if err != nil {
|
62 | 63 | return mainDevicePath, err
|
63 | 64 | }
|
@@ -395,7 +396,7 @@ func getLvName(suffix string, volumeId string) string {
|
395 | 396 | return fmt.Sprintf("%s-%s", suffix, pvcName)
|
396 | 397 | }
|
397 | 398 |
|
398 |
| -func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) error { |
| 399 | +func createVg(volumeGroupName string, raidedLocalSsds string) error { |
399 | 400 | args := []string{
|
400 | 401 | "--zero",
|
401 | 402 | "y",
|
@@ -497,3 +498,92 @@ func isCachingSetup(mainLvName string) (error, bool) {
|
497 | 498 | }
|
498 | 499 | return nil, false
|
499 | 500 | }
|
| 501 | + |
| 502 | +func InitializeDataCacheNode(nodeId string) error { |
| 503 | + raidedLocalSsdPath, err := fetchRAIDedLocalSsdPath() |
| 504 | + if err != nil { |
| 505 | + return err |
| 506 | + } |
| 507 | + volumeGroupName := getVolumeGroupName(nodeId) |
| 508 | + |
| 509 | + vgExists := checkVgExists(volumeGroupName) |
| 510 | + // Check if the required volume group already exists |
| 511 | + if vgExists { |
| 512 | + // Clean up Volume Group before adding the PD |
| 513 | + reduceVolumeGroup(volumeGroupName, true) |
| 514 | + |
| 515 | + // validate that raidedLSSD is part of VG |
| 516 | + err = validateRaidedLSSDinVG(volumeGroupName, raidedLocalSsdPath) |
| 517 | + if err != nil { |
| 518 | + return fmt.Errorf("failed validate local ssd in vg %v: %v", volumeGroupName, err) |
| 519 | + } |
| 520 | + } else { |
| 521 | + err := createVg(volumeGroupName, raidedLocalSsdPath) |
| 522 | + if err != nil { |
| 523 | + return err |
| 524 | + } |
| 525 | + } |
| 526 | + return nil |
| 527 | +} |
| 528 | + |
| 529 | +func StartWatcher(nodeName string) { |
| 530 | + dirToWatch := "/dev/" |
| 531 | + watcher, err := fsnotify.NewWatcher() |
| 532 | + if err != nil { |
| 533 | + klog.V(2).ErrorS(err, "errored while creating watcher") |
| 534 | + } |
| 535 | + klog.V(2).Infof("Watcher started for directory %v", dirToWatch) |
| 536 | + defer watcher.Close() |
| 537 | + |
| 538 | + // out of the box fsnotify can watch a single file, or a single directory |
| 539 | + if err := watcher.Add(dirToWatch); err != nil { |
| 540 | + klog.V(2).ErrorS(err, "errored while adding watcher directory") |
| 541 | + } |
| 542 | + errorCh := make(chan error, 1) |
| 543 | + // Handle the error received from the watcher goroutine |
| 544 | + go watchDiskDetaches(watcher, nodeName, errorCh) |
| 545 | + |
| 546 | + select { |
| 547 | + case err := <-errorCh: |
| 548 | + klog.Errorf("watcher encountered an error: %v", err) |
| 549 | + } |
| 550 | +} |
| 551 | + |
| 552 | +func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error { |
| 553 | + for { |
| 554 | + select { |
| 555 | + // watch for errors |
| 556 | + case err := <-watcher.Errors: |
| 557 | + errorCh <- fmt.Errorf("disk update event errored: %v", err) |
| 558 | + // watch for events |
| 559 | + case event := <-watcher.Events: |
| 560 | + // In case of an event i.e. creation or deletion of any new PV, we update the VG metadata. |
| 561 | + // This might include some non-LVM changes, no harm in updating metadata multiple times. |
| 562 | + reduceVolumeGroup(getVolumeGroupName(nodeName), true) |
| 563 | + klog.V(2).Infof("disk attach/detach event %#v\n", event) |
| 564 | + } |
| 565 | + } |
| 566 | +} |
| 567 | + |
| 568 | +func validateRaidedLSSDinVG(vgName string, lssdPath string) error { |
| 569 | + args := []string{ |
| 570 | + "--noheadings", |
| 571 | + "-o", |
| 572 | + "pv_name", |
| 573 | + "--select", |
| 574 | + "vg_name=" + vgName, |
| 575 | + } |
| 576 | + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "pvs", args...) |
| 577 | + if err != nil { |
| 578 | + return fmt.Errorf("errored while checking physical volume details %v: %s", err, info) |
| 579 | + // On error info contains the error message which we cannot use for further steps |
| 580 | + } |
| 581 | + |
| 582 | + if !strings.Contains(string(info), lssdPath) { |
| 583 | + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgextend", []string{vgName, lssdPath}...) |
| 584 | + if err != nil { |
| 585 | + klog.Errorf("errored while extending VGs %v: %s", err, info) |
| 586 | + } |
| 587 | + } |
| 588 | + return nil |
| 589 | +} |
0 commit comments