@@ -7,6 +7,7 @@
 	"strings"
 
 	csi "github.com/container-storage-interface/spec/lib/go/csi"
+	fsnotify "github.com/fsnotify/fsnotify"
 
 	"k8s.io/klog/v2"
 
@@ -25,7 +26,7 @@ var raidedLocalSsdPath = raidedLssdPrefix + raidedLocalSsdName
 
 func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) {
 	volumeId := req.GetVolumeId()
-	volumeGroupName := getVolumeGroupName(nodeId)
+	volumeGroupName := GetVolumeGroupName(nodeId)
 	mainDevicePath := "/dev/" + volumeGroupName + "/" + getLvName(mainLvSuffix, volumeId)
 	mainLvName := getLvName(mainLvSuffix, volumeId)
 	klog.V(2).Infof("Volume group available on node %v ", volumeGroupName)
@@ -37,12 +38,12 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) {
 	infoString := strings.TrimSpace(string(info))
 	raidedLocalSsdPath = raidedLssdPrefix + infoString
 
-	vgExists := checkVgExists(volumeGroupName)
+	vgExists := CheckVgExists(volumeGroupName)
 	if vgExists {
 		// Clean up Volume Group before adding the PD
-		reduceVolumeGroup(volumeGroupName, true)
+		ReduceVolumeGroup(volumeGroupName, true)
 	} else {
-		err := createVg(volumeGroupName, devicePath, raidedLocalSsdPath)
+		err := CreateVg(volumeGroupName, raidedLocalSsdPath)
 		if err != nil {
 			return mainDevicePath, err
 		}
@@ -75,7 +76,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) {
 			klog.Errorf("Errored while deactivating VG %v: err: %v: %s", vgNameForPv, err, info)
 		}
 		// Clean up volume group to remove any dangling PV references
-		reduceVolumeGroup(vgNameForPv, false)
+		ReduceVolumeGroup(vgNameForPv, false)
 		_, isCached := isCachingSetup(mainLvName)
 		// We will continue to uncache even if checking the caching status errors, as it is not a terminal issue.
 		if isCached {
@@ -91,7 +92,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) {
 				return "", fmt.Errorf("errored while uncaching main LV. %v: %s", err, info)
 			}
 			// Clean up volume group to remove any dangling PV references
-			reduceVolumeGroup(vgNameForPv, false)
+			ReduceVolumeGroup(vgNameForPv, false)
 		}
 		info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgmerge", []string{volumeGroupName, vgNameForPv}...)
 		if err != nil {
@@ -189,7 +190,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) {
 	return mainDevicePath, nil
 }
 
-func checkVgExists(volumeGroupName string) bool {
+func CheckVgExists(volumeGroupName string) bool {
 	args := []string{}
 	info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgscan", args...)
 	if err != nil {
@@ -202,8 +203,8 @@ func checkVgExists(volumeGroupName string) bool {
 
 func cleanupCache(volumeId string, nodeId string) error {
 
-	volumeGroupName := getVolumeGroupName(nodeId)
-	if !checkVgExists(volumeGroupName) {
+	volumeGroupName := GetVolumeGroupName(nodeId)
+	if !CheckVgExists(volumeGroupName) {
 		// If the volume group doesn't exist then there's nothing to uncache
 		return nil
 	}
@@ -228,7 +229,7 @@ func cleanupCache(volumeId string, nodeId string) error {
 	return nil
 }
 
-func getVolumeGroupName(nodePath string) string {
+func GetVolumeGroupName(nodePath string) string {
 	nodeSlice := strings.Split(nodePath, "/")
 	nodeId := nodeSlice[len(nodeSlice)-1]
 	nodeHash := common.ShortString(nodeId)
@@ -241,7 +242,7 @@ func getLvName(suffix string, volumeId string) string {
 	return fmt.Sprintf("%s-%s", suffix, pvcName)
 }
 
-func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) error {
+func CreateVg(volumeGroupName string, raidedLocalSsds string) error {
 	args := []string{
 		"--zero",
 		"y",
@@ -263,7 +264,7 @@ func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) error {
 	return nil
 }
 
-func reduceVolumeGroup(volumeGroupName string, force bool) {
+func ReduceVolumeGroup(volumeGroupName string, force bool) {
 	args := []string{
 		"--removemissing",
 		volumeGroupName,
@@ -366,3 +367,65 @@ func isCachingSetup(mainLvName string) (error, bool) {
 	}
 	return nil, false
 }
+
+func StartWatcher(nodeName string) {
+	dirToWatch := "/dev/"
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		klog.V(2).ErrorS(err, "errored while creating watcher")
+	}
+	defer watcher.Close()
+
+	// Out of the box, fsnotify can watch a single file or a single directory.
+	if err := watcher.Add(dirToWatch); err != nil {
+		klog.V(2).ErrorS(err, "errored while adding watcher directory")
+	}
+	errorCh := make(chan error, 1)
+	// Handle the error received from the watcher goroutine
+	go watchDiskDetaches(watcher, nodeName, errorCh)
+
+	select {
+	case err := <-errorCh:
+		klog.Errorf("watcher encountered an error: %v", err)
+	}
+}
+
+func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
+	for {
+		select {
+		// watch for errors
+		case err := <-watcher.Errors:
+			errorCh <- fmt.Errorf("disk update event errored: %v", err)
+		// watch for events
+		case event := <-watcher.Events:
+			// On any event, i.e. creation or deletion of a PV, we update the VG metadata.
+			// This might include some non-LVM changes; there is no harm in updating the metadata multiple times.
+			ReduceVolumeGroup(GetVolumeGroupName(nodeName), true)
+			klog.V(2).Infof("disk attach/detach event %#v\n", event)
+		}
+	}
+}
+
+func ValidateRaidedLSSDinVG(vgName string) error {
+	args := []string{
+		"--noheadings",
+		"-o",
+		"pv_name",
+		"--select",
+		"vg_name=" + vgName,
+	}
+	info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "pvs", args...)
+	if err != nil {
+		// On error, info contains the error message, which we cannot use for further steps.
+		return fmt.Errorf("errored while checking physical volume details %v: %s", err, info)
+	}
+
+	klog.V(2).Infof("Got PVs %v in VG %v", strings.TrimSpace(string(info)), vgName)
+	if !strings.Contains(string(info), "/dev/md127") {
+		info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgextend", []string{vgName, "/dev/md127"}...)
+		if err != nil {
+			klog.Errorf("errored while extending VGs %v: %s", err, info)
+		}
+	}
+	return nil
+}
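
The new `StartWatcher` blocks draining `errorCh` after it spawns `watchDiskDetaches`, so the node plugin presumably launches it on a goroutine of its own at startup; that would also explain why `checkVgExists`, `reduceVolumeGroup`, `createVg`, and `getVolumeGroupName` become exported here, so callers outside this file's immediate scope can reuse them. A minimal wiring sketch, assuming a hypothetical `startNodeService` helper and an already-resolved node name (neither appears in this change):

```go
// Hypothetical wiring; startNodeService and the nodeName source are
// assumptions, not part of this change.
func startNodeService(nodeName string) {
	// StartWatcher blocks while draining errorCh, so run it on its own
	// goroutine for the lifetime of the node plugin. Any create/remove
	// event under /dev/ then triggers ReduceVolumeGroup to drop dangling
	// PV references from the node's cache volume group.
	go StartWatcher(nodeName)
}
```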
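`ValidateRaidedLSSDinVG` shells out to LVM: it lists the physical volumes in the group with `pvs --noheadings -o pv_name --select vg_name=<vg>` and, when the RAIDed local-SSD array (hardcoded here as `/dev/md127`) is missing, re-adds it with `vgextend <vg> /dev/md127`. A sketch of a call site, assuming it is meant to run before caching is set up on the node (the diff itself does not show the caller):

```go
// Hypothetical call site; invoking this ahead of setupCaching is an
// assumption about intent, not something this change shows.
func ensureCacheVG(nodeName string) {
	if err := ValidateRaidedLSSDinVG(GetVolumeGroupName(nodeName)); err != nil {
		klog.Errorf("failed to ensure RAIDed local SSD is in VG: %v", err)
	}
}
```

Note that the `vgextend` failure is only logged, so `ValidateRaidedLSSDinVG` still returns nil when `/dev/md127` could not be re-added.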