7
7
"strings"
8
8
9
9
csi "github.com/container-storage-interface/spec/lib/go/csi"
10
+ fsnotify "github.com/fsnotify/fsnotify"
10
11
11
12
"k8s.io/klog/v2"
12
13
@@ -25,7 +26,7 @@ var raidedLocalSsdPath = raidedLssdPrefix + raidedLocalSsdName
25
26
26
27
func setupCaching (devicePath string , req * csi.NodeStageVolumeRequest , nodeId string ) (string , error ) {
27
28
volumeId := req .GetVolumeId ()
28
- volumeGroupName := getVolumeGroupName (nodeId )
29
+ volumeGroupName := GetVolumeGroupName (nodeId )
29
30
mainDevicePath := "/dev/" + volumeGroupName + "/" + getLvName (mainLvSuffix , volumeId )
30
31
mainLvName := getLvName (mainLvSuffix , volumeId )
31
32
klog .V (2 ).Infof ("Volume group available on node %v " , volumeGroupName )
@@ -37,17 +38,6 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
37
38
infoString := strings .TrimSpace (string (info ))
38
39
raidedLocalSsdPath = raidedLssdPrefix + infoString
39
40
40
- vgExists := checkVgExists (volumeGroupName )
41
- if vgExists {
42
- // Clean up Volume Group before adding the PD
43
- reduceVolumeGroup (volumeGroupName , true )
44
- } else {
45
- err := createVg (volumeGroupName , devicePath , raidedLocalSsdPath )
46
- if err != nil {
47
- return mainDevicePath , err
48
- }
49
- }
50
-
51
41
// Check if the Physical Volume(PV) is part of some other volume group
52
42
args := []string {
53
43
"--select" ,
@@ -75,7 +65,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
75
65
klog .Errorf ("Errored while deactivating VG %v: err: %v: %s" , vgNameForPv , err , info )
76
66
}
77
67
// CLean up volume group to remove any dangling PV refrences
78
- reduceVolumeGroup (vgNameForPv , false )
68
+ ReduceVolumeGroup (vgNameForPv , false )
79
69
_ , isCached := isCachingSetup (mainLvName )
80
70
// We will continue to uncache even if it errors to check caching as it is not a terminal issue.
81
71
if isCached {
@@ -91,7 +81,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
91
81
return "" , fmt .Errorf ("errored while uncaching main LV. %v: %s" , err , info )
92
82
}
93
83
// CLean up volume group to remove any dangling PV refrences
94
- reduceVolumeGroup (vgNameForPv , false )
84
+ ReduceVolumeGroup (vgNameForPv , false )
95
85
}
96
86
info , err = common .RunCommand ("" /* pipedCmd */ , "" /* pipedCmdArg */ , "vgmerge" , []string {volumeGroupName , vgNameForPv }... )
97
87
if err != nil {
@@ -189,7 +179,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
189
179
return mainDevicePath , nil
190
180
}
191
181
192
- func checkVgExists (volumeGroupName string ) bool {
182
+ func CheckVgExists (volumeGroupName string ) bool {
193
183
args := []string {}
194
184
info , err := common .RunCommand ("" /* pipedCmd */ , "" /* pipedCmdArg */ , "vgscan" , args ... )
195
185
if err != nil {
@@ -202,8 +192,8 @@ func checkVgExists(volumeGroupName string) bool {
202
192
203
193
func cleanupCache (volumeId string , nodeId string ) error {
204
194
205
- volumeGroupName := getVolumeGroupName (nodeId )
206
- if ! checkVgExists (volumeGroupName ) {
195
+ volumeGroupName := GetVolumeGroupName (nodeId )
196
+ if ! CheckVgExists (volumeGroupName ) {
207
197
// If volume group doesn't exist then there's nothing to uncache
208
198
return nil
209
199
}
@@ -228,7 +218,7 @@ func cleanupCache(volumeId string, nodeId string) error {
228
218
return nil
229
219
}
230
220
231
- func getVolumeGroupName (nodePath string ) string {
221
+ func GetVolumeGroupName (nodePath string ) string {
232
222
nodeSlice := strings .Split (nodePath , "/" )
233
223
nodeId := nodeSlice [len (nodeSlice )- 1 ]
234
224
nodeHash := common .ShortString (nodeId )
@@ -241,7 +231,7 @@ func getLvName(suffix string, volumeId string) string {
241
231
return fmt .Sprintf ("%s-%s" , suffix , pvcName )
242
232
}
243
233
244
- func createVg (volumeGroupName string , devicePath string , raidedLocalSsds string ) error {
234
+ func CreateVg (volumeGroupName string , raidedLocalSsds string ) error {
245
235
args := []string {
246
236
"--zero" ,
247
237
"y" ,
@@ -263,7 +253,7 @@ func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string)
263
253
return nil
264
254
}
265
255
266
- func reduceVolumeGroup (volumeGroupName string , force bool ) {
256
+ func ReduceVolumeGroup (volumeGroupName string , force bool ) {
267
257
args := []string {
268
258
"--removemissing" ,
269
259
volumeGroupName ,
@@ -366,3 +356,65 @@ func isCachingSetup(mainLvName string) (error, bool) {
366
356
}
367
357
return nil , false
368
358
}
359
+
360
+ func StartWatcher (nodeName string ) {
361
+ dirToWatch := "/dev/"
362
+ watcher , err := fsnotify .NewWatcher ()
363
+ if err != nil {
364
+ klog .V (2 ).ErrorS (err , "errored while creating watcher" )
365
+ }
366
+ defer watcher .Close ()
367
+
368
+ // out of the box fsnotify can watch a single file, or a single directory
369
+ if err := watcher .Add (dirToWatch ); err != nil {
370
+ klog .V (2 ).ErrorS (err , "errored while adding watcher directory" )
371
+ }
372
+ errorCh := make (chan error , 1 )
373
+ // Handle the error received from the watcher goroutine
374
+ go watchDiskDetaches (watcher , nodeName , errorCh )
375
+
376
+ select {
377
+ case err := <- errorCh :
378
+ klog .Errorf ("watcher encountered an error: %v" , err )
379
+ }
380
+ }
381
+
382
+ func watchDiskDetaches (watcher * fsnotify.Watcher , nodeName string , errorCh chan error ) error {
383
+ for {
384
+ select {
385
+ // watch for errors
386
+ case err := <- watcher .Errors :
387
+ errorCh <- fmt .Errorf ("disk update event errored: %v" , err )
388
+ // watch for events
389
+ case event := <- watcher .Events :
390
+ // In case of an event i.e. creation or deletion of any new PV, we update the VG metadata.
391
+ // This might include some non-LVM changes, no harm in updating metadata multiple times.
392
+ ReduceVolumeGroup (GetVolumeGroupName (nodeName ), true )
393
+ klog .V (2 ).Infof ("disk attach/detach event %#v\n " , event )
394
+ }
395
+ }
396
+ }
397
+
398
+ func ValidateRaidedLSSDinVG (vgName string ) error {
399
+ args := []string {
400
+ "--noheadings" ,
401
+ "-o" ,
402
+ "pv_name" ,
403
+ "--select" ,
404
+ "vg_name=" + vgName ,
405
+ }
406
+ info , err := common .RunCommand ("" /* pipedCmd */ , "" /* pipedCmdArg */ , "pvs" , args ... )
407
+ if err != nil {
408
+ return fmt .Errorf ("errored while checking physical volume details %v: %s" , err , info )
409
+ // On error info contains the error message which we cannot use for further steps
410
+ }
411
+
412
+ klog .V (2 ).Infof ("Got PVs %v in VG %v" , strings .TrimSpace (string (info )), vgName )
413
+ if ! strings .Contains (string (info ), "/dev/md127" ) {
414
+ info , err := common .RunCommand ("" /* pipedCmd */ , "" /* pipedCmdArg */ , "vgextend" , []string {vgName , "/dev/md127" }... )
415
+ if err != nil {
416
+ klog .Errorf ("errored while extending VGs %v: %s" , err , info )
417
+ }
418
+ }
419
+ return nil
420
+ }
0 commit comments