diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 51bb9deea6..5674593389 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -23,6 +23,7 @@ import ( "fmt" "net" "net/http" + "net/http/pprof" "sync" "sync/atomic" "time" @@ -106,6 +107,9 @@ type controllerManager struct { // Healthz probe handler healthzHandler *healthz.Handler + // pprofListener is used to serve pprof + pprofListener net.Listener + // controllerConfig are the global controller options. controllerConfig config.Controller @@ -343,6 +347,24 @@ func (cm *controllerManager) serveHealthProbes() { go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener) } +func (cm *controllerManager) addPprofServer() error { + mux := http.NewServeMux() + srv := httpserver.New(mux) + + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + + return cm.add(&server{ + Kind: "pprof", + Log: cm.logger, + Server: srv, + Listener: cm.pprofListener, + }) +} + func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) { log = log.WithValues("kind", kind, "addr", ln.Addr()) @@ -440,6 +462,13 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { cm.serveHealthProbes() } + // Add pprof server + if cm.pprofListener != nil { + if err := cm.addPprofServer(); err != nil { + return fmt.Errorf("failed to add pprof server: %w", err) + } + } + // First start any webhook servers, which includes conversion, validation, and defaulting // webhooks that are registered. // diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 0f1cfea364..d2a2d2b154 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -233,6 +233,13 @@ type Options struct { // Liveness probe endpoint name, defaults to "healthz" LivenessEndpointName string + // PprofBindAddress is the TCP address that the controller should bind to + // for serving pprof. + // It can be set to "" or "0" to disable the pprof serving. + // Since pprof may contain sensitive information, make sure to protect it + // before exposing it to public. + PprofBindAddress string + // Port is the port that the webhook server serves at. // It is used to set webhook.Server.Port if WebhookServer is not set. Port int @@ -308,6 +315,7 @@ type Options struct { newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) newMetricsListener func(addr string) (net.Listener, error) newHealthProbeListener func(addr string) (net.Listener, error) + newPprofListener func(addr string) (net.Listener, error) } // BaseContextFunc is a function used to provide a base Context to Runnables @@ -417,6 +425,13 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } + // Create pprof listener. This will throw an error if the bind + // address is invalid or already in use. + pprofListener, err := options.newPprofListener(options.PprofBindAddress) + if err != nil { + return nil, fmt.Errorf("failed to new pprof listener: %w", err) + } + errChan := make(chan error) runnables := newRunnables(options.BaseContext, errChan) @@ -444,6 +459,7 @@ func New(config *rest.Config, options Options) (Manager, error) { healthProbeListener: healthProbeListener, readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, + pprofListener: pprofListener, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, internalProceduresStop: make(chan struct{}), leaderElectionStopped: make(chan struct{}), @@ -574,6 +590,19 @@ func defaultHealthProbeListener(addr string) (net.Listener, error) { return ln, nil } +// defaultPprofListener creates the default pprof listener bound to the given address. +func defaultPprofListener(addr string) (net.Listener, error) { + if addr == "" || addr == "0" { + return nil, nil + } + + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("error listening on %s: %w", addr, err) + } + return ln, nil +} + // defaultBaseContext is used as the BaseContext value in Options if one // has not already been set. func defaultBaseContext() context.Context { @@ -634,6 +663,10 @@ func setOptionsDefaults(options Options) Options { options.newHealthProbeListener = defaultHealthProbeListener } + if options.newPprofListener == nil { + options.newPprofListener = defaultPprofListener + } + if options.GracefulShutdownTimeout == nil { gracefulShutdownTimeout := defaultGracefulShutdownPeriod options.GracefulShutdownTimeout = &gracefulShutdownTimeout diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index ccade8a9b2..2c695cc56f 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -148,6 +148,7 @@ var _ = Describe("manger.Manager", func() { LeaderElectionID: "test-leader-election-id-2", HealthProbeBindAddress: "0", MetricsBindAddress: "0", + PprofBindAddress: "0", }) Expect(err).To(BeNil()) @@ -193,6 +194,7 @@ var _ = Describe("manger.Manager", func() { LeaderElectionID: "test-leader-election-id-3", HealthProbeBindAddress: "0", MetricsBindAddress: "0", + PprofBindAddress: "0", }) Expect(err).To(BeNil()) @@ -227,6 +229,7 @@ var _ = Describe("manger.Manager", func() { }, HealthProbeBindAddress: "0", MetricsBindAddress: "0", + PprofBindAddress: "0", }) Expect(err).ToNot(HaveOccurred()) Expect(m1).ToNot(BeNil()) @@ -247,6 +250,7 @@ var _ = Describe("manger.Manager", func() { }, HealthProbeBindAddress: "0", MetricsBindAddress: "0", + PprofBindAddress: "0", }) Expect(err).ToNot(HaveOccurred()) Expect(m2).ToNot(BeNil()) @@ -1280,6 +1284,99 @@ var _ = Describe("manger.Manager", func() { }) }) + Context("should start serving pprof", func() { + var listener net.Listener + var opts Options + + BeforeEach(func() { + listener = nil + opts = Options{ + newPprofListener: func(addr string) (net.Listener, error) { + var err error + listener, err = defaultPprofListener(addr) + return listener, err + }, + } + }) + + AfterEach(func() { + if listener != nil { + listener.Close() + } + }) + + It("should stop serving pprof when stop is called", func() { + opts.PprofBindAddress = ":0" + m, err := New(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + <-m.Elected() + + // Check the pprof started + endpoint := fmt.Sprintf("http://%s", listener.Addr().String()) + _, err = http.Get(endpoint) + Expect(err).NotTo(HaveOccurred()) + + // Shutdown the server + cancel() + + // Expect the pprof server to shutdown + Eventually(func() error { + _, err = http.Get(endpoint) + return err + }, 10*time.Second).ShouldNot(Succeed()) + }) + + It("should serve pprof endpoints", func() { + opts.PprofBindAddress = ":0" + m, err := New(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + <-m.Elected() + + pprofIndexEndpoint := fmt.Sprintf("http://%s/debug/pprof/", listener.Addr().String()) + resp, err := http.Get(pprofIndexEndpoint) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + + pprofCmdlineEndpoint := fmt.Sprintf("http://%s/debug/pprof/cmdline", listener.Addr().String()) + resp, err = http.Get(pprofCmdlineEndpoint) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + + pprofProfileEndpoint := fmt.Sprintf("http://%s/debug/pprof/profile", listener.Addr().String()) + resp, err = http.Get(pprofProfileEndpoint) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + + pprofSymbolEndpoint := fmt.Sprintf("http://%s/debug/pprof/symbol", listener.Addr().String()) + resp, err = http.Get(pprofSymbolEndpoint) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + + pprofTraceEndpoint := fmt.Sprintf("http://%s/debug/pprof/trace", listener.Addr().String()) + resp, err = http.Get(pprofTraceEndpoint) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + }) + }) + Describe("Add", func() { It("should immediately start the Component if the Manager has already Started another Component", func() { diff --git a/pkg/manager/server.go b/pkg/manager/server.go new file mode 100644 index 0000000000..b6509f48f2 --- /dev/null +++ b/pkg/manager/server.go @@ -0,0 +1,61 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "context" + "errors" + "net" + "net/http" + + "github.com/go-logr/logr" +) + +// server is a general purpose HTTP server Runnable for a manager +// to serve some internal handlers such as health probes, metrics and profiling. +type server struct { + Kind string + Log logr.Logger + Server *http.Server + Listener net.Listener +} + +func (s *server) Start(ctx context.Context) error { + log := s.Log.WithValues("kind", s.Kind, "addr", s.Listener.Addr()) + + serverShutdown := make(chan struct{}) + go func() { + <-ctx.Done() + log.Info("shutting down server") + if err := s.Server.Shutdown(context.Background()); err != nil { + log.Error(err, "error shutting down server") + } + close(serverShutdown) + }() + + log.Info("starting server") + if err := s.Server.Serve(s.Listener); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + + <-serverShutdown + return nil +} + +func (s *server) NeedLeaderElection() bool { + return false +}