@@ -7,6 +7,7 @@
 	"strings"

 	csi "github.com/container-storage-interface/spec/lib/go/csi"
+	fsnotify "github.com/fsnotify/fsnotify"

 	"k8s.io/klog/v2"

@@ -42,7 +43,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
 		// Clean up Volume Group before adding the PD
 		reduceVolumeGroup(volumeGroupName, true)
 	} else {
-		err := createVg(volumeGroupName, devicePath, raidedLocalSsdPath)
+		err := createVg(volumeGroupName, raidedLocalSsdPath)
 		if err != nil {
 			return mainDevicePath, err
 		}
@@ -241,7 +242,7 @@ func getLvName(suffix string, volumeId string) string {
 	return fmt.Sprintf("%s-%s", suffix, pvcName)
 }

-func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) error {
+func createVg(volumeGroupName string, raidedLocalSsds string) error {
 	args := []string{
 		"--zero",
 		"y",
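The hunk above cuts off inside createVg's argument list. For orientation, here is a sketch of how the remainder presumably reads; the trailing arguments and the vgcreate invocation are inferred from the visible flags, not quoted from this diff:

```go
// Sketch under assumptions: the diff truncates createVg after "y", so the
// remaining args and the exec call are inferred rather than quoted.
args := []string{
	"--zero",
	"y", // ask LVM to wipe the start of the device when initializing it as a PV
	volumeGroupName,
	raidedLocalSsds,
}
info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgcreate", args...)
if err != nil {
	return fmt.Errorf("vgcreate failed with error %v: %s", err, info)
}
return nil
```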
@@ -366,3 +367,94 @@ func isCachingSetup(mainLvName string) (error, bool) {
 	}
 	return nil, false
 }
+
+func InitializeDataCacheNode(nodeId string) error {
+	// Find the raided local ssd device (ls <prefix> | grep <name>) to build the VG on.
+	info, err := common.RunCommand("grep" /* pipedCmd */, raidedLocalSsdName /* pipedCmdArg */, "ls", raidedLssdPrefix)
+	if err != nil {
+		return fmt.Errorf("failed while listing raided devices, err: %v, output: %v", err, info)
+	}
+	infoString := strings.TrimSpace(string(info))
+	raidedLocalSsdPath = raidedLssdPrefix + infoString
+	volumeGroupName := getVolumeGroupName(nodeId)
+
+	// Check if the required volume group already exists.
+	vgExists := checkVgExists(volumeGroupName)
+	if vgExists {
+		// Clean up the Volume Group before adding the PD.
+		reduceVolumeGroup(volumeGroupName, true)
+
+		// Validate that the raided LSSD is part of the VG.
+		err = validateRaidedLSSDinVG(volumeGroupName)
+		if err != nil {
+			return fmt.Errorf("failed to validate local ssd in vg %v: %v", volumeGroupName, err)
+		}
+	} else {
+		err := createVg(volumeGroupName, raidedLocalSsdPath)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func StartWatcher(nodeName string) {
+	dirToWatch := "/dev/"
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		klog.V(2).ErrorS(err, "errored while creating watcher")
+		// Without a watcher there is nothing to watch or clean up.
+		return
+	}
+	defer watcher.Close()
+
+	// Out of the box fsnotify can watch a single file, or a single directory.
+	if err := watcher.Add(dirToWatch); err != nil {
+		klog.V(2).ErrorS(err, "errored while adding watcher directory")
+	}
+	errorCh := make(chan error, 1)
+	go watchDiskDetaches(watcher, nodeName, errorCh)
+
+	// Block and surface the first error received from the watcher goroutine.
+	err = <-errorCh
+	klog.Errorf("watcher encountered an error: %v", err)
+}
+
+func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
+	for {
+		select {
+		// Watch for errors.
+		case err := <-watcher.Errors:
+			errorCh <- fmt.Errorf("disk update event errored: %v", err)
+		// Watch for events.
+		case event := <-watcher.Events:
+			// On any event, i.e. creation or deletion of a device, update the VG
+			// metadata; this may include non-LVM changes, and updating twice is harmless.
+			reduceVolumeGroup(getVolumeGroupName(nodeName), true)
+			klog.V(2).Infof("disk attach/detach event %#v\n", event)
+		}
+	}
+}
+
+func validateRaidedLSSDinVG(vgName string) error {
+	args := []string{
+		"--noheadings",
+		"-o",
+		"pv_name",
+		"--select",
+		"vg_name=" + vgName,
+	}
+	info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "pvs", args...)
+	if err != nil {
+		// On error, info contains the error message, which we cannot use for further steps.
+		return fmt.Errorf("errored while checking physical volume details %v: %s", err, info)
+	}
+
+	if !strings.Contains(string(info), "/dev/md127") {
+		info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgextend", vgName, "/dev/md127")
+		if err != nil {
+			return fmt.Errorf("errored while extending VG %v: %s", err, info)
+		}
+	}
+	return nil
+}
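For orientation, a minimal sketch of how these two entry points might be wired together at node startup. The call site, the nodeID plumbing, and the driver package alias are assumptions for illustration; none of them appear in this diff:

```go
// Hypothetical wiring, assuming this package is imported as "driver" and
// nodeID comes from the node driver's startup flow.
func setUpDataCache(nodeID string) error {
	if err := driver.InitializeDataCacheNode(nodeID); err != nil {
		return fmt.Errorf("data cache init failed on node %s: %w", nodeID, err)
	}
	// StartWatcher blocks waiting on its error channel, so run it on its own goroutine.
	go driver.StartWatcher(nodeID)
	return nil
}
```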
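Since watchDiskDetaches reacts to every event under /dev/, each chmod or write event also triggers a vgreduce. A possible refinement, not part of this change, is to filter on the event op first; a sketch of the loop under that assumption:

```go
// Sketch (not this PR's code): a narrower event loop that only reacts to
// device create/remove events, skipping chmod/write noise under /dev/.
for {
	select {
	case err := <-watcher.Errors:
		errorCh <- fmt.Errorf("disk update event errored: %v", err)
	case event := <-watcher.Events:
		if event.Op&(fsnotify.Create|fsnotify.Remove) == 0 {
			continue
		}
		reduceVolumeGroup(getVolumeGroupName(nodeName), true)
		klog.V(2).Infof("disk attach/detach event %#v", event)
	}
}
```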
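One more observation: validateRaidedLSSDinVG checks for the hardcoded "/dev/md127", while InitializeDataCacheNode computes raidedLocalSsdPath from the actual device listing. A variant that reuses the computed path would also cover a non-default md device name; this is a sketch of an alternative, not the PR's code:

```go
// Variant sketch (assumption, not this PR's code): compare against the
// package-level raidedLocalSsdPath computed in InitializeDataCacheNode
// instead of the hardcoded "/dev/md127".
if !strings.Contains(string(info), raidedLocalSsdPath) {
	info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgextend", vgName, raidedLocalSsdPath)
	if err != nil {
		return fmt.Errorf("errored while extending VG %v: %s", err, info)
	}
}
```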