Skip to content

Commit f3f77c7

Browse files
authored
Merge pull request kubernetes-retired#9 from rrati/bucket-controller
Added bucket controller
2 parents 45858ea + 9195ed3 commit f3f77c7

File tree

2 files changed

+681
-0
lines changed

2 files changed

+681
-0
lines changed
+200
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package bucket
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"strings"
23+
"time"
24+
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
utilversion "k8s.io/apimachinery/pkg/util/version"
27+
28+
kubeclientset "k8s.io/client-go/kubernetes"
29+
"k8s.io/client-go/util/retry"
30+
"k8s.io/client-go/util/workqueue"
31+
32+
"github.com/kubernetes-sigs/container-object-storage-interface-api/apis/objectstorage.k8s.io/v1alpha1"
33+
bucketclientset "github.com/kubernetes-sigs/container-object-storage-interface-api/clientset"
34+
"github.com/kubernetes-sigs/container-object-storage-interface-api/controller"
35+
36+
osspec "github.com/kubernetes-sigs/container-object-storage-interface-spec"
37+
38+
"k8s.io/klog"
39+
40+
"golang.org/x/time/rate"
41+
)
42+
43+
// bucketListener manages Bucket objects
44+
type bucketListener struct {
45+
kubeClient kubeclientset.Interface
46+
bucketClient bucketclientset.Interface
47+
provisionerClient osspec.ProvisionerClient
48+
49+
// The name of the provisioner for which this controller dynamically
50+
// provisions buckets.
51+
provisionerName string
52+
kubeVersion *utilversion.Version
53+
}
54+
55+
// NewBucketController returns a controller that manages Bucket objects
56+
func NewBucketController(provisionerName string, client osspec.ProvisionerClient) (*controller.ObjectStorageController, error) {
57+
rateLimit := workqueue.NewMaxOfRateLimiter(
58+
workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 60*time.Minute),
59+
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
60+
)
61+
62+
identity := fmt.Sprintf("object-storage-sidecar-%s", provisionerName)
63+
bc, err := controller.NewObjectStorageController(identity, "bucket-controller", 5, rateLimit)
64+
if err != nil {
65+
return nil, err
66+
}
67+
68+
bl := bucketListener{
69+
provisionerName: provisionerName,
70+
provisionerClient: client,
71+
}
72+
bc.AddBucketListener(&bl)
73+
74+
return bc, nil
75+
}
76+
77+
// InitializeKubeClient initializes the kubernetes client
78+
func (bl *bucketListener) InitializeKubeClient(k kubeclientset.Interface) {
79+
bl.kubeClient = k
80+
81+
serverVersion, err := k.Discovery().ServerVersion()
82+
if err != nil {
83+
klog.Errorf("unable to get server version: %v", err)
84+
} else {
85+
bl.kubeVersion = utilversion.MustParseSemantic(serverVersion.GitVersion)
86+
}
87+
}
88+
89+
// InitializeBucketClient initializes the object storage bucket client
90+
func (bl *bucketListener) InitializeBucketClient(bc bucketclientset.Interface) {
91+
bl.bucketClient = bc
92+
}
93+
94+
// Add will call the provisioner and add a bucket
95+
func (bl *bucketListener) Add(ctx context.Context, obj *v1alpha1.Bucket) error {
96+
klog.V(1).Infof("bucketListener: add called for bucket %s", obj.Name)
97+
98+
// Verify this bucket is for this provisioner
99+
if !strings.EqualFold(obj.Spec.Provisioner, bl.provisionerName) {
100+
return nil
101+
}
102+
103+
req := osspec.ProvisionerCreateBucketRequest{
104+
BucketName: obj.Name,
105+
BucketContext: obj.Spec.Parameters,
106+
}
107+
108+
req.BucketContext["ProtocolVersion"] = obj.Spec.Protocol.Version
109+
110+
if obj.Spec.AnonymousAccessMode.Private {
111+
req.AnonymousBucketAccessMode = osspec.ProvisionerCreateBucketRequest_BUCKET_PRIVATE
112+
} else if obj.Spec.AnonymousAccessMode.PublicReadOnly {
113+
req.AnonymousBucketAccessMode = osspec.ProvisionerCreateBucketRequest_BUCKET_READ_ONLY
114+
} else if obj.Spec.AnonymousAccessMode.PublicReadWrite {
115+
req.AnonymousBucketAccessMode = osspec.ProvisionerCreateBucketRequest_BUCKET_WRITE_ONLY
116+
} else if obj.Spec.AnonymousAccessMode.PublicWriteOnly {
117+
req.AnonymousBucketAccessMode = osspec.ProvisionerCreateBucketRequest_BUCKET_READ_WRITE
118+
}
119+
120+
// TODO set grpc timeout
121+
rsp, err := bl.provisionerClient.ProvisionerCreateBucket(ctx, &req)
122+
if err != nil {
123+
klog.Errorf("error calling ProvisionerCreateBucket: %v", err)
124+
return err
125+
}
126+
klog.V(1).Infof("provisioner returned create bucket response %v", rsp)
127+
128+
// TODO update the bucket protocol in the bucket spec
129+
130+
// update bucket availability to true
131+
return bl.updateStatus(ctx, obj.Name, "Bucket Provisioned", true)
132+
}
133+
134+
// Update does nothing
135+
func (bl *bucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket) error {
136+
klog.V(1).Infof("bucketListener: update called for bucket %s", old.Name)
137+
return nil
138+
}
139+
140+
// Delete will call the provisioner and delete a bucket
141+
func (bl *bucketListener) Delete(ctx context.Context, obj *v1alpha1.Bucket) error {
142+
klog.V(1).Infof("bucketListener: delete called for bucket %s", obj.Name)
143+
144+
// Verify this bucket is for this provisioner
145+
if !strings.EqualFold(obj.Spec.Provisioner, bl.provisionerName) {
146+
return nil
147+
}
148+
149+
req := osspec.ProvisionerDeleteBucketRequest{
150+
BucketContext: obj.Spec.Parameters,
151+
}
152+
153+
switch obj.Spec.Protocol.Name {
154+
case v1alpha1.ProtocolNameS3:
155+
req.BucketName = obj.Spec.Protocol.S3.BucketName
156+
req.BucketContext["Region"] = obj.Spec.Protocol.S3.Region
157+
req.BucketContext["SignatureVersion"] = string(obj.Spec.Protocol.S3.SignatureVersion)
158+
req.BucketContext["Endpoint"] = obj.Spec.Protocol.S3.Endpoint
159+
case v1alpha1.ProtocolNameAzure:
160+
req.BucketName = obj.Spec.Protocol.AzureBlob.ContainerName
161+
req.BucketContext["StorageAccount"] = obj.Spec.Protocol.AzureBlob.StorageAccount
162+
case v1alpha1.ProtocolNameGCS:
163+
req.BucketName = obj.Spec.Protocol.GCS.BucketName
164+
req.BucketContext["ServiceAccount"] = obj.Spec.Protocol.GCS.ServiceAccount
165+
req.BucketContext["PrivateKeyName"] = obj.Spec.Protocol.GCS.PrivateKeyName
166+
req.BucketContext["ProjectID"] = obj.Spec.Protocol.GCS.ProjectID
167+
default:
168+
return fmt.Errorf("unknown protocol: %s", obj.Spec.Protocol.Name)
169+
}
170+
171+
req.BucketContext["ProtocolVersion"] = obj.Spec.Protocol.Version
172+
173+
// TODO set grpc timeout
174+
rsp, err := bl.provisionerClient.ProvisionerDeleteBucket(ctx, &req)
175+
if err != nil {
176+
klog.Errorf("error calling ProvisionerDeleteBucket: %v", err)
177+
obj.Status.Message = "Bucket Deleting"
178+
obj.Status.BucketAvailable = false
179+
_, err = bl.bucketClient.ObjectstorageV1alpha1().Buckets().UpdateStatus(ctx, obj, metav1.UpdateOptions{})
180+
return err
181+
}
182+
klog.V(1).Infof("provisioner returned delete bucket response %v", rsp)
183+
184+
// update bucket availability to false
185+
return bl.updateStatus(ctx, obj.Name, "Bucket Deleted", false)
186+
}
187+
188+
func (bl *bucketListener) updateStatus(ctx context.Context, name, msg string, state bool) error {
189+
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
190+
bucket, err := bl.bucketClient.ObjectstorageV1alpha1().Buckets().Get(ctx, name, metav1.GetOptions{})
191+
if err != nil {
192+
return err
193+
}
194+
bucket.Status.Message = msg
195+
bucket.Status.BucketAvailable = state
196+
_, err = bl.bucketClient.ObjectstorageV1alpha1().Buckets().UpdateStatus(ctx, bucket, metav1.UpdateOptions{})
197+
return err
198+
})
199+
return err
200+
}

0 commit comments

Comments
 (0)