@@ -99,6 +99,7 @@ const (
99
99
var (
100
100
errExemplarRef = errors .New ("exemplars not ingested because series not already present" )
101
101
errIngesterStopping = errors .New ("ingester stopping" )
102
+ errNoUserDb = errors .New ("no user db" )
102
103
103
104
tsChunksPool zeropool.Pool [[]client.TimeSeriesChunk ]
104
105
)
@@ -988,8 +989,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
988
989
989
990
func (i * Ingester ) updateUserTSDBConfigs () {
990
991
for _ , userID := range i .getTSDBUsers () {
991
- userDB := i .getTSDB (userID )
992
- if userDB == nil {
992
+ userDB , err := i .getTSDB (userID )
993
+ if err != nil || userDB == nil {
993
994
continue
994
995
}
995
996
@@ -1005,7 +1006,7 @@ func (i *Ingester) updateUserTSDBConfigs() {
1005
1006
}
1006
1007
1007
1008
// This method currently updates the MaxExemplars and OutOfOrderTimeWindow.
1008
- err : = userDB .db .ApplyConfig (cfg )
1009
+ err = userDB .db .ApplyConfig (cfg )
1009
1010
if err != nil {
1010
1011
level .Error (logutil .WithUserID (userID , i .logger )).Log ("msg" , "failed to update user tsdb configuration." )
1011
1012
}
@@ -1029,8 +1030,8 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) {
1029
1030
purgeTime := time .Now ().Add (- i .cfg .ActiveSeriesMetricsIdleTimeout )
1030
1031
1031
1032
for _ , userID := range i .getTSDBUsers () {
1032
- userDB := i .getTSDB (userID )
1033
- if userDB == nil {
1033
+ userDB , err := i .getTSDB (userID )
1034
+ if err != nil || userDB == nil {
1034
1035
continue
1035
1036
}
1036
1037
@@ -1045,8 +1046,8 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) {
1045
1046
func (i * Ingester ) updateLabelSetMetrics () {
1046
1047
activeUserSet := make (map [string ]map [uint64 ]struct {})
1047
1048
for _ , userID := range i .getTSDBUsers () {
1048
- userDB := i .getTSDB (userID )
1049
- if userDB == nil {
1049
+ userDB , err := i .getTSDB (userID )
1050
+ if err != nil || userDB == nil {
1050
1051
continue
1051
1052
}
1052
1053
@@ -1549,8 +1550,8 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
1549
1550
1550
1551
i .metrics .queries .Inc ()
1551
1552
1552
- db := i .getTSDB (userID )
1553
- if db == nil {
1553
+ db , err := i .getTSDB (userID )
1554
+ if err != nil || db == nil {
1554
1555
return & client.ExemplarQueryResponse {}, nil
1555
1556
}
1556
1557
@@ -1642,8 +1643,8 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
1642
1643
return nil , cleanup , err
1643
1644
}
1644
1645
1645
- db := i .getTSDB (userID )
1646
- if db == nil {
1646
+ db , err := i .getTSDB (userID )
1647
+ if err != nil || db == nil {
1647
1648
return & client.LabelValuesResponse {}, cleanup , nil
1648
1649
}
1649
1650
@@ -1732,8 +1733,8 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
1732
1733
return nil , cleanup , err
1733
1734
}
1734
1735
1735
- db := i .getTSDB (userID )
1736
- if db == nil {
1736
+ db , err := i .getTSDB (userID )
1737
+ if err != nil || db == nil {
1737
1738
return & client.LabelNamesResponse {}, cleanup , nil
1738
1739
}
1739
1740
@@ -1830,8 +1831,8 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
1830
1831
return cleanup , err
1831
1832
}
1832
1833
1833
- db := i .getTSDB (userID )
1834
- if db == nil {
1834
+ db , err := i .getTSDB (userID )
1835
+ if err != nil || db == nil {
1835
1836
return cleanup , nil
1836
1837
}
1837
1838
@@ -1944,8 +1945,8 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest)
1944
1945
return nil , err
1945
1946
}
1946
1947
1947
- db := i .getTSDB (userID )
1948
- if db == nil {
1948
+ db , err := i .getTSDB (userID )
1949
+ if err != nil || db == nil {
1949
1950
return & client.UserStatsResponse {}, nil
1950
1951
}
1951
1952
@@ -2064,8 +2065,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
2064
2065
2065
2066
i .metrics .queries .Inc ()
2066
2067
2067
- db := i .getTSDB (userID )
2068
- if db == nil {
2068
+ db , err := i .getTSDB (userID )
2069
+ if err != nil || db == nil {
2069
2070
return nil
2070
2071
}
2071
2072
@@ -2216,11 +2217,14 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
2216
2217
return numSeries , numSamples , totalBatchSizeBytes , numChunks , nil
2217
2218
}
2218
2219
2219
- func (i * Ingester ) getTSDB (userID string ) * userTSDB {
2220
+ func (i * Ingester ) getTSDB (userID string ) ( * userTSDB , error ) {
2220
2221
i .stoppedMtx .RLock ()
2221
2222
defer i .stoppedMtx .RUnlock ()
2222
2223
db := i .TSDBState .dbs [userID ]
2223
- return db
2224
+ if db == nil {
2225
+ return nil , errNoUserDb
2226
+ }
2227
+ return db , nil
2224
2228
}
2225
2229
2226
2230
// List all users for which we have a TSDB. We do it here in order
@@ -2238,8 +2242,11 @@ func (i *Ingester) getTSDBUsers() []string {
2238
2242
}
2239
2243
2240
2244
func (i * Ingester ) getOrCreateTSDB (userID string , force bool ) (* userTSDB , error ) {
2241
- db := i .getTSDB (userID )
2245
+ db , err := i .getTSDB (userID )
2242
2246
if db != nil {
2247
+ if err != nil {
2248
+ level .Warn (i .logger ).Log ("msg" , "error getting user DB but userDB is not null" , "err" , err , "userID" , userID )
2249
+ }
2243
2250
return db , nil
2244
2251
}
2245
2252
@@ -2271,7 +2278,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
2271
2278
}
2272
2279
2273
2280
// Create the database and a shipper for a user
2274
- db , err : = i .createTSDB (userID )
2281
+ db , err = i .createTSDB (userID )
2275
2282
if err != nil {
2276
2283
return nil , err
2277
2284
}
@@ -2285,10 +2292,10 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
2285
2292
2286
2293
func (i * Ingester ) blockChunkQuerierFunc (userId string ) tsdb.BlockChunkQuerierFunc {
2287
2294
return func (b tsdb.BlockReader , mint , maxt int64 ) (storage.ChunkQuerier , error ) {
2288
- db := i .getTSDB (userId )
2295
+ db , err := i .getTSDB (userId )
2289
2296
2290
2297
var postingCache cortex_tsdb.ExpandedPostingsCache
2291
- if db != nil {
2298
+ if err == nil && db != nil {
2292
2299
postingCache = db .postingCache
2293
2300
}
2294
2301
@@ -2650,8 +2657,8 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants)
2650
2657
}
2651
2658
2652
2659
// Get the user's DB. If the user doesn't exist, we skip it.
2653
- userDB := i .getTSDB (userID )
2654
- if userDB == nil || userDB .shipper == nil {
2660
+ userDB , err := i .getTSDB (userID )
2661
+ if err != nil || userDB == nil || userDB .shipper == nil {
2655
2662
return nil
2656
2663
}
2657
2664
@@ -2762,8 +2769,8 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util.
2762
2769
return nil
2763
2770
}
2764
2771
2765
- userDB := i .getTSDB (userID )
2766
- if userDB == nil {
2772
+ userDB , err := i .getTSDB (userID )
2773
+ if err != nil || userDB == nil {
2767
2774
return nil
2768
2775
}
2769
2776
@@ -2773,8 +2780,6 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util.
2773
2780
return nil
2774
2781
}
2775
2782
2776
- var err error
2777
-
2778
2783
i .TSDBState .compactionsTriggered .Inc ()
2779
2784
2780
2785
reason := ""
@@ -2823,8 +2828,8 @@ func (i *Ingester) expirePostingsCache(ctx context.Context) error {
2823
2828
if ctx .Err () != nil {
2824
2829
return nil
2825
2830
}
2826
- userDB := i .getTSDB (userID )
2827
- if userDB == nil || userDB .postingCache == nil {
2831
+ userDB , err := i .getTSDB (userID )
2832
+ if err != nil || userDB == nil || userDB .postingCache == nil {
2828
2833
continue
2829
2834
}
2830
2835
userDB .postingCache .PurgeExpiredItems ()
@@ -2834,8 +2839,8 @@ func (i *Ingester) expirePostingsCache(ctx context.Context) error {
2834
2839
}
2835
2840
2836
2841
func (i * Ingester ) closeAndDeleteUserTSDBIfIdle (userID string ) tsdbCloseCheckResult {
2837
- userDB := i .getTSDB (userID )
2838
- if userDB == nil || userDB .shipper == nil {
2842
+ userDB , err := i .getTSDB (userID )
2843
+ if err != nil || userDB == nil || userDB .shipper == nil {
2839
2844
// We will not delete local data when not using shipping to storage.
2840
2845
return tsdbShippingDisabled
2841
2846
}
0 commit comments