Skip to content

Commit f46c9ec

Browse files
committed
Add health gRPC server and refactor main()
- Introduced a health gRPC server to handle liveness and readiness probes. - Refactored main() to manage server goroutines. - Added graceful shutdown for servers and controller manager. - Improved logging consistency. - Added validation of CLI flags. Signed-off-by: Daneyon Hansen <[email protected]>
1 parent 38cddf0 commit f46c9ec

File tree

3 files changed

+198
-73
lines changed

3 files changed

+198
-73
lines changed

pkg/ext-proc/health.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"google.golang.org/grpc/codes"
8+
healthPb "google.golang.org/grpc/health/grpc_health_v1"
9+
"google.golang.org/grpc/status"
10+
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
11+
klog "k8s.io/klog/v2"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
13+
)
14+
15+
type healthServer struct {
16+
client.Client
17+
}
18+
19+
func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
20+
if err := s.checkResources(); err != nil {
21+
klog.Infof("gRPC health check not serving: %s", in.String())
22+
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
23+
}
24+
klog.Infof("gRPC health check serving: %s", in.String())
25+
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
26+
}
27+
28+
func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
29+
return status.Error(codes.Unimplemented, "Watch is not implemented")
30+
}
31+
32+
// checkResources uses a client to list all InferenceModels in the configured namespace
33+
// and gets the configured InferencePool by name and namespace. If any client calls fail,
34+
// no InferenceModels exist, or the InferencePool does not exist, an error is returned.
35+
func (s *healthServer) checkResources() error {
36+
ctx := context.Background()
37+
var infPool v1alpha1.InferencePool
38+
if err := s.Client.Get(
39+
ctx,
40+
client.ObjectKey{Name: *poolName, Namespace: *poolNamespace},
41+
&infPool,
42+
); err != nil {
43+
return fmt.Errorf("failed to get InferencePool %s/%s: %v", *poolNamespace, *poolName, err)
44+
}
45+
klog.Infof("Successfully retrieved InferencePool %s/%s", *poolNamespace, *poolName)
46+
47+
var modelList v1alpha1.InferenceModelList
48+
if err := s.Client.List(ctx, &modelList, client.InNamespace(*poolNamespace)); err != nil {
49+
return fmt.Errorf("failed to list InferenceModels in namespace %s: %v", *poolNamespace, err)
50+
}
51+
52+
// Ensure at least 1 InferenceModel
53+
if len(modelList.Items) == 0 {
54+
return fmt.Errorf("no InferenceModels exist in namespace %s", *poolNamespace)
55+
}
56+
klog.Infof("Found %d InferenceModels in namespace %s", len(modelList.Items), *poolNamespace)
57+
58+
return nil
59+
}

pkg/ext-proc/main.go

+122-71
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
11
package main
22

33
import (
4-
"context"
54
"flag"
65
"fmt"
76
"net"
8-
"os"
9-
"os/signal"
10-
"syscall"
117
"time"
128

139
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1410
"google.golang.org/grpc"
15-
"google.golang.org/grpc/codes"
1611
healthPb "google.golang.org/grpc/health/grpc_health_v1"
17-
"google.golang.org/grpc/status"
1812
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
1913
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
2014
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
@@ -28,10 +22,14 @@ import (
2822
)
2923

3024
var (
31-
port = flag.Int(
32-
"port",
25+
grpcPort = flag.Int(
26+
"grpcPort",
3327
9002,
34-
"gRPC port")
28+
"The gRPC port used for communicating with Envoy proxy")
29+
grpcHealthPort = flag.Int(
30+
"grpcHealthPort",
31+
9003,
32+
"The port used for gRPC liveness and readiness probes")
3533
targetPodHeader = flag.String(
3634
"targetPodHeader",
3735
"target-pod",
@@ -64,55 +62,39 @@ var (
6462
scheme = runtime.NewScheme()
6563
)
6664

67-
type healthServer struct{}
68-
69-
func (s *healthServer) Check(
70-
ctx context.Context,
71-
in *healthPb.HealthCheckRequest,
72-
) (*healthPb.HealthCheckResponse, error) {
73-
klog.Infof("Handling grpc Check request + %s", in.String())
74-
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
75-
}
76-
77-
func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
78-
return status.Error(codes.Unimplemented, "Watch is not implemented")
79-
}
80-
8165
func init() {
8266
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
8367
utilruntime.Must(v1alpha1.AddToScheme(scheme))
8468
}
8569

8670
func main() {
87-
8871
klog.InitFlags(nil)
8972
flag.Parse()
9073

9174
ctrl.SetLogger(klog.TODO())
9275

76+
// Validate flags
77+
if err := validateFlags(); err != nil {
78+
klog.Fatalf("flag validation failed: %v", err)
79+
}
80+
9381
// Print all flag values
9482
flags := "Flags: "
9583
flag.VisitAll(func(f *flag.Flag) {
9684
flags += fmt.Sprintf("%s=%v; ", f.Name, f.Value)
9785
})
9886
klog.Info(flags)
9987

100-
klog.Infof("Listening on %q", fmt.Sprintf(":%d", *port))
101-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
88+
// Create a new manager to manage controllers
89+
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
10290
if err != nil {
103-
klog.Fatalf("failed to listen: %v", err)
91+
klog.Fatalf("failed to start manager: %v", err)
10492
}
10593

94+
// Create the data store used to cache watched resources
10695
datastore := backend.NewK8sDataStore()
10796

108-
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
109-
Scheme: scheme,
110-
})
111-
if err != nil {
112-
klog.Error(err, "unable to start manager")
113-
os.Exit(1)
114-
}
115-
97+
// Create the controllers and register them with the manager
11698
if err := (&backend.InferencePoolReconciler{
11799
Datastore: datastore,
118100
Scheme: mgr.GetScheme(),
@@ -121,7 +103,7 @@ func main() {
121103
PoolNamespace: *poolNamespace,
122104
Record: mgr.GetEventRecorderFor("InferencePool"),
123105
}).SetupWithManager(mgr); err != nil {
124-
klog.Error(err, "Error setting up InferencePoolReconciler")
106+
klog.Fatalf("Error setting up InferencePoolReconciler: %v", err)
125107
}
126108

127109
if err := (&backend.InferenceModelReconciler{
@@ -132,7 +114,7 @@ func main() {
132114
PoolNamespace: *poolNamespace,
133115
Record: mgr.GetEventRecorderFor("InferenceModel"),
134116
}).SetupWithManager(mgr); err != nil {
135-
klog.Error(err, "Error setting up InferenceModelReconciler")
117+
klog.Fatalf("Error setting up InferenceModelReconciler: %v", err)
136118
}
137119

138120
if err := (&backend.EndpointSliceReconciler{
@@ -143,53 +125,122 @@ func main() {
143125
ServiceName: *serviceName,
144126
Zone: *zone,
145127
}).SetupWithManager(mgr); err != nil {
146-
klog.Error(err, "Error setting up EndpointSliceReconciler")
128+
klog.Fatalf("Error setting up EndpointSliceReconciler: %v", err)
129+
}
130+
131+
// Channel to handle error signals for goroutines
132+
errChan := make(chan error, 1)
133+
134+
// Start each component in its own goroutine
135+
startControllerManager(mgr, errChan)
136+
healthSvr := startHealthServer(mgr, errChan, *grpcHealthPort)
137+
extProcSvr := startExternalProcessorServer(
138+
errChan,
139+
datastore,
140+
*grpcPort,
141+
*refreshPodsInterval,
142+
*refreshMetricsInterval,
143+
*targetPodHeader,
144+
)
145+
146+
// Wait for first error from any goroutine
147+
err = <-errChan
148+
if err != nil {
149+
klog.Errorf("goroutine failed: %v", err)
150+
} else {
151+
klog.Infof("Manager exited gracefully")
147152
}
148153

149-
errChan := make(chan error)
154+
// Gracefully shutdown components
155+
if healthSvr != nil {
156+
klog.Info("Health server shutting down...")
157+
healthSvr.GracefulStop()
158+
}
159+
if extProcSvr != nil {
160+
klog.Info("Ext-proc server shutting down...")
161+
extProcSvr.GracefulStop()
162+
}
163+
164+
klog.Info("All components stopped gracefully")
165+
}
166+
167+
// startControllerManager runs the controller manager in a goroutine.
168+
func startControllerManager(mgr ctrl.Manager, errChan chan<- error) {
150169
go func() {
170+
// Blocking and will return when shutdown is complete.
151171
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
152-
klog.Error(err, "Error running manager")
153-
errChan <- err
172+
errChan <- fmt.Errorf("controller manager failed to start: %w", err)
154173
}
174+
// Manager exited gracefully
175+
klog.Info("Controller manager shutting down...")
176+
errChan <- nil
155177
}()
178+
}
156179

157-
s := grpc.NewServer()
180+
// startHealthServer starts the gRPC health probe server in a goroutine.
181+
func startHealthServer(mgr ctrl.Manager, errChan chan<- error, port int) *grpc.Server {
182+
healthSvr := grpc.NewServer()
183+
healthPb.RegisterHealthServer(healthSvr, &healthServer{Client: mgr.GetClient()})
158184

159-
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
160-
if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil {
161-
klog.Fatalf("failed to initialize: %v", err)
162-
}
163-
extProcPb.RegisterExternalProcessorServer(
164-
s,
165-
handlers.NewServer(
166-
pp,
167-
scheduling.NewScheduler(pp),
168-
*targetPodHeader,
169-
datastore))
170-
healthPb.RegisterHealthServer(s, &healthServer{})
171-
172-
klog.Infof("Starting gRPC server on port :%v", *port)
173-
174-
// shutdown
175-
var gracefulStop = make(chan os.Signal, 1)
176-
signal.Notify(gracefulStop, syscall.SIGTERM)
177-
signal.Notify(gracefulStop, syscall.SIGINT)
178185
go func() {
179-
select {
180-
case sig := <-gracefulStop:
181-
klog.Infof("caught sig: %+v", sig)
182-
os.Exit(0)
183-
case err := <-errChan:
184-
klog.Infof("caught error in controller: %+v", err)
185-
os.Exit(0)
186+
healthLis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
187+
if err != nil {
188+
errChan <- fmt.Errorf("health server failed to listen: %w", err)
186189
}
190+
klog.Infof("Health server listening on port: %d", port)
187191

192+
// Blocking and will return when shutdown is complete.
193+
if serveErr := healthSvr.Serve(healthLis); serveErr != nil && serveErr != grpc.ErrServerStopped {
194+
errChan <- fmt.Errorf("health server failed: %w", serveErr)
195+
}
188196
}()
197+
return healthSvr
198+
}
189199

190-
err = s.Serve(lis)
191-
if err != nil {
192-
klog.Fatalf("Ext-proc failed with the err: %v", err)
200+
// startExternalProcessorServer starts the Envoy external processor server in a goroutine.
201+
func startExternalProcessorServer(
202+
errChan chan<- error,
203+
datastore *backend.K8sDatastore,
204+
port int,
205+
refreshPodsInterval, refreshMetricsInterval time.Duration,
206+
targetPodHeader string,
207+
) *grpc.Server {
208+
extSvr := grpc.NewServer()
209+
go func() {
210+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
211+
if err != nil {
212+
errChan <- fmt.Errorf("ext-proc server failed to listen: %w", err)
213+
}
214+
klog.Infof("Ext-proc server listening on port: %d", port)
215+
216+
// Initialize backend provider
217+
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
218+
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
219+
errChan <- fmt.Errorf("failed to initialize backend provider: %w", err)
220+
}
221+
222+
// Register ext_proc handlers
223+
extProcPb.RegisterExternalProcessorServer(
224+
extSvr,
225+
handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore),
226+
)
227+
228+
// Blocking and will return when shutdown is complete.
229+
if serveErr := extSvr.Serve(lis); serveErr != nil && serveErr != grpc.ErrServerStopped {
230+
errChan <- fmt.Errorf("ext-proc server failed: %w", serveErr)
231+
}
232+
}()
233+
return extSvr
234+
}
235+
236+
func validateFlags() error {
237+
if *poolName == "" {
238+
return fmt.Errorf("required %q flag not set", "poolName")
239+
}
240+
241+
if *serviceName == "" {
242+
return fmt.Errorf("required %q flag not set", "serviceName")
193243
}
194244

245+
return nil
195246
}

pkg/manifests/ext_proc.yaml

+17-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ roleRef:
2828
kind: ClusterRole
2929
name: pod-read
3030
---
31-
3231
apiVersion: apps/v1
3332
kind: Deployment
3433
metadata:
@@ -57,9 +56,25 @@ spec:
5756
- "3"
5857
- -serviceName
5958
- "vllm-llama2-7b-pool"
59+
- -grpcPort
60+
- "9002"
61+
- -grpcHealthPort
62+
- "9003"
6063
ports:
6164
- containerPort: 9002
62-
65+
- containerPort: 9003
66+
livenessProbe:
67+
grpc:
68+
port: 9003
69+
service: inference-extension
70+
initialDelaySeconds: 5
71+
periodSeconds: 10
72+
readinessProbe:
73+
grpc:
74+
port: 9003
75+
service: inference-extension
76+
initialDelaySeconds: 5
77+
periodSeconds: 10
6378
- name: curl
6479
image: curlimages/curl
6580
command: ["sleep", "3600"]

0 commit comments

Comments
 (0)