Skip to content
This repository was archived by the owner on Dec 6, 2024. It is now read-only.

Commit 23ef839

Browse files
committed
Adding the bucket creation flow
1 parent ca1c6f1 commit 23ef839

File tree

5 files changed

+251
-237
lines changed

5 files changed

+251
-237
lines changed

Diff for: container-object-storage-interface-controller/cmd/controller-manager/controller-manager.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
bucketcontroller "sigs.k8s.io/container-object-storage-interface-api/controller"
1414
"sigs.k8s.io/container-object-storage-interface-controller/pkg/bucketaccessrequest"
15-
"sigs.k8s.io/container-object-storage-interface-controller/pkg/bucketrequest"
15+
"sigs.k8s.io/container-object-storage-interface-controller/pkg/bucketclaim"
1616

1717
"k8s.io/klog/v2"
1818
)
@@ -67,7 +67,7 @@ func run(ctx context.Context, args []string) error {
6767
if err != nil {
6868
return err
6969
}
70-
ctrl.AddBucketRequestListener(bucketrequest.NewBucketRequestListener())
70+
ctrl.AddBucketClaimListener(bucketclaim.NewBucketClaimListener())
7171
ctrl.AddBucketAccessRequestListener(bucketaccessrequest.NewListener())
7272
return ctrl.Run(ctx)
7373
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package bucketclaim
2+
3+
import (
4+
"context"
5+
6+
"github.com/google/uuid"
7+
v1 "k8s.io/api/core/v1"
8+
"k8s.io/apimachinery/pkg/api/errors"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
kubeclientset "k8s.io/client-go/kubernetes"
11+
"k8s.io/klog/v2"
12+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
13+
14+
"sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage.k8s.io/v1alpha1"
15+
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/clientset"
16+
objectstoragev1alpha1 "sigs.k8s.io/container-object-storage-interface-api/clientset/typed/objectstorage.k8s.io/v1alpha1"
17+
18+
"sigs.k8s.io/container-object-storage-interface-controller/pkg/util"
19+
)
20+
21+
const (
22+
finalizer = "cosi.objectstorage.k8s.io/bucketclaim-protection"
23+
)
24+
25+
// bucketClaimListener is a resource handler for bucket requests objects
26+
type bucketClaimListener struct {
27+
kubeClient kubeclientset.Interface
28+
bucketClient bucketclientset.Interface
29+
}
30+
31+
func NewBucketClaimListener() *bucketClaimListener {
32+
return &bucketClaimListener{}
33+
}
34+
35+
// Add creates a bucket in response to a bucketClaim
36+
func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
37+
klog.V(3).InfoS("Add BucketClaim",
38+
"name", bucketClaim.Name,
39+
"ns", bucketClaim.Namespace,
40+
"bucketClass", bucketClaim.Spec.BucketClassName,
41+
"bucketPrefix", bucketClaim.Spec.BucketPrefix,
42+
)
43+
44+
err := b.provisionBucketClaimOperation(ctx, bucketClaim)
45+
if err != nil {
46+
switch err {
47+
case util.ErrInvalidBucketClass:
48+
klog.ErrorS(util.ErrInvalidBucketClass,
49+
"bucketClaim", bucketClaim.Name,
50+
"ns", bucketClaim.Namespace,
51+
"bucketClassName", bucketClaim.Spec.BucketClassName)
52+
case util.ErrBucketAlreadyExists:
53+
klog.V(3).InfoS("Bucket already exists",
54+
"bucketClaim", bucketClaim.Name,
55+
"ns", bucketClaim.Namespace,
56+
)
57+
return nil
58+
default:
59+
klog.ErrorS(err,
60+
"name", bucketClaim.Name,
61+
"ns", bucketClaim.Namespace,
62+
"err", err)
63+
}
64+
return err
65+
}
66+
67+
klog.V(3).InfoS("Add BucketClaim success",
68+
"name", bucketClaim.Name,
69+
"ns", bucketClaim.Namespace)
70+
return nil
71+
}
72+
73+
// update processes any updates made to the bucket request
74+
func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error {
75+
klog.V(3).InfoS("Update BucketClaim",
76+
"name", old.Name,
77+
"ns", old.Namespace)
78+
return nil
79+
}
80+
81+
// Delete processes a bucket for which bucket request is deleted
82+
func (b *bucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
83+
klog.V(3).Infof("Delete BucketClaim %v",
84+
"name", bucketClaim.Name,
85+
"ns", bucketClaim.Namespace)
86+
87+
return nil
88+
}
89+
90+
// provisionBucketClaimOperation attempts to provision a bucket for a given bucketClaim.
91+
// Return values
92+
// nil - BucketClaim successfully processed
93+
// ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
94+
// ErrBucketAlreadyExists - BucketClaim already processed
95+
// non-nil err - Internal error [requeue'd with exponential backoff]
96+
func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
97+
bucketClassName := b.getBucketClass(bucketClaim)
98+
bucketClass, err := b.BucketClasses().Get(ctx, bucketClassName, metav1.GetOptions{})
99+
if err != nil {
100+
klog.ErrorS(err, "Get Bucketclass Error", "name", bucketClassName)
101+
return util.ErrInvalidBucketClass
102+
}
103+
104+
if bucketClaim.Status.BucketReady {
105+
return util.ErrBucketAlreadyExists
106+
}
107+
108+
name = bucketClassName + string(bucketClaim.GetUID())
109+
110+
// create bucket
111+
bucket := &v1alpha1.Bucket{}
112+
bucket.Name = name
113+
bucket.Status.BucketReady = false
114+
bucket.Spec.DriverName = bucketClass.DriverName
115+
bucket.Spec.BucketClassName = bucketClassName
116+
bucket.Spec.DeletionPolicy = bucketClass.DeletionPolicy
117+
bucket.Spec.BucketClaim = &v1.ObjectReference{
118+
Name: bucketClaim.Name,
119+
Namespace: bucketClaim.Namespace,
120+
UID: bucketClaim.ObjectMeta.UID,
121+
}
122+
bucket.Spec.Protocols = *bucketClass.Protocol.DeepCopy()
123+
bucket.Spec.Parameters = util.CopySS(bucketClass.Parameters)
124+
125+
bucket, err = b.Buckets().Create(ctx, bucket, metav1.CreateOptions{})
126+
if err != nil && !errors.IsAlreadyExists(err) {
127+
klog.ErrorS(err, "name", bucket.Name)
128+
return err
129+
}
130+
131+
controllerutil.AddFinalizer(bucketClaim, finalizer)
132+
133+
bucketClaim.Status.BucketName = bucket.Name
134+
bucketClaim.Status.BucketAvailable = true
135+
_, err = b.BucketClaims(bucketClaim.Namespace).UpdateStatus(ctx, bucketClaim, metav1.UpdateOptions{})
136+
if err != nil {
137+
return err
138+
}
139+
140+
klog.Infof("Finished creating Bucket %v", bucket.Name)
141+
return nil
142+
}
143+
144+
// getBucketClass returns BucketClassName. If no bucket class was in the request it returns empty
145+
// TODO this methods can be more sophisticate to address bucketClass overrides using annotations just like SC.
146+
func (b *bucketClaimListener) getBucketClass(bucketClaim *v1alpha1.BucketClaim) string {
147+
if bucketClaim.Spec.BucketClassName != "" {
148+
return bucketClaim.Spec.BucketClassName
149+
}
150+
151+
return ""
152+
}
153+
154+
// cloneTheBucket clones a bucket to a different namespace when a BR is for brownfield.
155+
func (b *bucketClaimListener) cloneTheBucket(bucketClaim *v1alpha1.BucketClaim) error {
156+
klog.InfoS("Cloning Bucket", "name", bucketClaim.Status.BucketName)
157+
return util.ErrNotImplemented
158+
}
159+
160+
func (b *bucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) {
161+
b.kubeClient = k
162+
}
163+
164+
func (b *bucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) {
165+
b.bucketClient = bc
166+
}
167+
168+
func (b *bucketClaimListener) Buckets() objectstoragev1alpha1.BucketInterface {
169+
if b.bucketClient != nil {
170+
return b.bucketClient.ObjectstorageV1alpha1().Buckets()
171+
}
172+
panic("uninitialized listener")
173+
}
174+
175+
func (b *bucketClaimListener) BucketClasses() objectstoragev1alpha1.BucketClassInterface {
176+
if b.bucketClient != nil {
177+
return b.bucketClient.ObjectstorageV1alpha1().BucketClasses()
178+
}
179+
panic("uninitialized listener")
180+
}
181+
182+
func (b *bucketClaimListener) BucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface {
183+
if b.bucketClient != nil {
184+
return b.bucketClient.ObjectstorageV1alpha1().BucketClaims(namespace)
185+
}
186+
panic("uninitialized listener")
187+
}

0 commit comments

Comments
 (0)