Skip to content

Commit fb783a7

Browse files
authored
Fix race on the string interning (#6408)
Signed-off-by: alanprot <[email protected]>
1 parent 7e444e7 commit fb783a7

File tree

6 files changed

+426
-65
lines changed

6 files changed

+426
-65
lines changed

Diff for: go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ require (
8080
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
8181
github.com/cespare/xxhash/v2 v2.3.0
8282
github.com/google/go-cmp v0.6.0
83+
github.com/hashicorp/golang-lru/v2 v2.0.7
8384
github.com/sercand/kuberesolver/v5 v5.1.1
8485
github.com/tjhop/slog-gokit v0.1.2
8586
go.opentelemetry.io/collector/pdata v1.21.0
@@ -164,7 +165,6 @@ require (
164165
github.com/hashicorp/go-multierror v1.1.1 // indirect
165166
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
166167
github.com/hashicorp/golang-lru v0.6.0 // indirect
167-
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
168168
github.com/hashicorp/serf v0.10.1 // indirect
169169
github.com/jessevdk/go-flags v1.5.0 // indirect
170170
github.com/jmespath/go-jmespath v0.4.0 // indirect

Diff for: pkg/ingester/ingester.go

+5-8
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,10 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
440440
return err
441441
}
442442

443+
if u.labelsStringInterningEnabled {
444+
metric.InternStrings(u.interner.Intern)
445+
}
446+
443447
return nil
444448
}
445449

@@ -454,9 +458,6 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {
454458
}
455459
u.seriesInMetric.increaseSeriesForMetric(metricName)
456460
u.labelSetCounter.increaseSeriesLabelSet(u, metric)
457-
if u.labelsStringInterningEnabled {
458-
metric.InternStrings(u.interner.Intern)
459-
}
460461

461462
if u.postingCache != nil {
462463
u.postingCache.ExpireSeries(metric)
@@ -475,9 +476,6 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels)
475476
}
476477
u.seriesInMetric.decreaseSeriesForMetric(metricName)
477478
u.labelSetCounter.decreaseSeriesLabelSet(u, metric)
478-
if u.labelsStringInterningEnabled {
479-
metric.ReleaseStrings(u.interner.Release)
480-
}
481479
if u.postingCache != nil {
482480
u.postingCache.ExpireSeries(metric)
483481
}
@@ -1233,7 +1231,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
12331231
} else {
12341232
// Copy the label set because both TSDB and the active series tracker may retain it.
12351233
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)
1236-
12371234
// Retain the reference in case there are multiple samples for the series.
12381235
if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil {
12391236
succeededSamplesCount++
@@ -2201,7 +2198,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
22012198

22022199
instanceLimitsFn: i.getInstanceLimits,
22032200
instanceSeriesCount: &i.TSDBState.seriesCount,
2204-
interner: util.NewInterner(),
2201+
interner: util.NewLruInterner(),
22052202
labelsStringInterningEnabled: i.cfg.LabelsStringInterningEnabled,
22062203

22072204
blockRetentionPeriod: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(),

Diff for: pkg/ingester/ingester_test.go

+64
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,70 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
405405

406406
}
407407

408+
func TestPushRace(t *testing.T) {
409+
cfg := defaultIngesterTestConfig(t)
410+
cfg.LabelsStringInterningEnabled = true
411+
cfg.LifecyclerConfig.JoinAfter = 0
412+
dir := t.TempDir()
413+
blocksDir := filepath.Join(dir, "blocks")
414+
415+
require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))
416+
417+
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, defaultLimitsTestConfig(), nil, blocksDir, prometheus.NewRegistry(), true)
418+
require.NoError(t, err)
419+
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
420+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
421+
// Wait until it's ACTIVE
422+
test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
423+
return ing.lifecycler.GetState()
424+
})
425+
426+
ctx := user.InjectOrgID(context.Background(), userID)
427+
sample1 := cortexpb.Sample{
428+
TimestampMs: 0,
429+
Value: 1,
430+
}
431+
432+
concurrentRequest := 100
433+
numberOfSeries := 100
434+
wg := sync.WaitGroup{}
435+
wg.Add(numberOfSeries * concurrentRequest)
436+
for k := 0; k < numberOfSeries; k++ {
437+
for i := 0; i < concurrentRequest; i++ {
438+
go func() {
439+
defer wg.Done()
440+
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
441+
require.NoError(t, err)
442+
}()
443+
}
444+
}
445+
446+
wg.Wait()
447+
448+
db := ing.getTSDB(userID)
449+
ir, err := db.db.Head().Index()
450+
require.NoError(t, err)
451+
452+
p, err := ir.Postings(ctx, "", "")
453+
require.NoError(t, err)
454+
p = ir.SortedPostings(p)
455+
total := 0
456+
var builder labels.ScratchBuilder
457+
458+
for p.Next() {
459+
total++
460+
err = ir.Series(p.At(), &builder, nil)
461+
require.NoError(t, err)
462+
lbls := builder.Labels()
463+
require.Equal(t, "foo", lbls.Get(labels.MetricName))
464+
require.Equal(t, "1", lbls.Get("userId"))
465+
require.NotEmpty(t, lbls.Get("k"))
466+
builder.Reset()
467+
}
468+
require.Equal(t, numberOfSeries, total)
469+
require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
470+
}
471+
408472
func TestIngesterUserLimitExceeded(t *testing.T) {
409473
limits := defaultLimitsTestConfig()
410474
limits.MaxLocalSeriesPerUser = 1

Diff for: pkg/util/strings.go

+17-56
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,18 @@ package util
33
import (
44
"context"
55
"sync"
6+
"time"
67
"unsafe"
78

89
"github.com/bboreham/go-loser"
9-
"go.uber.org/atomic"
10+
"github.com/hashicorp/golang-lru/v2/expirable"
11+
)
12+
13+
const (
14+
// Max size is ser to 2M.
15+
maxInternerLruCacheSize = 2e6
16+
// TTL should be similar to the head compaction interval
17+
internerLruCacheTTL = time.Hour * 2
1018
)
1119

1220
// StringsContain returns true if the search value is within the list of input values.
@@ -145,30 +153,18 @@ func MergeSortedSlices(ctx context.Context, a ...[]string) ([]string, error) {
145153

146154
type Interner interface {
147155
Intern(s string) string
148-
Release(s string)
149156
}
150157

151-
// NewInterner returns a new Interner to be used to intern strings.
152-
// Based on https://github.com/prometheus/prometheus/blob/726ed124e4468d0274ba89b0934a6cc8c975532d/storage/remote/intern.go#L51
153-
func NewInterner() Interner {
158+
// NewLruInterner returns a new Interner to be used to intern strings.
159+
// The interner will use a LRU cache to return the deduplicated strings
160+
func NewLruInterner() Interner {
154161
return &pool{
155-
pool: map[string]*entry{},
162+
lru: expirable.NewLRU[string, string](maxInternerLruCacheSize, nil, internerLruCacheTTL),
156163
}
157164
}
158165

159166
type pool struct {
160-
mtx sync.RWMutex
161-
pool map[string]*entry
162-
}
163-
164-
type entry struct {
165-
refs atomic.Int64
166-
167-
s string
168-
}
169-
170-
func newEntry(s string) *entry {
171-
return &entry{s: s}
167+
lru *expirable.LRU[string, string]
172168
}
173169

174170
// Intern returns the interned string. It returns the canonical representation of string.
@@ -177,45 +173,10 @@ func (p *pool) Intern(s string) string {
177173
return ""
178174
}
179175

180-
p.mtx.RLock()
181-
interned, ok := p.pool[s]
182-
p.mtx.RUnlock()
176+
interned, ok := p.lru.Get(s)
183177
if ok {
184-
interned.refs.Inc()
185-
return interned.s
178+
return interned
186179
}
187-
p.mtx.Lock()
188-
defer p.mtx.Unlock()
189-
if interned, ok := p.pool[s]; ok {
190-
interned.refs.Inc()
191-
return interned.s
192-
}
193-
194-
p.pool[s] = newEntry(s)
195-
p.pool[s].refs.Store(1)
180+
p.lru.Add(s, s)
196181
return s
197182
}
198-
199-
// Release releases a reference of the string `s`.
200-
// If the reference count become 0, the string `s` is removed from the memory
201-
func (p *pool) Release(s string) {
202-
p.mtx.RLock()
203-
interned, ok := p.pool[s]
204-
p.mtx.RUnlock()
205-
206-
if !ok {
207-
return
208-
}
209-
210-
refs := interned.refs.Dec()
211-
if refs > 0 {
212-
return
213-
}
214-
215-
p.mtx.Lock()
216-
defer p.mtx.Unlock()
217-
if interned.refs.Load() != 0 {
218-
return
219-
}
220-
delete(p.pool, s)
221-
}

0 commit comments

Comments
 (0)