This repository was archived by the owner on Dec 3, 2024. It is now read-only.
generated from kubernetes/kubernetes-template-project
-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathbucketaccessrequest.go
208 lines (177 loc) · 8.78 KB
/
bucketaccessrequest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package bucketaccessrequest
import (
"context"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/kubernetes-sigs/container-object-storage-interface-api/apis/objectstorage.k8s.io/v1alpha1"
bucketclientset "github.com/kubernetes-sigs/container-object-storage-interface-api/clientset"
bucketcontroller "github.com/kubernetes-sigs/container-object-storage-interface-api/controller"
"github.com/kubernetes-sigs/container-object-storage-interface-controller/pkg/util"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"github.com/golang/glog"
)
type bucketAccessRequestListener struct {
kubeClient kubeclientset.Interface
bucketClient bucketclientset.Interface
}
func NewListener() bucketcontroller.BucketAccessRequestListener {
return &bucketAccessRequestListener{}
}
func (b *bucketAccessRequestListener) InitializeKubeClient(k kubeclientset.Interface) {
b.kubeClient = k
}
func (b *bucketAccessRequestListener) InitializeBucketClient(bc bucketclientset.Interface) {
b.bucketClient = bc
}
// Add is in response to user adding a BucketAccessRequest. The call here will respond by creating a BucketAccess Object.
func (b *bucketAccessRequestListener) Add(ctx context.Context, obj *v1alpha1.BucketAccessRequest) error {
glog.V(1).Infof("Add called for BucketAccessRequest %s", obj.Name)
bucketAccessRequest := obj
err := b.provisionBucketAccess(ctx, bucketAccessRequest)
if err != nil {
// Provisioning is 100% finished / not in progress.
switch err {
case util.ErrInvalidBucketAccessClass:
glog.V(1).Infof("BucketAccessClass specified does not exist while processing BucketAccessRequest %v.", bucketAccessRequest.Name)
err = nil
case util.ErrBucketAccessAlreadyExists:
glog.V(1).Infof("BucketAccess already exist for this BucketAccessRequest %v.", bucketAccessRequest.Name)
err = nil
default:
glog.V(1).Infof("Error occurred processing BucketAccessRequest %v: %v", bucketAccessRequest.Name, err)
}
return err
}
glog.V(1).Infof("BucketAccessRequest %v is successfully processed.", bucketAccessRequest.Name)
return nil
}
// Update is called in response to a change to BucketAccessRequesst. At this point
// BucketAccess cannot be changed once created as the Provisioner might have already acted upon the create BucketAccess and created the backend Bucket Credentials
// Changes to Protocol, Provisioner, BucketInstanceName, BucketRequest cannot be allowed. Best is to delete and recreate a new BucketAccessRequest
// Changes to ServiceAccount, PolicyActionsConfigMapData and Parameters should be considered in lieu with sidecar implementation
func (b *bucketAccessRequestListener) Update(ctx context.Context, old, new *v1alpha1.BucketAccessRequest) error {
glog.V(1).Infof("Update called for BucketAccessRequest %v", old.Name)
if (old.ObjectMeta.DeletionTimestamp == nil) &&
(new.ObjectMeta.DeletionTimestamp != nil) {
// BucketAccessRequest is being deleted, check and remove finalizer once BA is deleted
return b.removeBucketAccess(ctx, new)
}
return nil
}
// Delete is in response to user deleting a BucketAccessRequest. The call here will respond by deleting a BucketAccess Object.
func (b *bucketAccessRequestListener) Delete(ctx context.Context, bucketAccessRequest *v1alpha1.BucketAccessRequest) error {
glog.V(1).Infof("Delete called for BucketAccessRequest %v", bucketAccessRequest.Name)
return nil
}
// provisionBucketAccess attempts to provision a BucketAccess for the given bucketAccessRequest.
// Returns nil error only when the bucketaccess was provisioned. An error is return if we cannot create bucket access.
// A normal error is returned when bucket acess was not provisioned and provisioning should be retried (requeue the bucketAccessRequest),
// or a special error errBucketAccessAlreadyExists, errInvalidBucketAccessClass is returned when provisioning was impossible and
// no further attempts to provision should be tried.
func (b *bucketAccessRequestListener) provisionBucketAccess(ctx context.Context, bucketAccessRequest *v1alpha1.BucketAccessRequest) error {
bucketAccessClassName := bucketAccessRequest.Spec.BucketAccessClassName
bucketaccess := b.FindBucketAccess(ctx, bucketAccessRequest)
if bucketaccess != nil {
// bucketaccess has provisioned, nothing to do.
return util.ErrBucketAccessAlreadyExists
}
bucketAccessClass, err := b.bucketClient.ObjectstorageV1alpha1().BucketAccessClasses().Get(ctx, bucketAccessClassName, metav1.GetOptions{})
if bucketAccessClass == nil {
// bucket access class is invalid or not specified, cannot continue with provisioning.
return util.ErrInvalidBucketAccessClass
}
bucketRequest, err := b.bucketClient.ObjectstorageV1alpha1().BucketRequests(bucketAccessRequest.Namespace).Get(ctx, bucketAccessRequest.Spec.BucketRequestName, metav1.GetOptions{})
if bucketRequest == nil {
// bucket request does not exist, we have to reject this provision.
return util.ErrInvalidBucketRequest
}
if err != nil {
return err
}
if bucketRequest.Spec.BucketInstanceName == "" {
return util.ErrWaitForBucketProvisioning
}
sa, err := b.kubeClient.CoreV1().ServiceAccounts(bucketAccessRequest.Namespace).Get(ctx, bucketAccessRequest.Spec.ServiceAccountName, metav1.GetOptions{})
if err != nil {
return err
}
bucketaccess = &v1alpha1.BucketAccess{}
bucketaccess.Name = util.GetUUID()
bucketaccess.Spec.BucketInstanceName = bucketRequest.Spec.BucketInstanceName
bucketaccess.Spec.BucketAccessRequest = &v1.ObjectReference{
Name: bucketAccessRequest.Name,
Namespace: bucketAccessRequest.Namespace,
UID: bucketAccessRequest.ObjectMeta.UID}
bucketaccess.Spec.ServiceAccount = &v1.ObjectReference{
Name: sa.Name,
Namespace: sa.Namespace,
UID: sa.ObjectMeta.UID}
// bucketaccess.Spec.MintedSecretName - set by the driver
bucketaccess.Spec.PolicyActionsConfigMapData, err = util.ReadConfigData(b.kubeClient, bucketAccessClass.PolicyActionsConfigMap)
if err != nil && err != util.ErrNilConfigMap {
return err
}
// bucketaccess.Spec.Principal - set by the driver
bucketaccess.Spec.Provisioner = bucketAccessClass.Provisioner
bucketaccess.Spec.Parameters = util.CopySS(bucketAccessClass.Parameters)
bucketaccess, err = b.bucketClient.ObjectstorageV1alpha1().BucketAccesses().Create(context.Background(), bucketaccess, metav1.CreateOptions{})
if err != nil {
return err
}
if !util.CheckFinalizer(bucketAccessRequest, util.BARDeleteFinalizer) {
bucketAccessRequest.ObjectMeta.Finalizers = append(bucketAccessRequest.ObjectMeta.Finalizers, util.BARDeleteFinalizer)
}
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
bucketAccessRequest.Spec.BucketAccessName = bucketaccess.Name
_, err := b.bucketClient.ObjectstorageV1alpha1().BucketAccessRequests(bucketAccessRequest.Namespace).Update(ctx, bucketAccessRequest, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
glog.Infof("Finished creating BucketAccess %v", bucketaccess.Name)
return nil
}
func (b *bucketAccessRequestListener) removeBucketAccess(ctx context.Context, bucketAccessRequest *v1alpha1.BucketAccessRequest) error {
bucketaccess := b.FindBucketAccess(ctx, bucketAccessRequest)
if bucketaccess == nil {
// bucketaccess for this BucketAccessRequest is not found
return util.ErrBucketAccessDoesNotExist
}
// time to delete the BucketAccess Object
err := b.bucketClient.ObjectstorageV1alpha1().BucketAccesses().Delete(context.Background(), bucketaccess.Name, metav1.DeleteOptions{})
if err != nil {
return err
}
// we can safely remove the finalizer
return b.removeBARDeleteFinalizer(ctx, bucketAccessRequest)
}
func (b *bucketAccessRequestListener) FindBucketAccess(ctx context.Context, bucketAccessRequest *v1alpha1.BucketAccessRequest) *v1alpha1.BucketAccess {
bucketAccessList, err := b.bucketClient.ObjectstorageV1alpha1().BucketAccesses().List(ctx, metav1.ListOptions{})
if err != nil || len(bucketAccessList.Items) <= 0 {
return nil
}
for _, bucketaccess := range bucketAccessList.Items {
if bucketaccess.Spec.BucketAccessRequest.Name == bucketAccessRequest.Name &&
bucketaccess.Spec.BucketAccessRequest.Namespace == bucketAccessRequest.Namespace &&
bucketaccess.Spec.BucketAccessRequest.UID == bucketAccessRequest.UID {
return &bucketaccess
}
}
return nil
}
func (b *bucketAccessRequestListener) removeBARDeleteFinalizer(ctx context.Context, bucketAccessRequest *v1alpha1.BucketAccessRequest) error {
newFinalizers := []string{}
for _, finalizer := range bucketAccessRequest.ObjectMeta.Finalizers {
if finalizer != util.BARDeleteFinalizer {
newFinalizers = append(newFinalizers, finalizer)
}
}
bucketAccessRequest.ObjectMeta.Finalizers = newFinalizers
_, err := b.bucketClient.ObjectstorageV1alpha1().BucketAccessRequests(bucketAccessRequest.Namespace).Update(ctx, bucketAccessRequest, metav1.UpdateOptions{})
return err
}