Commit 4ba2c7c

[pkg/stanza] Add monitoring metrics for open and harvested files in fileconsumer (#31544)
Blocked on #31618

**Description:**
This PR adds support for the filelog receiver to emit observable metrics about its current state: how many files are open and how many are being harvested.

**Link to tracking Issue:** #31256

**Testing:**

#### How to test this manually

1. Use the following collector config:
   ```yaml
   receivers:
     filelog:
       start_at: end
       include:
       - /var/log/busybox/monitoring/*.log
   exporters:
     debug:
       verbosity: detailed
   service:
     telemetry:
       metrics:
         level: detailed
         address: ":8888"
     pipelines:
       logs:
         receivers: [filelog]
         exporters: [debug]
         processors: []
   ```
2. Build and run the collector: `make otelcontribcol && ./bin/otelcontribcol_linux_amd64 --config ~/otelcol/monitoring_telemetry/config.yaml`
3. Produce some logs:
   ```console
   echo 'some line' >> /var/log/busybox/monitoring/1.log
   while true; do echo -e "This is a log line" >> /var/log/busybox/monitoring/2.log; done
   ```
4. Verify that metrics are produced:
   ```console
   curl 0.0.0.0:8888/metrics | grep _files
     % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                    Dload  Upload   Total   Spent    Left  Speed
   100  4002    0  4002    0     0  1954k      0 --:--:-- --:--:-- --:--:-- 1954k
   # HELP otelcol_fileconsumer_open_files Number of open files
   # TYPE otelcol_fileconsumer_open_files gauge
   otelcol_fileconsumer_open_files{service_instance_id="72b4899d-6ce3-41de-a25b-8f0370e22ec1",service_name="otelcontribcol",service_version="0.99.0-dev"} 2
   # HELP otelcol_fileconsumer_reading_files Number of open files that are being read
   # TYPE otelcol_fileconsumer_reading_files gauge
   otelcol_fileconsumer_reading_files{service_instance_id="72b4899d-6ce3-41de-a25b-8f0370e22ec1",service_name="otelcontribcol",service_version="0.99.0-dev"} 1
   ```

**Documentation:**
Added a corresponding section to the filelog receiver's docs.
--------- Signed-off-by: ChrsMark <[email protected]>
1 parent 483a201 commit 4ba2c7c
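
The manual check in step 4 of the description can also be scripted. The following is a minimal sketch, not part of the commit: it scans Prometheus text output for a metric by name and returns the first sample value. The helper name `gaugeValue` is hypothetical, the sample text is copied from the description above, and the parser only handles the simple `name{labels} value` shape shown there.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// gaugeValue (hypothetical helper) returns the value of the first sample whose
// metric name is exactly name. Comment lines (# HELP, # TYPE) are skipped.
func gaugeValue(exposition, name string) (float64, bool) {
	sc := bufio.NewScanner(strings.NewReader(exposition))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") || !strings.HasPrefix(line, name) {
			continue
		}
		rest := line[len(name):]
		if rest == "" || (rest[0] != '{' && rest[0] != ' ') {
			continue // a longer metric name that merely shares the prefix
		}
		if rest[0] == '{' {
			end := strings.LastIndex(rest, "}")
			if end < 0 {
				continue // malformed label set
			}
			rest = rest[end+1:]
		}
		fields := strings.Fields(rest)
		if len(fields) == 0 {
			continue
		}
		v, err := strconv.ParseFloat(fields[0], 64)
		if err != nil {
			continue
		}
		return v, true
	}
	return 0, false
}

// sample is the metrics output quoted in the commit description.
const sample = `# HELP otelcol_fileconsumer_open_files Number of open files
# TYPE otelcol_fileconsumer_open_files gauge
otelcol_fileconsumer_open_files{service_instance_id="72b4899d-6ce3-41de-a25b-8f0370e22ec1",service_name="otelcontribcol",service_version="0.99.0-dev"} 2
# HELP otelcol_fileconsumer_reading_files Number of open files that are being read
# TYPE otelcol_fileconsumer_reading_files gauge
otelcol_fileconsumer_reading_files{service_instance_id="72b4899d-6ce3-41de-a25b-8f0370e22ec1",service_name="otelcontribcol",service_version="0.99.0-dev"} 1
`

func main() {
	for _, name := range []string{
		"otelcol_fileconsumer_open_files",
		"otelcol_fileconsumer_reading_files",
	} {
		v, ok := gaugeValue(sample, name)
		fmt.Printf("%s = %v (found: %t)\n", name, v, ok)
	}
}
```

In a real check the input would come from the collector's metrics endpoint instead of a constant; that wiring is left out so the sketch stays self-contained.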

File tree

9 files changed

+100
-20
lines changed

.chloggen/add_filelog_metrics.yaml

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: stanza
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add monitoring metrics for open and harvested files in fileconsumer
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [31256]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

pkg/stanza/fileconsumer/config.go

Lines changed: 24 additions & 0 deletions
@@ -12,6 +12,7 @@ import (

 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/featuregate"
+	"go.opentelemetry.io/otel/metric"
 	"go.uber.org/zap"
 	"golang.org/x/text/encoding"

@@ -34,6 +35,8 @@ const (
 	defaultMaxConcurrentFiles = 1024
 	defaultEncoding           = "utf-8"
 	defaultPollInterval       = 200 * time.Millisecond
+	openFilesMetric           = "fileconsumer/open_files"
+	readingFilesMetric        = "fileconsumer/reading_files"
 )

 var allowFileDeletion = featuregate.GlobalRegistry().MustRegister(
@@ -172,6 +175,25 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
 		t = tracker.NewFileTracker(set, c.MaxConcurrentFiles/2)
 	}
 	set.Logger = set.Logger.With(zap.String("component", "fileconsumer"))
+
+	meter := set.MeterProvider.Meter("otelcol/fileconsumer")
+
+	openFiles, err := meter.Int64UpDownCounter(
+		openFilesMetric,
+		metric.WithDescription("Number of open files"),
+		metric.WithUnit("1"),
+	)
+	if err != nil {
+		return nil, err
+	}
+	readingFiles, err := meter.Int64UpDownCounter(
+		readingFilesMetric,
+		metric.WithDescription("Number of open files that are being read"),
+		metric.WithUnit("1"),
+	)
+	if err != nil {
+		return nil, err
+	}
 	return &Manager{
 		set:           set,
 		readerFactory: readerFactory,
@@ -180,6 +202,8 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
 		maxBatchFiles: c.MaxConcurrentFiles / 2,
 		maxBatches:    c.MaxBatches,
 		tracker:       t,
+		openFiles:     openFiles,
+		readingFiles:  readingFiles,
 	}, nil
 }

pkg/stanza/fileconsumer/file.go

Lines changed: 24 additions & 8 deletions
@@ -11,6 +11,7 @@ import (
 	"time"

 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/otel/metric"
 	"go.uber.org/zap"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
@@ -37,6 +38,9 @@ type Manager struct {
 	persister     operator.Persister
 	maxBatches    int
 	maxBatchFiles int
+
+	openFiles    metric.Int64UpDownCounter
+	readingFiles metric.Int64UpDownCounter
 }

 func (m *Manager) Start(persister operator.Persister) error {
@@ -73,7 +77,7 @@ func (m *Manager) Stop() error {
 		m.cancel = nil
 	}
 	m.wg.Wait()
-	m.tracker.ClosePreviousFiles()
+	m.openFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
 	if m.persister != nil {
 		if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
 			m.set.Logger.Error("save offsets", zap.Error(err))
@@ -146,7 +150,7 @@ func (m *Manager) poll(ctx context.Context) {

 func (m *Manager) consume(ctx context.Context, paths []string) {
 	m.set.Logger.Debug("Consuming files", zap.Strings("paths", paths))
-	m.makeReaders(paths)
+	m.makeReaders(ctx, paths)

 	m.readLostFiles(ctx)

@@ -156,12 +160,14 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 		wg.Add(1)
 		go func(r *reader.Reader) {
 			defer wg.Done()
+			m.readingFiles.Add(ctx, 1)
 			r.ReadToEnd(ctx)
+			m.readingFiles.Add(ctx, -1)
 		}(r)
 	}
 	wg.Wait()

-	m.tracker.EndConsume()
+	m.openFiles.Add(ctx, int64(0-m.tracker.EndConsume()))
 }

 func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
@@ -192,7 +198,7 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
 // makeReader take a file path, then creates reader,
 // discarding any that have a duplicate fingerprint to other files that have already
 // been read this polling interval
-func (m *Manager) makeReaders(paths []string) {
+func (m *Manager) makeReaders(ctx context.Context, paths []string) {
 	for _, path := range paths {
 		fp, file := m.makeFingerprint(path)
 		if fp == nil {
@@ -210,7 +216,7 @@ func (m *Manager) makeReaders(paths []string) {
 			continue
 		}

-		r, err := m.newReader(file, fp)
+		r, err := m.newReader(ctx, file, fp)
 		if err != nil {
 			m.set.Logger.Error("Failed to create reader", zap.Error(err))
 			continue
@@ -220,18 +226,28 @@ func (m *Manager) makeReaders(paths []string) {
 	}
 }

-func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
+func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
 	// Check previous poll cycle for match
 	if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
 		return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
 	}

 	// Check for closed files for match
 	if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil {
-		return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
+		r, err := m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
+		if err != nil {
+			return nil, err
+		}
+		m.openFiles.Add(ctx, 1)
+		return r, nil
 	}

 	// If we don't match any previously known files, create a new reader from scratch
 	m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
-	return m.readerFactory.NewReader(file, fp)
+	r, err := m.readerFactory.NewReader(file, fp)
+	if err != nil {
+		return nil, err
+	}
+	m.openFiles.Add(ctx, 1)
+	return r, nil
 }
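
The bookkeeping in the `file.go` diff above can be modelled without the OpenTelemetry SDK. The sketch below is an illustration under stated assumptions, not the commit's code: `gauge` is a standard-library stand-in for `metric.Int64UpDownCounter`, `manager` is a hypothetical trimmed-down `Manager`, files are identified by path instead of fingerprint, and the non-Windows behaviour (files stay open until the next poll) is assumed. It also defers the `reading_files` decrement, which is a small variation on the diff.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// gauge is a stand-in for metric.Int64UpDownCounter (assumption: no context, no attributes).
type gauge struct{ v atomic.Int64 }

func (g *gauge) Add(delta int64) { g.v.Add(delta) }
func (g *gauge) Value() int64    { return g.v.Load() }

// manager is a hypothetical model of fileconsumer.Manager.
type manager struct {
	openFiles    gauge
	readingFiles gauge
	previous     map[string]bool // files left open by the previous poll
}

// consume models one poll cycle.
func (m *manager) consume(paths []string, read func(path string)) {
	current := make(map[string]bool, len(paths))
	for _, p := range paths {
		if m.previous[p] {
			// Matched a file that is still open: the old reader is closed and a
			// new one created, so the net change to open_files is zero.
			delete(m.previous, p)
		} else {
			m.openFiles.Add(1) // newly opened file
		}
		current[p] = true
	}

	var wg sync.WaitGroup
	for _, p := range paths {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			m.readingFiles.Add(1)
			defer m.readingFiles.Add(-1) // runs when this goroutine finishes reading
			read(p)
		}(p)
	}
	wg.Wait()

	// Whatever is left in previous was not seen this poll and gets closed;
	// subtract the number of closed files, as the diff does with EndConsume.
	m.openFiles.Add(-int64(len(m.previous)))
	m.previous = current
}

func main() {
	m := &manager{}
	noop := func(string) {}

	m.consume([]string{"1.log", "2.log"}, noop)
	fmt.Println("open:", m.openFiles.Value(), "reading:", m.readingFiles.Value())

	m.consume([]string{"2.log"}, noop) // 1.log disappeared and is closed
	fmt.Println("open:", m.openFiles.Value(), "reading:", m.readingFiles.Value())
}
```

Because every increment is paired with exactly one decrement, `open_files` should settle at the number of files held open between polls and `reading_files` should return to zero once a poll completes.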

pkg/stanza/fileconsumer/file_other.go

Lines changed: 2 additions & 0 deletions
@@ -50,7 +50,9 @@ OUTER:
 		lostWG.Add(1)
 		go func(r *reader.Reader) {
 			defer lostWG.Done()
+			m.readingFiles.Add(ctx, 1)
 			r.ReadToEnd(ctx)
+			m.readingFiles.Add(ctx, -1)
 		}(lostReader)
 	}
 	lostWG.Wait()

pkg/stanza/fileconsumer/internal/tracker/tracker.go

Lines changed: 9 additions & 6 deletions
@@ -22,9 +22,9 @@ type Tracker interface {
 	LoadMetadata(metadata []*reader.Metadata)
 	CurrentPollFiles() []*reader.Reader
 	PreviousPollFiles() []*reader.Reader
-	ClosePreviousFiles()
+	ClosePreviousFiles() int
 	EndPoll()
-	EndConsume()
+	EndConsume() int
 	TotalReaders() int
 }

@@ -101,12 +101,13 @@ func (t *fileTracker) PreviousPollFiles() []*reader.Reader {
 	return t.previousPollFiles.Get()
 }

-func (t *fileTracker) ClosePreviousFiles() {
+func (t *fileTracker) ClosePreviousFiles() (filesClosed int) {
 	// t.previousPollFiles -> t.knownFiles[0]
-
 	for r, _ := t.previousPollFiles.Pop(); r != nil; r, _ = t.previousPollFiles.Pop() {
 		t.knownFiles[0].Add(r.Close())
+		filesClosed++
 	}
+	return
 }

 func (t *fileTracker) EndPoll() {
@@ -155,10 +156,12 @@ func (t *noStateTracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Rea
 	return t.currentPollFiles.Match(fp, fileset.Equal)
 }

-func (t *noStateTracker) EndConsume() {
+func (t *noStateTracker) EndConsume() (filesClosed int) {
 	for r, _ := t.currentPollFiles.Pop(); r != nil; r, _ = t.currentPollFiles.Pop() {
 		r.Close()
+		filesClosed++
 	}
+	return
 }

 func (t *noStateTracker) GetOpenFile(_ *fingerprint.Fingerprint) *reader.Reader { return nil }
@@ -171,7 +174,7 @@ func (t *noStateTracker) LoadMetadata(_ []*reader.Metadata) {}

 func (t *noStateTracker) PreviousPollFiles() []*reader.Reader { return nil }

-func (t *noStateTracker) ClosePreviousFiles() {}
+func (t *noStateTracker) ClosePreviousFiles() int { return 0 }

 func (t *noStateTracker) EndPoll() {}

pkg/stanza/fileconsumer/internal/tracker/tracker_other.go

Lines changed: 3 additions & 2 deletions
@@ -12,10 +12,11 @@ import (

 // On non-windows platforms, we keep files open between poll cycles so that we can detect
 // and read "lost" files, which have been moved out of the matching pattern.
-func (t *fileTracker) EndConsume() {
-	t.ClosePreviousFiles()
+func (t *fileTracker) EndConsume() (filesClosed int) {
+	filesClosed = t.ClosePreviousFiles()

 	// t.currentPollFiles -> t.previousPollFiles
 	t.previousPollFiles = t.currentPollFiles
 	t.currentPollFiles = fileset.New[*reader.Reader](t.maxBatchFiles)
+	return
 }

pkg/stanza/fileconsumer/internal/tracker/tracker_windows.go

Lines changed: 3 additions & 2 deletions
@@ -12,9 +12,10 @@ import (
 )

 // On windows, we close files immediately after reading because they cannot be moved while open.
-func (t *fileTracker) EndConsume() {
+func (t *fileTracker) EndConsume() (filesClosed int) {
 	// t.currentPollFiles -> t.previousPollFiles
 	t.previousPollFiles = t.currentPollFiles
-	t.ClosePreviousFiles()
+	filesClosed = t.ClosePreviousFiles()
 	t.currentPollFiles = fileset.New[*reader.Reader](t.maxBatchFiles)
+	return
 }
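
The two `EndConsume` variants above differ only in whether the rotation happens before or after `ClosePreviousFiles`, which changes how many files each call reports as closed. The following sketch illustrates that ordering with plain slices; the `tracker` type and its method names are hypothetical simplifications of `fileTracker`, and reader state is reduced to file names.

```go
package main

import "fmt"

// tracker is a hypothetical, simplified stand-in for fileTracker.
type tracker struct {
	previous []string // files kept open from the previous poll
	current  []string // files opened during the current poll
}

// closePrevious closes everything in previous and reports how many were closed.
func (t *tracker) closePrevious() (filesClosed int) {
	filesClosed = len(t.previous)
	t.previous = nil
	return filesClosed
}

// endConsumeKeepOpen mirrors the non-Windows ordering: close the previous
// poll's files first, then rotate current into previous so they stay open.
func (t *tracker) endConsumeKeepOpen() (filesClosed int) {
	filesClosed = t.closePrevious()
	t.previous, t.current = t.current, nil
	return filesClosed
}

// endConsumeCloseNow mirrors the Windows ordering: rotate first, then close,
// so the files just read are closed immediately.
func (t *tracker) endConsumeCloseNow() (filesClosed int) {
	t.previous = t.current
	filesClosed = t.closePrevious()
	t.current = nil
	return filesClosed
}

func main() {
	keep := &tracker{current: []string{"1.log", "2.log"}}
	fmt.Println("keep-open ordering closed:", keep.endConsumeKeepOpen(), "still open:", len(keep.previous))

	now := &tracker{current: []string{"1.log", "2.log"}}
	fmt.Println("close-now ordering closed:", now.endConsumeCloseNow(), "still open:", len(now.previous))
}
```

If this model holds, `otelcol_fileconsumer_open_files` would be expected to drop back to zero after each poll on Windows, while on other platforms it would reflect the files held open until the next poll.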

pkg/stanza/go.mod

Lines changed: 1 addition & 1 deletion
@@ -23,6 +23,7 @@ require (
 	go.opentelemetry.io/collector/featuregate v1.8.0
 	go.opentelemetry.io/collector/pdata v1.8.0
 	go.opentelemetry.io/collector/receiver v0.101.0
+	go.opentelemetry.io/otel/metric v1.26.0
 	go.uber.org/goleak v1.3.0
 	go.uber.org/multierr v1.11.0
 	go.uber.org/zap v1.27.0
@@ -62,7 +63,6 @@ require (
 	go.opentelemetry.io/collector/config/configtelemetry v0.101.0 // indirect
 	go.opentelemetry.io/otel v1.26.0 // indirect
 	go.opentelemetry.io/otel/exporters/prometheus v0.48.0 // indirect
-	go.opentelemetry.io/otel/metric v1.26.0 // indirect
 	go.opentelemetry.io/otel/sdk v1.26.0 // indirect
 	go.opentelemetry.io/otel/sdk/metric v1.26.0 // indirect
 	go.opentelemetry.io/otel/trace v1.26.0 // indirect

receiver/filelogreceiver/README.md

Lines changed: 7 additions & 1 deletion
@@ -199,4 +199,10 @@ Exactly how this information is serialized depends on the type of storage being

 ### Tracking symlinked files
 If the receiver is being used to track a symlinked file and the symlink target is expected to change frequently, make sure
-to set the value of the `poll_interval` setting to something lower than the symlink update frequency.
+to set the value of the `poll_interval` setting to something lower than the symlink update frequency.
+
+### Telemetry metrics
+Enabling [Collector metrics](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/troubleshooting.md#metrics)
+will also provide telemetry metrics for the state of the receiver's file consumption.
+Specifically, the `otelcol_fileconsumer_open_files` and `otelcol_fileconsumer_reading_files` metrics
+are provided.
