diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go index ecc40294c..544f6c107 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go @@ -30,13 +30,19 @@ import ( "k8s.io/node-problem-detector/pkg/util/tomb" ) +const ( + reviveRetries = 10 + reviveDuration = 5 * time.Second +) + type kernelLogWatcher struct { cfg types.WatcherConfig startTime time.Time logCh chan *logtypes.Log tomb *tomb.Tomb - kmsgParser kmsgparser.Parser + kmsgParser kmsgparser.Parser + reviveCount int } // NewKmsgWatcher creates a watcher which will read messages from /dev/kmsg @@ -55,22 +61,17 @@ func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher { startTime: startTime, tomb: tomb.NewTomb(), // Arbitrary capacity - logCh: make(chan *logtypes.Log, 100), + logCh: make(chan *logtypes.Log, 100), + reviveCount: 0, } } var _ types.WatcherCreateFunc = NewKmsgWatcher func (k *kernelLogWatcher) Watch() (<-chan *logtypes.Log, error) { - if k.kmsgParser == nil { - // nil-check to make mocking easier - parser, err := kmsgparser.NewParser() - if err != nil { - return nil, fmt.Errorf("failed to create kmsg parser: %v", err) - } - k.kmsgParser = parser + if err := k.SetKmsgParser(); err != nil { + return nil, err } - go k.watchLoop() return k.logCh, nil } @@ -99,6 +100,9 @@ func (k *kernelLogWatcher) watchLoop() { return case msg, ok := <-kmsgs: if !ok { + if val, ok := k.cfg.PluginConfig["revive"]; ok && val == "true" { + k.reviveMyself() + } klog.Error("Kmsg channel closed") return } @@ -120,3 +124,35 @@ func (k *kernelLogWatcher) watchLoop() { } } } + +// create a new kmsg parser and sets it to the watcher. +func (k *kernelLogWatcher) SetKmsgParser() error { + parser, err := kmsgparser.NewParser() + if err != nil { + return fmt.Errorf("failed to create kmsg parser: %v", err) + } + k.kmsgParser = parser + return nil +} + +// revive ourselves if the kmsg channel is closed +// close the old kmsg parser and create a new one +// enter the watch loop again +func (k *kernelLogWatcher) reviveMyself() { + // if k.reviveCount >= reviveRetries { + // klog.Errorf("Failed to revive kmsg parser after %d retries", reviveRetries) + // return + // } + // klog.Infof("Reviving kmsg parser, attempt %d of %d", k.reviveCount, reviveRetries) + klog.Infof("Reviving kmsg parser, attempt %d", k.reviveCount) + if err := k.kmsgParser.Close(); err != nil { + klog.Errorf("Failed to close kmsg parser: %v", err) + } + time.Sleep(reviveDuration) + if err := k.SetKmsgParser(); err != nil { + klog.Errorf("Failed to revive kmsg parser: %v", err) + return + } + k.reviveCount++ + k.watchLoop() +}