@@ -21,9 +21,12 @@ import (
21
21
"errors"
22
22
"fmt"
23
23
"math/rand"
24
+ "net/http"
24
25
"reflect"
25
26
goruntime "runtime"
26
27
"strconv"
28
+ "sync"
29
+ "sync/atomic"
27
30
"syscall"
28
31
"testing"
29
32
"time"
@@ -1753,6 +1756,152 @@ func TestReflectorListExtract(t *testing.T) {
1753
1756
}
1754
1757
}
1755
1758
1759
+ func TestReflectorReplacesStoreOnUnsafeDelete (t * testing.T ) {
1760
+ mkPod := func (id string , rv string ) * v1.Pod {
1761
+ return & v1.Pod {ObjectMeta : metav1.ObjectMeta {Namespace : "ns" , Name : id , ResourceVersion : rv }}
1762
+ }
1763
+ mkList := func (rv string , pods ... * v1.Pod ) * v1.PodList {
1764
+ list := & v1.PodList {ListMeta : metav1.ListMeta {ResourceVersion : rv }}
1765
+ for _ , pod := range pods {
1766
+ list .Items = append (list .Items , * pod )
1767
+ }
1768
+ return list
1769
+ }
1770
+ makeStatus := func () * metav1.Status {
1771
+ return & metav1.Status {
1772
+ Status : metav1 .StatusFailure ,
1773
+ Code : http .StatusInternalServerError ,
1774
+ Reason : metav1 .StatusReasonStoreReadError ,
1775
+ Message : "failed to prepare current and previous objects: corrupt object has been deleted" ,
1776
+ }
1777
+ }
1778
+
1779
+ // these pods preexist and never get updated/deleted
1780
+ preExisting := mkPod ("foo-1" , "1" )
1781
+ pods := []* v1.Pod {preExisting , mkPod ("foo-2" , "2" ), mkPod ("foo-3" , "3" )}
1782
+ lastExpectedRV := "5"
1783
+ lists := []* v1.PodList {
1784
+ mkList ("3" , pods ... ), // initial list
1785
+ mkList (lastExpectedRV , pods ... ), // re-list due to watch error
1786
+ }
1787
+ corruptObj := mkPod ("foo" , "4" )
1788
+ events := []watch.Event {
1789
+ {Type : watch .Added , Object : corruptObj },
1790
+ // the object becomes corrupt, and it gets unsafe-deleted, and
1791
+ // watch sends the following Error event, note the RV has
1792
+ // advanced to "5" in the storage due to the delete operation
1793
+ {Type : watch .Error , Object : makeStatus ()},
1794
+ }
1795
+
1796
+ s := NewFIFO (MetaNamespaceKeyFunc )
1797
+ var replaceInvoked atomic.Int32
1798
+ store := & fakeStore {
1799
+ Store : s ,
1800
+ beforeReplace : func (list []interface {}, rv string ) {
1801
+ // interested in the Replace call that happens after the Error event
1802
+ if rv == lastExpectedRV {
1803
+ replaceInvoked .Add (1 )
1804
+ _ , exists , err := s .Get (corruptObj )
1805
+ if err != nil || ! exists {
1806
+ t .Errorf ("expected the object to exist in the store, exists: %t, err: %v" , exists , err )
1807
+ }
1808
+ _ , exists , err = s .Get (preExisting )
1809
+ if err != nil || ! exists {
1810
+ t .Errorf ("expected the pre-existing object to be in the store, exists: %t, err: %v" , exists , err )
1811
+ }
1812
+ }
1813
+ },
1814
+ afterReplace : func (rv string , err error ) {
1815
+ if rv == lastExpectedRV {
1816
+ replaceInvoked .Add (1 )
1817
+ if err != nil {
1818
+ t .Errorf ("expected Replace to have succeeded, but got error: %v" , err )
1819
+ }
1820
+ _ , exists , err := s .Get (corruptObj )
1821
+ if err != nil || exists {
1822
+ t .Errorf ("expected the object to have been removed from the store, exists: %t, err: %v" , exists , err )
1823
+ }
1824
+ // show that a pre-existing pod is still in the cache
1825
+ _ , exists , err = s .Get (preExisting )
1826
+ if err != nil || ! exists {
1827
+ t .Errorf ("expected the pre-existing object to be in the store, exists: %t, err: %v" , exists , err )
1828
+ }
1829
+ }
1830
+ },
1831
+ }
1832
+
1833
+ var once sync.Once
1834
+ lw := & testLW {
1835
+ WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
1836
+ fw := watch .NewFake ()
1837
+ go func () {
1838
+ once .Do (func () {
1839
+ for _ , e := range events {
1840
+ fw .Action (e .Type , e .Object )
1841
+ }
1842
+ })
1843
+ }()
1844
+ return fw , nil
1845
+ },
1846
+ ListFunc : func (options metav1.ListOptions ) (runtime.Object , error ) {
1847
+ var list runtime.Object
1848
+ if len (lists ) > 0 {
1849
+ list = lists [0 ]
1850
+ lists = lists [1 :]
1851
+ }
1852
+ return list , nil
1853
+ },
1854
+ }
1855
+
1856
+ r := NewReflector (lw , & v1.Pod {}, store , 0 )
1857
+ doneCh , stopCh := make (chan struct {}), make (chan struct {})
1858
+ go func () {
1859
+ defer close (doneCh )
1860
+ r .Run (stopCh )
1861
+ }()
1862
+
1863
+ // wait for the RV to sync to the version returned by the final list
1864
+ err := wait .PollUntilContextTimeout (context .Background (), 100 * time .Millisecond , wait .ForeverTestTimeout , true , func (ctx context.Context ) (done bool , err error ) {
1865
+ if rv := r .LastSyncResourceVersion (); rv == lastExpectedRV {
1866
+ return true , nil
1867
+ }
1868
+ return false , nil
1869
+ })
1870
+ if err != nil {
1871
+ t .Fatalf ("reflector never caught up with expected revision: %q, err: %v" , lastExpectedRV , err )
1872
+ }
1873
+
1874
+ if want , got := lastExpectedRV , r .LastSyncResourceVersion (); want != got {
1875
+ t .Errorf ("expected LastSyncResourceVersion to be %q, but got: %q" , want , got )
1876
+ }
1877
+ if want , got := 2 , int (replaceInvoked .Load ()); want != got {
1878
+ t .Errorf ("expected store Delete hooks to be invoked %d times, but got: %d" , want , got )
1879
+ }
1880
+ if want , got := len (pods ), len (s .List ()); want != got {
1881
+ t .Errorf ("expected the store to have %d objects, but got: %d" , want , got )
1882
+ }
1883
+
1884
+ close (stopCh )
1885
+ select {
1886
+ case <- doneCh :
1887
+ case <- time .After (wait .ForeverTestTimeout ):
1888
+ t .Errorf ("timed out waiting for Run to return" )
1889
+ }
1890
+ }
1891
+
1892
+ type fakeStore struct {
1893
+ Store
1894
+ beforeReplace func (list []interface {}, s string )
1895
+ afterReplace func (rv string , err error )
1896
+ }
1897
+
1898
+ func (f * fakeStore ) Replace (list []interface {}, rv string ) error {
1899
+ f .beforeReplace (list , rv )
1900
+ err := f .Store .Replace (list , rv )
1901
+ f .afterReplace (rv , err )
1902
+ return err
1903
+ }
1904
+
1756
1905
func BenchmarkExtractList (b * testing.B ) {
1757
1906
_ , _ , podList := getPodListItems (0 , fakeItemsNum )
1758
1907
_ , _ , configMapList := getConfigmapListItems (0 , fakeItemsNum )
0 commit comments