Skip to content

Add Slowset Utility From external-resizer #192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions slowset/slowset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package slowset

import (
"sync"
"time"
)

// SlowSet is a set of API objects that should be synced at slower rate. Key is typically the object
// namespace + name and value is timestamp when the object was added to the set.
type SlowSet struct {
sync.RWMutex
// retentionTime is the time after which an item will be removed from the set
// this indicates, how long before an operation on pvc can be retried.
retentionTime time.Duration

resyncPeriod time.Duration
workSet map[string]ObjectData
}

type ObjectData struct {
Timestamp time.Time
StorageClassUID string
}

func NewSlowSet(retTime time.Duration) *SlowSet {
return &SlowSet{
retentionTime: retTime,
resyncPeriod: 100 * time.Millisecond,
workSet: make(map[string]ObjectData),
}
}

func (s *SlowSet) Add(key string, info ObjectData) bool {
s.Lock()
defer s.Unlock()

if _, ok := s.workSet[key]; ok {
return false
}

s.workSet[key] = info
return true
}

func (s *SlowSet) Get(key string) (ObjectData, bool) {
s.RLock()
defer s.RUnlock()

info, ok := s.workSet[key]
return info, ok
}

func (s *SlowSet) Contains(key string) bool {
s.RLock()
defer s.RUnlock()

info, ok := s.workSet[key]
if ok && time.Since(info.Timestamp) < s.retentionTime {
return true
}
return false
}

func (s *SlowSet) Remove(key string) {
s.Lock()
defer s.Unlock()

delete(s.workSet, key)
}

func (s *SlowSet) TimeRemaining(key string) time.Duration {
s.RLock()
defer s.RUnlock()

if info, ok := s.workSet[key]; ok {
return s.retentionTime - time.Since(info.Timestamp)
}
return 0
}

func (s *SlowSet) removeAllExpired() {
s.Lock()
defer s.Unlock()
for key, info := range s.workSet {
if time.Since(info.Timestamp) > s.retentionTime {
delete(s.workSet, key)
}
}
}

func (s *SlowSet) Run(stopCh <-chan struct{}) {
ticker := time.NewTicker(s.resyncPeriod)
defer ticker.Stop()
for {
select {
case <-stopCh:
return
case <-ticker.C:
s.removeAllExpired()
}
}
}
118 changes: 118 additions & 0 deletions slowset/slowset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package slowset

import (
"testing"
"time"
)

func TestSlowSet(t *testing.T) {
tests := []struct {
name string
retentionTime time.Duration
resyncPeriod time.Duration
testFunc func(*SlowSet) bool
}{
{
name: "Should not change time of a key if added multiple times",
resyncPeriod: 100 * time.Millisecond,
testFunc: func(s *SlowSet) bool {
key := "key"
info := ObjectData{
Timestamp: time.Now(),
}
s.Add(key, info)
time1 := s.workSet[key]
s.Add(key, info)
time2 := s.workSet[key]
return time1 == time2
},
},
{
name: "Should remove key after retention time",
retentionTime: 200 * time.Millisecond,
resyncPeriod: 100 * time.Millisecond,
testFunc: func(s *SlowSet) bool {
key := "key"
info := ObjectData{
Timestamp: time.Now(),
}
s.Add(key, info)
time.Sleep(300 * time.Millisecond)
return !s.Contains(key)
},
},
{
name: "Should not remove key before retention time",
retentionTime: 200 * time.Millisecond,
resyncPeriod: 100 * time.Millisecond,
testFunc: func(s *SlowSet) bool {
key := "key"
info := ObjectData{
Timestamp: time.Now(),
}
s.Add(key, info)
time.Sleep(100 * time.Millisecond)
return s.Contains(key)
},
},
{
name: "Should return time remaining for added keys",
retentionTime: 300 * time.Millisecond,
resyncPeriod: 100 * time.Millisecond,
testFunc: func(s *SlowSet) bool {
key := "key"
info := ObjectData{
Timestamp: time.Now(),
}
s.Add(key, info)
time.Sleep(100 * time.Millisecond)
timeRemaining := s.TimeRemaining(key)
return timeRemaining > 0 && timeRemaining < 300*time.Millisecond
},
},
{
name: "should return false for Contains if key is present but expired",
resyncPeriod: 200 * time.Millisecond,
retentionTime: 300 * time.Millisecond,
testFunc: func(s *SlowSet) bool {
key := "key"
info := ObjectData{
Timestamp: time.Now(),
}
s.Add(key, info)
time.Sleep(301 * time.Millisecond)
return !s.Contains(key)
},
},
}

for i := range tests {
test := tests[i]
t.Run(test.name, func(t *testing.T) {
s := NewSlowSet(test.retentionTime)
s.resyncPeriod = test.resyncPeriod
stopCh := make(chan struct{}, 1)
go s.Run(stopCh)
defer close(stopCh)
if !test.testFunc(s) {
t.Errorf("Test failed")
}
})
}
}