Skip to content
This repository was archived by the owner on Dec 3, 2024. It is now read-only.

Fix: Bucket and BucketAccess object updation facing stale object and improved logging #73

Merged
merged 3 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 57 additions & 14 deletions pkg/bucket/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
return nil
}

if bucket.Spec.ExistingBucketID != "" {
bucket.Status.BucketReady = true
bucket.Status.BucketID = bucket.Spec.ExistingBucketID
bucketReady := false
var bucketID string

if bucket.Spec.ExistingBucketID != "" {
bucketReady = true
bucketID = bucket.Spec.ExistingBucketID
} else {
req := &cosi.DriverCreateBucketRequest{
Parameters: bucket.Spec.Parameters,
Expand All @@ -100,7 +102,7 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
rsp, err := b.provisionerClient.DriverCreateBucket(ctx, req)
if err != nil {
if status.Code(err) != codes.AlreadyExists {
klog.ErrorS(err, "Failed to create bucket",
klog.V(3).ErrorS(err, "Driver failed to create bucket",
"bucket", bucket.ObjectMeta.Name)
return errors.Wrap(err, "Failed to create bucket")
}
Expand All @@ -109,14 +111,17 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)

if rsp == nil {
err = errors.New("DriverCreateBucket returned a nil response")
klog.ErrorS(err, "Internal Error")
klog.V(3).ErrorS(err, "Internal Error from driver",
"bucket", bucket.ObjectMeta.Name)
return err
}

if rsp.BucketId != "" {
bucket.Status.BucketID = rsp.BucketId
bucket.Status.BucketReady = true
bucketID = rsp.BucketId
bucketReady = true
} else {
klog.V(3).ErrorS(err, "DriverCreateBucket returned an empty bucketID",
"bucket", bucket.ObjectMeta.Name)
err = errors.New("DriverCreateBucket returned an empty bucketID")
return err
}
Expand All @@ -126,29 +131,48 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
ref := bucket.Spec.BucketClaim
bucketClaim, err := b.bucketClaims(ref.Namespace).Get(ctx, ref.Name, metav1.GetOptions{})
if err != nil {
klog.V(3).ErrorS(err, "Failed to get bucketClaim",
"bucketClaim", ref.Name,
"bucket", bucket.ObjectMeta.Name)
return err
}

bucketClaim.Status.BucketReady = true
if _, err = b.bucketClaims(bucketClaim.Namespace).Update(ctx, bucketClaim, metav1.UpdateOptions{}); err != nil {
if _, err = b.bucketClaims(bucketClaim.Namespace).UpdateStatus(ctx, bucketClaim, metav1.UpdateOptions{}); err != nil {
klog.V(3).ErrorS(err, "Failed to update bucketClaim",
"bucketClaim", ref.Name,
"bucket", bucket.ObjectMeta.Name)
return err
}

klog.V(5).Infof("Successfully updated status of bucketClaim: %s, bucket: %s", bucketClaim.ObjectMeta.Name, bucket.ObjectMeta.Name)
}
}

controllerutil.AddFinalizer(bucket, consts.BucketFinalizer)
if _, err = b.buckets().Update(ctx, bucket, metav1.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Failed to update bucket finalizers", "bucket", bucket.ObjectMeta.Name)
if bucket, err = b.buckets().Update(ctx, bucket, metav1.UpdateOptions{}); err != nil {
klog.V(3).ErrorS(err, "Failed to update bucket finalizers", "bucket", bucket.ObjectMeta.Name)
return errors.Wrap(err, "Failed to update bucket finalizers")
}

klog.V(5).Infof("Successfully added finalizer to bucket: %s", bucket.ObjectMeta.Name)

// Setting the status here so that the updated object is used
bucket.Status.BucketReady = bucketReady
bucket.Status.BucketID = bucketID

// if this step fails, then controller will retry with backoff
if _, err = b.buckets().UpdateStatus(ctx, bucket, metav1.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Failed to update bucket status",
klog.V(3).ErrorS(err, "Failed to update bucket status",
"bucket", bucket.ObjectMeta.Name)
return errors.Wrap(err, "Failed to update bucket status")
}

klog.V(3).InfoS("Add Bucket success",
"bucket", bucket.ObjectMeta.Name,
"bucketID", bucketID,
"ns", bucket.ObjectMeta.Namespace)

return nil
}

Expand All @@ -174,12 +198,18 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
if strings.EqualFold(bucketAccess.Spec.BucketClaimName, bucketClaimName) {
err = b.bucketAccesses(bucketClaimNs).Delete(ctx, bucketAccess.Name, metav1.DeleteOptions{})
if err != nil {
klog.V(3).ErrorS(err, "Error deleting BucketAccess",
"name", bucketAccess.Name,
"bucket", bucket.ObjectMeta.Name)
return err
}
}
}

klog.V(5).Infof("Successfully deleted dependent bucketAccess of bucket:%s", bucket.ObjectMeta.Name)

controllerutil.RemoveFinalizer(bucket, consts.BABucketFinalizer)
klog.V(5).Infof("Successfully removed finalizer: %s of bucket: %s", consts.BABucketFinalizer, bucket.ObjectMeta.Name)
}

if controllerutil.ContainsFinalizer(bucket, consts.BucketFinalizer) {
Expand All @@ -190,6 +220,9 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
}
}

klog.V(3).InfoS("Update Bucket success",
"name", bucket.ObjectMeta.Name,
"ns", bucket.ObjectMeta.Namespace)
return nil
}

Expand All @@ -213,7 +246,7 @@ func (b *BucketListener) InitializeKubeClient(k kube.Interface) {

serverVersion, err := k.Discovery().ServerVersion()
if err != nil {
klog.ErrorS(err, "Cannot determine server version")
klog.V(3).ErrorS(err, "Cannot determine server version")
} else {
b.kubeVersion = utilversion.MustParseSemantic(serverVersion.GitVersion)
}
Expand All @@ -226,7 +259,7 @@ func (b *BucketListener) InitializeBucketClient(bc buckets.Interface) {

func (b *BucketListener) deleteBucketOp(ctx context.Context, bucket *v1alpha1.Bucket) error {
if !strings.EqualFold(bucket.Spec.DriverName, b.driverName) {
klog.V(5).InfoS("Skipping bucket for provisiner",
klog.V(5).InfoS("Skipping bucket for provisioner",
"bucket", bucket.ObjectMeta.Name,
"driver", bucket.Spec.DriverName,
)
Expand All @@ -242,26 +275,36 @@ func (b *BucketListener) deleteBucketOp(ctx context.Context, bucket *v1alpha1.Bu

if _, err := b.provisionerClient.DriverDeleteBucket(ctx, req); err != nil {
if status.Code(err) != codes.NotFound {
klog.ErrorS(err, "Failed to delete bucket",
klog.V(3).ErrorS(err, "Failed to delete bucket",
"bucket", bucket.ObjectMeta.Name,
)
return err
}
}

klog.V(5).Infof("Successfully deleted bucketID: %s from the object storage for bucket: %s", bucket.Status.BucketID, bucket.ObjectMeta.Name)
}

if bucket.Spec.BucketClaim != nil {
ref := bucket.Spec.BucketClaim
bucketClaim, err := b.bucketClaims(ref.Namespace).Get(ctx, ref.Name, metav1.GetOptions{})
if err != nil {
klog.V(3).ErrorS(err, "Error fetching bucketClaim",
"bucketClaim", ref.Name,
"bucket", bucket.ObjectMeta.Name)
return err
}

if controllerutil.RemoveFinalizer(bucketClaim, consts.BCFinalizer) {
if _, err := b.bucketClaims(bucketClaim.ObjectMeta.Namespace).Update(ctx, bucketClaim, metav1.UpdateOptions{}); err != nil {
klog.V(3).ErrorS(err, "Error removing finalizer from bucketClaim",
"bucketClaim", bucketClaim.ObjectMeta.Name,
"bucket", bucket.ObjectMeta.Name)
return err
}
}

klog.V(5).Infof("Successfully removed finalizer: %s from bucketClaim: %s for bucket: %s", consts.BCFinalizer, bucketClaim.ObjectMeta.Name, bucket.ObjectMeta.Name)
}

return nil
Expand Down
44 changes: 29 additions & 15 deletions pkg/bucketaccess/bucketaccess_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
namespace := bucketAccess.ObjectMeta.Namespace
bucketClaim, err := bal.bucketClaims(namespace).Get(ctx, bucketClaimName, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Failed to fetch bucketClaim", "bucketClaim", bucketClaimName)
klog.V(3).ErrorS(err, "Failed to fetch bucketClaim", "bucketClaim", bucketClaimName)
return errors.Wrap(err, "Failed to fetch bucketClaim")
}

if bucketClaim.Status.BucketName == "" || bucketClaim.Status.BucketReady != true {
err := errors.New("BucketName cannot be empty or BucketNotReady in bucketClaim")
klog.ErrorS(err,
klog.V(3).ErrorS(err,
"Invalid arguments",
"bucketClaim", bucketClaim.Name,
"bucketAccess", bucketAccess.ObjectMeta.Name,
Expand Down Expand Up @@ -142,7 +142,7 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a

bucket, err := bal.buckets().Get(ctx, bucketClaim.Status.BucketName, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Failed to fetch bucket", "bucket", bucketClaim.Status.BucketName)
klog.V(3).ErrorS(err, "Failed to fetch bucket", "bucket", bucketClaim.Status.BucketName)
return errors.Wrap(err, "Failed to fetch bucket")
}

Expand All @@ -163,7 +163,7 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
rsp, err := bal.provisionerClient.DriverGrantBucketAccess(ctx, req)
if err != nil {
if status.Code(err) != codes.AlreadyExists {
klog.ErrorS(err,
klog.V(3).ErrorS(err,
"Failed to grant access",
"bucketAccess", bucketAccess.ObjectMeta.Name,
"bucketClaim", bucketClaimName,
Expand All @@ -175,14 +175,14 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a

if rsp.AccountId == "" {
err = errors.New("AccountId not defined in DriverGrantBucketAccess")
klog.ErrorS(err, "BucketAccess", bucketAccess.ObjectMeta.Name)
klog.V(3).ErrorS(err, "BucketAccess", bucketAccess.ObjectMeta.Name)
return errors.Wrap(err, fmt.Sprintf("BucketAccess %s", bucketAccess.ObjectMeta.Name))
}

credentials := rsp.Credentials
if len(credentials) != 1 {
err = errors.New("Credentials returned in DriverGrantBucketAccessResponse should be of length 1")
klog.ErrorS(err, "BucketAccess", bucketAccess.ObjectMeta.Name)
klog.V(3).ErrorS(err, "BucketAccess", bucketAccess.ObjectMeta.Name)
return errors.Wrap(err, fmt.Sprintf("BucketAccess %s", bucketAccess.ObjectMeta.Name))
}

Expand Down Expand Up @@ -230,7 +230,7 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a

if _, err := bal.secrets(namespace).Get(ctx, secretCredName, metav1.GetOptions{}); err != nil {
if !kubeerrors.IsNotFound(err) {
klog.ErrorS(err,
klog.V(3).ErrorS(err,
"Failed to create secrets",
"bucketAccess", bucketAccess.ObjectMeta.Name,
"bucket", bucket.ObjectMeta.Name)
Expand All @@ -249,7 +249,7 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
Type: corev1.SecretTypeOpaque,
}, metav1.CreateOptions{}); err != nil {
if !kubeerrors.IsAlreadyExists(err) {
klog.ErrorS(err,
klog.V(3).ErrorS(err,
"Failed to create minted secret",
"bucketAccess", bucketAccess.ObjectMeta.Name,
"bucket", bucket.ObjectMeta.Name)
Expand All @@ -266,8 +266,9 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a
}

if controllerutil.AddFinalizer(bucketAccess, consts.BAFinalizer) {
if _, err = bal.bucketAccesses(bucketAccess.ObjectMeta.Namespace).Update(ctx, bucketAccess, metav1.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Failed to update BucketAccess finalizer",
bucketAccess, err = bal.bucketAccesses(bucketAccess.ObjectMeta.Namespace).Update(ctx, bucketAccess, metav1.UpdateOptions{})
if err != nil {
klog.V(3).ErrorS(err, "Failed to update BucketAccess finalizer",
"bucketAccess", bucketAccess.ObjectMeta.Name,
"bucket", bucket.ObjectMeta.Name)
return errors.Wrap(err, fmt.Sprintf("Failed to update BucketAccess finalizer. BucketAccess: %s", bucketAccess.ObjectMeta.Name))
Expand All @@ -279,7 +280,7 @@ func (bal *BucketAccessListener) Add(ctx context.Context, inputBucketAccess *v1a

// if this step fails, then controller will retry with backoff
if _, err := bal.bucketAccesses(bucketAccess.ObjectMeta.Namespace).UpdateStatus(ctx, bucketAccess, metav1.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Failed to update BucketAccess Status",
klog.V(3).ErrorS(err, "Failed to update BucketAccess Status",
"bucketAccess", bucketAccess.ObjectMeta.Name,
"bucket", bucket.ObjectMeta.Name)
return errors.Wrap(err, fmt.Sprintf("Failed to update BucketAccess Status. BucketAccess: %s", bucketAccess.ObjectMeta.Name))
Expand All @@ -297,11 +298,15 @@ func (bal *BucketAccessListener) Update(ctx context.Context, old, new *v1alpha1.
"name", old.ObjectMeta.Name)

bucketAccess := new.DeepCopy()
err := bal.deleteBucketAccessOp(ctx, bucketAccess)
if err != nil {
return err
if !bucketAccess.GetDeletionTimestamp().IsZero() {
err := bal.deleteBucketAccessOp(ctx, bucketAccess)
if err != nil {
return err
}
}

klog.V(3).InfoS("Update BucketAccess success",
"name", old.ObjectMeta.Name)
return nil
}

Expand All @@ -328,15 +333,24 @@ func (bal *BucketAccessListener) deleteBucketAccessOp(ctx context.Context, bucke
if controllerutil.RemoveFinalizer(secret, consts.SecretFinalizer) {
_, err = bal.secrets(bucketAccess.ObjectMeta.Namespace).Update(ctx, secret, metav1.UpdateOptions{})
if err != nil {
klog.V(3).ErrorS(err, "Error removing finalizer from secret",
"secret", secret.ObjectMeta.Name,
"bucketAccess", bucketAccess.ObjectMeta.Name)
return err
}

klog.V(5).Infof("Successfully removed finalizer from secret: %s, bucketAccess: %s", secret.ObjectMeta.Name, bucketAccess.ObjectMeta.Name)
}

if controllerutil.RemoveFinalizer(bucketAccess, consts.BAFinalizer) {
_, err = bal.bucketAccesses(bucketAccess.ObjectMeta.Namespace).Update(ctx, bucketAccess, metav1.UpdateOptions{})
if err != nil {
klog.V(3).ErrorS(err, "Error removing finalizer from bucketAccess",
"bucketAccess", bucketAccess.ObjectMeta.Name)
return err
}

klog.V(5).Infof("Successfully removed finalizer from bucketAccess: %s", bucketAccess.ObjectMeta.Name)
}

return nil
Expand Down Expand Up @@ -383,7 +397,7 @@ func (bal *BucketAccessListener) InitializeKubeClient(k kube.Interface) {

serverVersion, err := k.Discovery().ServerVersion()
if err != nil {
klog.ErrorS(err, "Cannot determine server version")
klog.V(3).ErrorS(err, "Cannot determine server version")
} else {
bal.kubeVersion = utilversion.MustParseSemantic(serverVersion.GitVersion)
}
Expand Down