Skip to content

Commit 0f5db9e

Browse files
authored
Merge pull request #94 from Random-Liu/add-multiple-log-monitor-support
Add multiple system log monitor support
2 parents 92e67b8 + 889d9ef commit 0f5db9e

File tree

10 files changed

+160
-15
lines changed

10 files changed

+160
-15
lines changed

Dockerfile.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ RUN test -h /etc/localtime && rm -f /etc/localtime && cp /usr/share/zoneinfo/UTC
2020

2121
ADD ./bin/node-problem-detector /node-problem-detector
2222
ADD config /config
23-
ENTRYPOINT ["/node-problem-detector", "--system-log-monitor=/config/kernel-monitor.json"]
23+
ENTRYPOINT ["/node-problem-detector", "--system-log-monitors=/config/kernel-monitor.json"]

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,10 @@ List of supported problem daemons:
5454
# Usage
5555
## Flags
5656
* `--version`: Print current version of node-problem-detector.
57-
* `--system-log-monitor`: The configuration used by the system log monitor, e.g.
57+
* `--system-log-monitors`: List of paths to system log monitor configuration files, comma separated, e.g.
5858
[config/kernel-monitor.json](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json).
59+
Node problem detector will start a separate log monitor for each configuration. You can
60+
use different log monitors to monitor different system logs.
5961
* `--apiserver-override`: A URI parameter used to customize how node-problem-detector
6062
connects the apiserver. The format is same as the
6163
[`source`](https://github.com/kubernetes/heapster/blob/master/docs/source-configuration.md#kubernetes)

cmd/node_problem_detector.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,17 @@ func main() {
6767
os.Exit(0)
6868
}
6969

70-
l := systemlogmonitor.NewLogMonitorOrDie(npdo.SystemLogMonitorConfigPath)
70+
monitors := make(map[string]systemlogmonitor.LogMonitor)
71+
for _, config := range npdo.SystemLogMonitorConfigPaths {
72+
if _, ok := monitors[config]; ok {
73+
// Skip the config if it's duplicated.
74+
glog.Warningf("Duplicated log monitor configuration %q", config)
75+
continue
76+
}
77+
monitors[config] = systemlogmonitor.NewLogMonitorOrDie(config)
78+
}
7179
c := problemclient.NewClientOrDie(npdo)
72-
p := problemdetector.NewProblemDetector(l, c)
80+
p := problemdetector.NewProblemDetector(monitors, c)
7381

7482
// Start http server.
7583
if npdo.ServerPort > 0 {

pkg/options/options.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ import (
3030
type NodeProblemDetectorOptions struct {
3131
// command line options
3232

33-
// SystemLogMonitorConfigPath specifies the path to system log monitor configuration file.
34-
SystemLogMonitorConfigPath string
33+
// SystemLogMonitorConfigPaths specifies the list of paths to system log monitor configuration
34+
// files.
35+
SystemLogMonitorConfigPaths []string
3536
// ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer.
3637
ApiServerOverride string
3738
// PrintVersion is the flag determining whether version information is printed.
@@ -55,8 +56,8 @@ func NewNodeProblemDetectorOptions() *NodeProblemDetectorOptions {
5556

5657
// AddFlags adds node problem detector command line options to pflag.
5758
func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
58-
fs.StringVar(&npdo.SystemLogMonitorConfigPath, "system-log-monitor",
59-
"/config/kernel-monitor.json", "The path to the system log monitor config file")
59+
fs.StringSliceVar(&npdo.SystemLogMonitorConfigPaths, "system-log-monitors",
60+
[]string{}, "List of paths to system log monitor config files, comma separated.")
6061
fs.StringVar(&npdo.ApiServerOverride, "apiserver-override",
6162
"", "Custom URI used to connect to Kubernetes ApiServer")
6263
fs.BoolVar(&npdo.PrintVersion, "version", false, "Print version information and quit")

pkg/problemdetector/problem_detector.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package problemdetector
1818

1919
import (
20+
"fmt"
2021
"net/http"
2122

2223
"github.com/golang/glog"
@@ -26,6 +27,7 @@ import (
2627
"k8s.io/node-problem-detector/pkg/condition"
2728
"k8s.io/node-problem-detector/pkg/problemclient"
2829
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
30+
"k8s.io/node-problem-detector/pkg/types"
2931
"k8s.io/node-problem-detector/pkg/util"
3032
)
3133

@@ -38,28 +40,39 @@ type ProblemDetector interface {
3840
type problemDetector struct {
3941
client problemclient.Client
4042
conditionManager condition.ConditionManager
41-
// TODO(random-liu): Use slices of problem daemons if multiple monitors are needed in the future
42-
monitor systemlogmonitor.LogMonitor
43+
monitors map[string]systemlogmonitor.LogMonitor
4344
}
4445

4546
// NewProblemDetector creates the problem detector. Currently we just directly pass in the problem daemons, but
4647
// in the future we may want to let the problem daemons register themselves.
47-
func NewProblemDetector(monitor systemlogmonitor.LogMonitor, client problemclient.Client) ProblemDetector {
48+
func NewProblemDetector(monitors map[string]systemlogmonitor.LogMonitor, client problemclient.Client) ProblemDetector {
4849
return &problemDetector{
4950
client: client,
5051
conditionManager: condition.NewConditionManager(client, clock.RealClock{}),
51-
monitor: monitor,
52+
monitors: monitors,
5253
}
5354
}
5455

5556
// Run starts the problem detector.
5657
func (p *problemDetector) Run() error {
5758
p.conditionManager.Start()
58-
ch, err := p.monitor.Start()
59-
if err != nil {
60-
return err
59+
// Start the log monitors one by one.
60+
var chans []<-chan *types.Status
61+
for cfg, m := range p.monitors {
62+
ch, err := m.Start()
63+
if err != nil {
64+
// Do not return error and keep on trying the following config files.
65+
glog.Errorf("Failed to start log monitor %q: %v", cfg, err)
66+
continue
67+
}
68+
chans = append(chans, ch)
69+
}
70+
if len(chans) == 0 {
71+
return fmt.Errorf("no log montior is successfully setup")
6172
}
73+
ch := groupChannel(chans)
6274
glog.Info("Problem detector started")
75+
6376
for {
6477
select {
6578
case status := <-ch:
@@ -80,3 +93,15 @@ func (p *problemDetector) RegisterHTTPHandlers() {
8093
util.ReturnHTTPJson(w, p.conditionManager.GetConditions())
8194
})
8295
}
96+
97+
func groupChannel(chans []<-chan *types.Status) <-chan *types.Status {
98+
statuses := make(chan *types.Status)
99+
for _, ch := range chans {
100+
go func(c <-chan *types.Status) {
101+
for status := range c {
102+
statuses <- status
103+
}
104+
}(ch)
105+
}
106+
return statuses
107+
}

pkg/systemlogmonitor/log_monitor_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@ limitations under the License.
1717
package systemlogmonitor
1818

1919
import (
20+
"fmt"
2021
"reflect"
22+
"runtime"
2123
"testing"
2224
"time"
2325

26+
"github.com/stretchr/testify/assert"
27+
28+
watchertest "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/testing"
2429
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
2530
"k8s.io/node-problem-detector/pkg/types"
2631
)
@@ -131,3 +136,13 @@ func TestGenerateStatus(t *testing.T) {
131136
}
132137
}
133138
}
139+
140+
func TestGoroutineLeak(t *testing.T) {
141+
orignal := runtime.NumGoroutine()
142+
f := watchertest.NewFakeLogWatcher(10)
143+
f.InjectError(fmt.Errorf("unexpected error"))
144+
l := &logMonitor{watcher: f}
145+
_, err := l.Start()
146+
assert.Error(t, err)
147+
assert.Equal(t, orignal, runtime.NumGoroutine())
148+
}

pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package filelog
1919
import (
2020
"io/ioutil"
2121
"os"
22+
"runtime"
2223
"testing"
2324
"time"
2425

@@ -170,3 +171,16 @@ Jan 2 03:04:05 kernel: [2.000000] 3
170171
}
171172
}
172173
}
174+
175+
func TestGoroutineLeak(t *testing.T) {
176+
orignal := runtime.NumGoroutine()
177+
w := NewSyslogWatcherOrDie(types.WatcherConfig{
178+
Plugin: "filelog",
179+
PluginConfig: getTestPluginConfig(),
180+
LogPath: "/not/exist/path",
181+
Lookback: "10m",
182+
})
183+
_, err := w.Watch()
184+
assert.Error(t, err)
185+
assert.Equal(t, orignal, runtime.NumGoroutine())
186+
}

pkg/systemlogmonitor/logwatchers/journald/log_watcher.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package journald
2020

2121
import (
2222
"fmt"
23+
"os"
2324
"strings"
2425
"time"
2526

@@ -134,6 +135,11 @@ func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) {
134135
if err != nil {
135136
return nil, fmt.Errorf("failed to parse lookback duration %q: %v", cfg.Lookback, err)
136137
}
138+
// If the path doesn't exist, NewJournalFromDir will create it instead of
139+
// returning an error. So check the path existence ourselves.
140+
if _, err := os.Stat(path); err != nil {
141+
return nil, fmt.Errorf("failed to stat the log path %q: %v", path, err)
142+
}
137143
// Get journal client from the log path.
138144
journal, err := sdjournal.NewJournalFromDir(path)
139145
if err != nil {

pkg/systemlogmonitor/logwatchers/journald/log_watcher_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ limitations under the License.
1919
package journald
2020

2121
import (
22+
"runtime"
2223
"testing"
2324
"time"
2425

2526
"github.com/coreos/go-systemd/sdjournal"
2627
"github.com/stretchr/testify/assert"
2728

29+
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
2830
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
2931
)
3032

@@ -62,3 +64,16 @@ func TestTranslate(t *testing.T) {
6264
assert.Equal(t, test.log, translate(test.entry))
6365
}
6466
}
67+
68+
func TestGoroutineLeak(t *testing.T) {
69+
orignal := runtime.NumGoroutine()
70+
w := NewJournaldWatcher(types.WatcherConfig{
71+
Plugin: "journald",
72+
PluginConfig: map[string]string{"source": "not-exist-service"},
73+
LogPath: "/not/exist/path",
74+
Lookback: "10m",
75+
})
76+
_, err := w.Watch()
77+
assert.Error(t, err)
78+
assert.Equal(t, orignal, runtime.NumGoroutine())
79+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package testing
18+
19+
import (
20+
"sync"
21+
22+
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
23+
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
24+
)
25+
26+
// FakeLogWatcher is a fake implementation of the log watcher.
27+
type FakeLogWatcher struct {
28+
sync.Mutex
29+
buf chan *logtypes.Log
30+
err error
31+
}
32+
33+
var _ types.LogWatcher = &FakeLogWatcher{}
34+
35+
func NewFakeLogWatcher(bufferSize int) *FakeLogWatcher {
36+
return &FakeLogWatcher{buf: make(chan *logtypes.Log, bufferSize)}
37+
}
38+
39+
// InjectLog injects a fake log into the watch channel
40+
func (f *FakeLogWatcher) InjectLog(log *logtypes.Log) {
41+
f.buf <- log
42+
}
43+
44+
// InjectError sets the error that the Watch function returns.
45+
func (f *FakeLogWatcher) InjectError(err error) {
46+
f.Lock()
47+
defer f.Unlock()
48+
f.err = err
49+
}
50+
51+
// Watch is the fake watch function.
52+
func (f *FakeLogWatcher) Watch() (<-chan *logtypes.Log, error) {
53+
return f.buf, f.err
54+
}
55+
56+
// Stop is the fake stop function.
57+
func (f *FakeLogWatcher) Stop() {
58+
close(f.buf)
59+
}

0 commit comments

Comments
 (0)