diff --git a/slowset/slowset.go b/slowset/slowset.go new file mode 100644 index 00000000..d48b4c95 --- /dev/null +++ b/slowset/slowset.go @@ -0,0 +1,118 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package slowset + +import ( + "sync" + "time" +) + +// SlowSet is a set of API objects that should be synced at slower rate. Key is typically the object +// namespace + name and value is timestamp when the object was added to the set. +type SlowSet struct { + sync.RWMutex + // retentionTime is the time after which an item will be removed from the set + // this indicates, how long before an operation on pvc can be retried. + retentionTime time.Duration + + resyncPeriod time.Duration + workSet map[string]ObjectData +} + +type ObjectData struct { + Timestamp time.Time + StorageClassUID string +} + +func NewSlowSet(retTime time.Duration) *SlowSet { + return &SlowSet{ + retentionTime: retTime, + resyncPeriod: 100 * time.Millisecond, + workSet: make(map[string]ObjectData), + } +} + +func (s *SlowSet) Add(key string, info ObjectData) bool { + s.Lock() + defer s.Unlock() + + if _, ok := s.workSet[key]; ok { + return false + } + + s.workSet[key] = info + return true +} + +func (s *SlowSet) Get(key string) (ObjectData, bool) { + s.RLock() + defer s.RUnlock() + + info, ok := s.workSet[key] + return info, ok +} + +func (s *SlowSet) Contains(key string) bool { + s.RLock() + defer s.RUnlock() + + info, ok := s.workSet[key] + if ok && time.Since(info.Timestamp) < s.retentionTime { + return true + } + return false +} + +func (s *SlowSet) Remove(key string) { + s.Lock() + defer s.Unlock() + + delete(s.workSet, key) +} + +func (s *SlowSet) TimeRemaining(key string) time.Duration { + s.RLock() + defer s.RUnlock() + + if info, ok := s.workSet[key]; ok { + return s.retentionTime - time.Since(info.Timestamp) + } + return 0 +} + +func (s *SlowSet) removeAllExpired() { + s.Lock() + defer s.Unlock() + for key, info := range s.workSet { + if time.Since(info.Timestamp) > s.retentionTime { + delete(s.workSet, key) + } + } +} + +func (s *SlowSet) Run(stopCh <-chan struct{}) { + ticker := time.NewTicker(s.resyncPeriod) + defer ticker.Stop() + for { + select { + case <-stopCh: + return + case <-ticker.C: + s.removeAllExpired() + } + } +} diff --git a/slowset/slowset_test.go b/slowset/slowset_test.go new file mode 100644 index 00000000..4722bdec --- /dev/null +++ b/slowset/slowset_test.go @@ -0,0 +1,118 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package slowset + +import ( + "testing" + "time" +) + +func TestSlowSet(t *testing.T) { + tests := []struct { + name string + retentionTime time.Duration + resyncPeriod time.Duration + testFunc func(*SlowSet) bool + }{ + { + name: "Should not change time of a key if added multiple times", + resyncPeriod: 100 * time.Millisecond, + testFunc: func(s *SlowSet) bool { + key := "key" + info := ObjectData{ + Timestamp: time.Now(), + } + s.Add(key, info) + time1 := s.workSet[key] + s.Add(key, info) + time2 := s.workSet[key] + return time1 == time2 + }, + }, + { + name: "Should remove key after retention time", + retentionTime: 200 * time.Millisecond, + resyncPeriod: 100 * time.Millisecond, + testFunc: func(s *SlowSet) bool { + key := "key" + info := ObjectData{ + Timestamp: time.Now(), + } + s.Add(key, info) + time.Sleep(300 * time.Millisecond) + return !s.Contains(key) + }, + }, + { + name: "Should not remove key before retention time", + retentionTime: 200 * time.Millisecond, + resyncPeriod: 100 * time.Millisecond, + testFunc: func(s *SlowSet) bool { + key := "key" + info := ObjectData{ + Timestamp: time.Now(), + } + s.Add(key, info) + time.Sleep(100 * time.Millisecond) + return s.Contains(key) + }, + }, + { + name: "Should return time remaining for added keys", + retentionTime: 300 * time.Millisecond, + resyncPeriod: 100 * time.Millisecond, + testFunc: func(s *SlowSet) bool { + key := "key" + info := ObjectData{ + Timestamp: time.Now(), + } + s.Add(key, info) + time.Sleep(100 * time.Millisecond) + timeRemaining := s.TimeRemaining(key) + return timeRemaining > 0 && timeRemaining < 300*time.Millisecond + }, + }, + { + name: "should return false for Contains if key is present but expired", + resyncPeriod: 200 * time.Millisecond, + retentionTime: 300 * time.Millisecond, + testFunc: func(s *SlowSet) bool { + key := "key" + info := ObjectData{ + Timestamp: time.Now(), + } + s.Add(key, info) + time.Sleep(301 * time.Millisecond) + return !s.Contains(key) + }, + }, + } + + for i := range tests { + test := tests[i] + t.Run(test.name, func(t *testing.T) { + s := NewSlowSet(test.retentionTime) + s.resyncPeriod = test.resyncPeriod + stopCh := make(chan struct{}, 1) + go s.Run(stopCh) + defer close(stopCh) + if !test.testFunc(s) { + t.Errorf("Test failed") + } + }) + } +}