-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbucketrequest.go
180 lines (155 loc) · 6.44 KB
/
bucketrequest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package bucketrequest
import (
"context"
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"github.com/kubernetes-sigs/container-object-storage-interface-controller/pkg/util"
kubeclientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage.k8s.io/v1alpha1"
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/clientset"
bucketcontroller "sigs.k8s.io/container-object-storage-interface-api/controller"
"github.com/golang/glog"
)
// bucketRequestListener is the value NewListener returns as a
// bucketcontroller.BucketRequestListener. Both client fields are nil after
// construction and are populated through InitializeKubeClient and
// InitializeBucketClient.
type bucketRequestListener struct {
// kubeClient is the core Kubernetes clientset, set by InitializeKubeClient.
kubeClient kubeclientset.Interface
// bucketClient is the object-storage clientset used to read BucketClasses
// and to read and write Buckets and BucketRequests; set by
// InitializeBucketClient.
bucketClient bucketclientset.Interface
}
// NewListener returns a BucketRequest listener with no clients attached.
// InitializeKubeClient and InitializeBucketClient must be called before the
// listener's bucket client is used.
func NewListener() bucketcontroller.BucketRequestListener {
	listener := new(bucketRequestListener)
	return listener
}
// InitializeKubeClient stores the core Kubernetes clientset on the listener,
// replacing any previously stored one.
func (b *bucketRequestListener) InitializeKubeClient(k kubeclientset.Interface) {
	b.kubeClient = k
}
// InitializeBucketClient stores the object-storage clientset on the listener,
// replacing any previously stored one.
func (b *bucketRequestListener) InitializeBucketClient(bc bucketclientset.Interface) {
	b.bucketClient = bc
}
// Add handles a newly observed BucketRequest by trying to provision a Bucket
// for it.
//
// util.ErrInvalidBucketClass and util.ErrBucketAlreadyExists are treated as
// terminal outcomes: they are logged and nil is returned. Any other
// provisioning error is logged and returned unchanged to the caller.
func (b *bucketRequestListener) Add(ctx context.Context, obj *v1alpha1.BucketRequest) error {
	glog.V(3).Infof("Add called for BucketRequest %s", obj.Name)

	err := b.provisionBucketRequestOperation(ctx, obj)
	if err == nil {
		glog.V(1).Infof("BucketRequest %v is successfully processed.", obj.Name)
		return nil
	}

	// The two sentinel errors mean there is nothing further to attempt for
	// this request, so they are reported as success.
	if err == util.ErrInvalidBucketClass {
		glog.V(1).Infof("BucketClass specified does not exist while processing BucketRequest %v.", obj.Name)
		return nil
	}
	if err == util.ErrBucketAlreadyExists {
		glog.V(1).Infof("Bucket already exist for this bucket request %v.", obj.Name)
		return nil
	}

	glog.V(1).Infof("Error occurred processing BucketRequest %v: %v", obj.Name, err)
	return err
}
// Update is called when a BucketRequest changes. It currently only logs the
// name of the old object and reports success; the new object is not inspected.
func (b *bucketRequestListener) Update(ctx context.Context, old, new *v1alpha1.BucketRequest) error {
	requestName := old.Name
	glog.V(3).Infof("Update called for BucketRequest %v", requestName)
	return nil
}
// Delete is called when a BucketRequest is removed. It currently only logs the
// request name and reports success; no Bucket is touched.
func (b *bucketRequestListener) Delete(ctx context.Context, obj *v1alpha1.BucketRequest) error {
	requestName := obj.Name
	glog.V(3).Infof("Delete called for BucketRequest %v", requestName)
	return nil
}
// provisionBucketRequestOperation provisions a Bucket for bucketRequest and
// records the Bucket's name in the request's Spec.BucketInstanceName.
//
// The Bucket is named after the request UID, prefixed with
// "<Spec.BucketPrefix>-" when a prefix is set, and its spec is copied from the
// BucketClass the request refers to. Every API call uses the caller's ctx.
//
// Return values:
//   - nil when the Bucket exists (created here or already present) and the
//     request on the API server carries its name.
//   - util.ErrInvalidBucketClass when the request names no BucketClass or the
//     named class does not exist. Add treats this as terminal.
//   - any other error when an API call fails for another reason. Add returns
//     such errors to its caller so the operation can be attempted again.
func (b *bucketRequestListener) provisionBucketRequestOperation(ctx context.Context, bucketRequest *v1alpha1.BucketRequest) error {
	api := b.bucketClient.ObjectstorageV1alpha1()

	bucketClassName := b.GetBucketClass(bucketRequest)
	if bucketClassName == "" {
		glog.Errorf("no bucketclass specified in BucketRequest %v", bucketRequest.Name)
		return util.ErrInvalidBucketClass
	}
	bucketClass, err := api.BucketClasses().Get(ctx, bucketClassName, metav1.GetOptions{})
	if err != nil {
		glog.Errorf("error getting bucketclass: [%v] %v", bucketClassName, err)
		// Only a missing class is a permanent failure; anything else (timeout,
		// server error, ...) must stay a normal error so it is not swallowed.
		if errors.IsNotFound(err) {
			return util.ErrInvalidBucketClass
		}
		return err
	}

	name := string(bucketRequest.GetUID())
	if prefix := bucketRequest.Spec.BucketPrefix; prefix != "" {
		name = prefix + "-" + name
	}

	_, err = api.Buckets().Get(ctx, name, metav1.GetOptions{})
	switch {
	case err == nil:
		// The Bucket is already there (its name embeds this request's UID).
		// A previous attempt may have failed after creating it, so make sure
		// the request still gets the name recorded.
		return b.recordBucketInstanceName(ctx, bucketRequest, name)
	case !errors.IsNotFound(err):
		glog.Errorf("error fetching bucket: %v", err)
		return err
	}

	// Not found: build the Bucket from the class and create it.
	bucket := &v1alpha1.Bucket{}
	bucket.Name = name
	bucket.Spec.Provisioner = bucketClass.Provisioner
	bucket.Spec.RetentionPolicy = bucketClass.RetentionPolicy
	bucket.Spec.AnonymousAccessMode = bucketClass.AnonymousAccessMode
	bucket.Spec.BucketClassName = bucketClass.Name
	bucket.Spec.BucketRequest = &v1.ObjectReference{
		Name:      bucketRequest.Name,
		Namespace: bucketRequest.Namespace,
		UID:       bucketRequest.ObjectMeta.UID,
	}
	bucket.Spec.AllowedNamespaces = util.CopyStrings(bucketClass.AllowedNamespaces)
	bucket.Spec.Protocol = *bucketClass.Protocol.DeepCopy()
	bucket.Spec.Parameters = util.CopySS(bucketClass.Parameters)

	if _, err = api.Buckets().Create(ctx, bucket, metav1.CreateOptions{}); err != nil {
		if !errors.IsAlreadyExists(err) {
			glog.V(5).Infof("Error occurred when creating Bucket %v", err)
			return err
		}
		// Someone created the same Bucket between our Get and Create; treat it
		// like the "already there" case above.
		return b.recordBucketInstanceName(ctx, bucketRequest, name)
	}

	if err := b.recordBucketInstanceName(ctx, bucketRequest, name); err != nil {
		return err
	}
	glog.Infof("Finished creating Bucket %v", name)
	return nil
}

// recordBucketInstanceName sets Spec.BucketInstanceName of bucketRequest to
// bucketName on the API server, retrying on update conflicts. Each attempt
// re-reads the request so that a retry carries a current resourceVersion, and
// the update is skipped when the name is already recorded. On success the
// caller's in-memory bucketRequest is updated to match.
func (b *bucketRequestListener) recordBucketInstanceName(ctx context.Context, bucketRequest *v1alpha1.BucketRequest, bucketName string) error {
	requests := b.bucketClient.ObjectstorageV1alpha1().BucketRequests(bucketRequest.Namespace)

	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		latest, getErr := requests.Get(ctx, bucketRequest.Name, metav1.GetOptions{})
		if getErr != nil {
			return getErr
		}
		if latest.Spec.BucketInstanceName == bucketName {
			return nil
		}
		latest.Spec.BucketInstanceName = bucketName
		_, updateErr := requests.Update(ctx, latest, metav1.UpdateOptions{})
		return updateErr
	})
	if err != nil {
		return err
	}

	bucketRequest.Spec.BucketInstanceName = bucketName
	return nil
}
// GetBucketClass returns the BucketClassName set on the request's spec, or the
// empty string when the request does not name a bucket class.
// TODO: this method could become more sophisticated and honor bucketClass
// overrides supplied through annotations, as is done for StorageClass.
func (b *bucketRequestListener) GetBucketClass(bucketRequest *v1alpha1.BucketRequest) string {
	return bucketRequest.Spec.BucketClassName
}
// FindBucket lists all Buckets and returns the first one that belongs to br:
// its name starts with br.Spec.BucketPrefix, its BucketClassName equals the
// request's, and its BucketRequest reference matches br's name, namespace and
// UID. Buckets without a BucketRequest reference are skipped.
//
// It returns nil when no Bucket matches or when the list call fails; a list
// failure is logged because the signature has no error result.
func (b *bucketRequestListener) FindBucket(ctx context.Context, br *v1alpha1.BucketRequest) *v1alpha1.Bucket {
	bucketList, err := b.bucketClient.ObjectstorageV1alpha1().Buckets().List(ctx, metav1.ListOptions{})
	if err != nil {
		glog.Errorf("error listing buckets for BucketRequest %v: %v", br.Name, err)
		return nil
	}

	// Index into the slice so the returned pointer refers to the list element
	// itself rather than to the loop variable.
	for i := range bucketList.Items {
		bucket := &bucketList.Items[i]

		ref := bucket.Spec.BucketRequest
		if ref == nil {
			// Nothing to compare against; dereferencing would panic.
			continue
		}

		if strings.HasPrefix(bucket.Name, br.Spec.BucketPrefix) &&
			bucket.Spec.BucketClassName == br.Spec.BucketClassName &&
			ref.Name == br.Name &&
			ref.Namespace == br.Namespace &&
			ref.UID == br.ObjectMeta.UID {
			return bucket
		}
	}
	return nil
}
// cloneTheBucket is the placeholder for cloning a bucket into a different
// namespace when a BucketRequest is for brownfield. It is not implemented:
// it logs the request's bucket instance name and returns util.ErrNotImplemented.
func (b *bucketRequestListener) cloneTheBucket(bucketRequest *v1alpha1.BucketRequest) error {
	instanceName := bucketRequest.Spec.BucketInstanceName
	glog.V(1).Infof("Clone called for Bucket %s", instanceName)
	return util.ErrNotImplemented
}