-
Notifications
You must be signed in to change notification settings - Fork 142
/
Copy pathcsi_resizer.go
280 lines (237 loc) · 8.5 KB
/
csi_resizer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resizer
import (
"context"
"errors"
"fmt"
"strconv"
"time"
csilib "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/external-resizer/pkg/csi"
"github.com/kubernetes-csi/external-resizer/pkg/util"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/klog/v2"
)
var (
	// resizeNotSupportErr is returned by NewResizerFromClient when the driver
	// reports neither controller resize nor node resize support.
	resizeNotSupportErr = errors.New("CSI driver neither supports controller resize nor node resize")

	// controllerServiceNotSupportErr is returned by NewResizerFromClient when
	// the driver reports that it has no controller service.
	controllerServiceNotSupportErr = errors.New("CSI driver does not support controller service")
)
// NewResizerFromClient probes the CSI driver behind csiClient and returns the
// Resizer that matches the capabilities the driver reports.
//
// Every probe is bounded by timeout. The result is:
//   - controllerServiceNotSupportErr if the driver has no controller service;
//   - a *csiResizer if the driver supports controller resize;
//   - the trivial resizer if the driver supports node resize only;
//   - resizeNotSupportErr if the driver supports neither kind of resize.
//
// A failing probe is returned wrapped with %w, so callers can inspect the
// underlying error with errors.Is / errors.As. informerFactory is not used by
// this function.
func NewResizerFromClient(
	csiClient csi.Client,
	timeout time.Duration,
	k8sClient kubernetes.Interface,
	informerFactory informers.SharedInformerFactory,
	driverName string) (Resizer, error) {
	hasControllerService, err := supportsPluginControllerService(csiClient, timeout)
	if err != nil {
		return nil, fmt.Errorf("failed to check if plugin supports controller service: %w", err)
	}
	if !hasControllerService {
		return nil, controllerServiceNotSupportErr
	}

	canControllerResize, err := supportsControllerResize(csiClient, timeout)
	if err != nil {
		return nil, fmt.Errorf("failed to check if plugin supports controller resize: %w", err)
	}
	if canControllerResize {
		return &csiResizer{
			name:      driverName,
			client:    csiClient,
			timeout:   timeout,
			k8sClient: k8sClient,
		}, nil
	}

	// No controller-side expansion: fall back to the trivial resizer when the
	// driver can at least expand on the node.
	canNodeResize, err := supportsNodeResize(csiClient, timeout)
	if err != nil {
		return nil, fmt.Errorf("failed to check if plugin supports node resize: %w", err)
	}
	if !canNodeResize {
		return nil, resizeNotSupportErr
	}

	klog.Info("The CSI driver supports node resize only, using trivial resizer to handle resize requests")
	return newTrivialResizer(driverName), nil
}
// csiResizer is the Resizer that NewResizerFromClient returns for drivers
// that support controller resize.
type csiResizer struct {
// name is the CSI driver name; Name returns it and CanSupport matches PVs against it.
name string
// client is the CSI client on which Resize calls Expand.
client csi.Client
// timeout bounds the context used for the Expand call in Resize.
timeout time.Duration
// k8sClient is used by Resize to read the secret referenced by the PV's
// ControllerExpandSecretRef.
k8sClient kubernetes.Interface
}
// Name returns the driver name stored in the resizer.
func (r *csiResizer) Name() string {
return r.name
}
// CanSupport reports whether this resizer should handle the given PV/PVC pair.
//
// It returns true when the resizer's driver is a migrated CSI driver and the
// PVC's util.VolumeResizerKey annotation equals the driver name. Otherwise the
// PV must be a CSI volume whose driver equals the resizer's name.
func (r *csiResizer) CanSupport(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) bool {
	// With CSI migration enabled the annotation carries the CSI driver name;
	// otherwise it carries the in-tree plugin name.
	annotatedResizer := pvc.Annotations[util.VolumeResizerKey]
	if csitrans.New().IsMigratedCSIDriverByName(r.name) && annotatedResizer == r.name {
		return true
	}

	csiSource := pv.Spec.CSI
	switch {
	case csiSource == nil:
		klog.V(4).Infof("PV %s is not a CSI volume, skip it", pv.Name)
		return false
	case csiSource.Driver != r.name:
		klog.V(4).Infof("Skip resize PV %s for resizer %s", pv.Name, csiSource.Driver)
		return false
	default:
		return true
	}
}
// Resize asks the CSI driver to expand the volume backing pv to requestSize.
// It handles CSI volumes directly, and in-tree volumes by translating them to
// CSI when the resizer's driver is a migrated CSI driver.
//
// It returns the new size reported by the driver, whether the driver reported
// that a node-side resize is still required, and an error. On any failure the
// PV's current storage capacity is returned as the size.
func (r *csiResizer) Resize(pv *v1.PersistentVolume, requestSize resource.Quantity) (resource.Quantity, bool, error) {
	oldSize := pv.Spec.Capacity[v1.ResourceStorage]

	var (
		source   *v1.CSIPersistentVolumeSource
		pvSpec   v1.PersistentVolumeSpec
		migrated bool
	)
	if pv.Spec.CSI != nil {
		// Native CSI volume.
		source = pv.Spec.CSI
		pvSpec = pv.Spec
	} else {
		translator := csitrans.New()
		if !translator.IsMigratedCSIDriverByName(r.name) {
			// In-tree volume whose plugin is not migrated to this driver.
			return oldSize, false, fmt.Errorf("volume %v is not migrated to CSI", pv.Name)
		}
		// Migrated in-tree volume: work on its CSI translation.
		csiPV, err := translator.TranslateInTreePVToCSI(pv)
		if err != nil {
			return oldSize, false, fmt.Errorf("failed to translate persistent volume: %w", err)
		}
		// Guard against a translation without a CSI source, which would
		// otherwise cause a nil-pointer dereference below.
		if csiPV == nil || csiPV.Spec.CSI == nil {
			return oldSize, false, fmt.Errorf("translated persistent volume %s has no CSI source", pv.Name)
		}
		migrated = true
		source = csiPV.Spec.CSI
		pvSpec = csiPV.Spec
	}

	volumeID := source.VolumeHandle
	if len(volumeID) == 0 {
		return oldSize, false, errors.New("empty volume handle")
	}

	var secrets map[string]string
	if secretRef := source.ControllerExpandSecretRef; secretRef != nil {
		var err error
		secrets, err = getCredentials(r.k8sClient, secretRef)
		if err != nil {
			return oldSize, false, err
		}
	}

	capability, err := GetVolumeCapabilities(pvSpec)
	if err != nil {
		return oldSize, false, fmt.Errorf("failed to get capabilities of volume %s with %w", pv.Name, err)
	}

	ctx, cancel := timeoutCtx(r.timeout)
	defer cancel()
	// Attach whether the volume was migrated as additional info on the
	// context passed to the CSI call.
	resizeCtx := context.WithValue(ctx, connection.AdditionalInfoKey, connection.AdditionalInfo{Migrated: strconv.FormatBool(migrated)})

	newSizeBytes, nodeResizeRequired, err := r.client.Expand(resizeCtx, volumeID, requestSize.Value(), secrets, capability)
	if err != nil {
		return oldSize, nodeResizeRequired, err
	}
	return *resource.NewQuantity(newSizeBytes, resource.BinarySI), nodeResizeRequired, nil
}
// GetVolumeCapabilities builds the single CSI VolumeCapability that describes
// pvSpec.
//
// The access type is Block when pvSpec.VolumeMode is set to
// PersistentVolumeBlock; otherwise it is Mount, carrying the CSI source's
// FSType and the PV's mount options. The PV access modes are collapsed into
// one CSI access mode:
//   - ReadWriteMany (with anything else) -> MULTI_NODE_MULTI_WRITER
//   - ReadOnlyMany only                  -> MULTI_NODE_READER_ONLY
//   - ReadWriteOnce only                 -> SINGLE_NODE_WRITER
//
// An error is returned when the CSI source is nil, when ReadOnlyMany and
// ReadWriteOnce are combined, or when none of the modes above is present.
func GetVolumeCapabilities(pvSpec v1.PersistentVolumeSpec) (*csilib.VolumeCapability, error) {
	if pvSpec.CSI == nil {
		return nil, errors.New("CSI volume source was nil")
	}

	requested := make(map[v1.PersistentVolumeAccessMode]bool)
	for i := range pvSpec.AccessModes {
		requested[pvSpec.AccessModes[i]] = true
	}
	rwx := requested[v1.ReadWriteMany]
	rox := requested[v1.ReadOnlyMany]
	rwo := requested[v1.ReadWriteOnce]

	volCap := &csilib.VolumeCapability{
		AccessMode: &csilib.VolumeCapability_AccessMode{},
	}
	if volMode := pvSpec.VolumeMode; volMode != nil && *volMode == v1.PersistentVolumeBlock {
		volCap.AccessType = &csilib.VolumeCapability_Block{
			Block: &csilib.VolumeCapability_BlockVolume{},
		}
	} else {
		volCap.AccessType = &csilib.VolumeCapability_Mount{
			Mount: &csilib.VolumeCapability_MountVolume{
				FsType:     pvSpec.CSI.FSType,
				MountFlags: pvSpec.MountOptions,
			},
		}
	}

	// Collapse the list of PV access modes into one CSI access mode.
	if rwx {
		// ReadWriteMany wins regardless of which other modes are set.
		volCap.AccessMode.Mode = csilib.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
		return volCap, nil
	}
	if rox && rwo {
		// This combination has no CSI equivalent.
		return nil, fmt.Errorf("CSI does not support ReadOnlyMany and ReadWriteOnce on the same PersistentVolume")
	}
	if rox {
		volCap.AccessMode.Mode = csilib.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
		return volCap, nil
	}
	if rwo {
		volCap.AccessMode.Mode = csilib.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
		return volCap, nil
	}
	return nil, fmt.Errorf("unsupported AccessMode combination: %+v", pvSpec.AccessModes)
}
// supportsPluginControllerService asks the client whether the driver supports
// the plugin controller service, using a context that expires after timeout.
func supportsPluginControllerService(client csi.Client, timeout time.Duration) (bool, error) {
	probeCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	supported, err := client.SupportsPluginControllerService(probeCtx)
	return supported, err
}
// supportsControllerResize asks the client whether the driver supports
// controller resize, using a context that expires after timeout.
func supportsControllerResize(client csi.Client, timeout time.Duration) (bool, error) {
	probeCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	supported, err := client.SupportsControllerResize(probeCtx)
	return supported, err
}
// supportsNodeResize asks the client whether the driver supports node resize,
// using a context that expires after timeout.
func supportsNodeResize(client csi.Client, timeout time.Duration) (bool, error) {
	probeCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	supported, err := client.SupportsNodeResize(probeCtx)
	return supported, err
}
// timeoutCtx returns a context derived from context.Background that expires
// timeout from now, together with its cancel function. The caller is
// responsible for calling the cancel function.
func timeoutCtx(timeout time.Duration) (context.Context, context.CancelFunc) {
	expiry := time.Now().Add(timeout)
	return context.WithDeadline(context.Background(), expiry)
}
// getCredentials fetches the Secret identified by ref and returns its data
// with every value converted to a string.
//
// A nil ref yields (nil, nil). If the Secret cannot be fetched, the error is
// returned wrapped with %w so callers can still classify the underlying API
// error with errors.Is / errors.As.
func getCredentials(k8sClient kubernetes.Interface, ref *v1.SecretReference) (map[string]string, error) {
	if ref == nil {
		return nil, nil
	}

	secret, err := k8sClient.CoreV1().Secrets(ref.Namespace).Get(context.TODO(), ref.Name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("error getting secret %s in namespace %s: %w", ref.Name, ref.Namespace, err)
	}

	// Secret data is stored as bytes; the result is a string-valued map.
	credentials := make(map[string]string, len(secret.Data))
	for key, value := range secret.Data {
		credentials[key] = string(value)
	}
	return credentials, nil
}