Skip to content

Commit 6fc3229

Browse files
committed
Add health gRPC server and refactor main()
- Introduced a health gRPC server to handle liveness and readiness probes. - Refactored main() to manage server goroutines. - Added graceful shutdown for servers and controller manager. - Improved logging consistency and ensured controller setup errors are fatal. - Added validation of required CLI flags. Signed-off-by: Daneyon Hansen <[email protected]>
1 parent f1dda9f commit 6fc3229

File tree

3 files changed

+198
-72
lines changed

3 files changed

+198
-72
lines changed

pkg/ext-proc/health.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"google.golang.org/grpc/codes"
8+
healthPb "google.golang.org/grpc/health/grpc_health_v1"
9+
"google.golang.org/grpc/status"
10+
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
11+
klog "k8s.io/klog/v2"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
13+
)
14+
15+
type healthServer struct {
16+
client.Client
17+
}
18+
19+
func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
20+
if err := s.checkResources(); err != nil {
21+
klog.Infof("gRPC health check not serving: %s", in.String())
22+
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
23+
}
24+
klog.Infof("gRPC health check serving: %s", in.String())
25+
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
26+
}
27+
28+
func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
29+
return status.Error(codes.Unimplemented, "Watch is not implemented")
30+
}
31+
32+
// checkResources uses a client to list all InferenceModels in the configured namespace
33+
// and gets the configured InferencePool by name and namespace. If any client calls fail,
34+
// no InferenceModels exist, or the InferencePool does not exist, an error is returned.
35+
func (s *healthServer) checkResources() error {
36+
ctx := context.Background()
37+
var infPool v1alpha1.InferencePool
38+
if err := s.Client.Get(
39+
ctx,
40+
client.ObjectKey{Name: *poolName, Namespace: *poolNamespace},
41+
&infPool,
42+
); err != nil {
43+
return fmt.Errorf("failed to get InferencePool %s/%s: %v", *poolNamespace, *poolName, err)
44+
}
45+
klog.Infof("Successfully retrieved InferencePool %s/%s", *poolNamespace, *poolName)
46+
47+
var modelList v1alpha1.InferenceModelList
48+
if err := s.Client.List(ctx, &modelList, client.InNamespace(*poolNamespace)); err != nil {
49+
return fmt.Errorf("failed to list InferenceModels in namespace %s: %v", *poolNamespace, err)
50+
}
51+
52+
// Ensure at least 1 InferenceModel
53+
if len(modelList.Items) == 0 {
54+
return fmt.Errorf("no InferenceModels exist in namespace %s", *poolNamespace)
55+
}
56+
klog.Infof("Found %d InferenceModels in namespace %s", len(modelList.Items), *poolNamespace)
57+
58+
return nil
59+
}

pkg/ext-proc/main.go

+122-71
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
11
package main
22

33
import (
4-
"context"
54
"flag"
65
"fmt"
76
"net"
8-
"os"
9-
"os/signal"
10-
"syscall"
117
"time"
128

139
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1410
"google.golang.org/grpc"
15-
"google.golang.org/grpc/codes"
1611
healthPb "google.golang.org/grpc/health/grpc_health_v1"
17-
"google.golang.org/grpc/status"
1812
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
1913
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
2014
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
@@ -29,10 +23,14 @@ import (
2923
)
3024

3125
var (
32-
port = flag.Int(
33-
"port",
26+
grpcPort = flag.Int(
27+
"grpcPort",
3428
9002,
35-
"gRPC port")
29+
"The gRPC port used for communicating with Envoy proxy")
30+
grpcHealthPort = flag.Int(
31+
"grpcHealthPort",
32+
9003,
33+
"The port used for gRPC liveness and readiness probes")
3634
targetPodHeader = flag.String(
3735
"targetPodHeader",
3836
"target-pod",
@@ -65,55 +63,39 @@ var (
6563
scheme = runtime.NewScheme()
6664
)
6765

68-
type healthServer struct{}
69-
70-
func (s *healthServer) Check(
71-
ctx context.Context,
72-
in *healthPb.HealthCheckRequest,
73-
) (*healthPb.HealthCheckResponse, error) {
74-
klog.Infof("Handling grpc Check request + %s", in.String())
75-
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
76-
}
77-
78-
func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
79-
return status.Error(codes.Unimplemented, "Watch is not implemented")
80-
}
81-
8266
// init adds the clientgoscheme types and the v1alpha1 API types to the
// package-level scheme. Both registrations are wrapped in utilruntime.Must.
func init() {
8367
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
8468
utilruntime.Must(v1alpha1.AddToScheme(scheme))
8569
}
8670

8771
func main() {
88-
8972
klog.InitFlags(nil)
9073
flag.Parse()
9174

9275
ctrl.SetLogger(klog.TODO())
9376

77+
// Validate flags
78+
if err := validateFlags(); err != nil {
79+
klog.Fatalf("flag validation failed: %v", err)
80+
}
81+
9482
// Print all flag values
9583
flags := "Flags: "
9684
flag.VisitAll(func(f *flag.Flag) {
9785
flags += fmt.Sprintf("%s=%v; ", f.Name, f.Value)
9886
})
9987
klog.Info(flags)
10088

101-
klog.Infof("Listening on %q", fmt.Sprintf(":%d", *port))
102-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
89+
// Create a new manager to manage controllers
90+
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
10391
if err != nil {
104-
klog.Fatalf("failed to listen: %v", err)
92+
klog.Fatalf("failed to start manager: %v", err)
10593
}
10694

95+
// Create the data store used to cache watched resources
10796
datastore := backend.NewK8sDataStore()
10897

109-
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
110-
Scheme: scheme,
111-
})
112-
if err != nil {
113-
klog.Error(err, "unable to start manager")
114-
os.Exit(1)
115-
}
116-
98+
// Create the controllers and register them with the manager
11799
if err := (&backend.InferencePoolReconciler{
118100
Datastore: datastore,
119101
Scheme: mgr.GetScheme(),
@@ -124,7 +106,7 @@ func main() {
124106
},
125107
Record: mgr.GetEventRecorderFor("InferencePool"),
126108
}).SetupWithManager(mgr); err != nil {
127-
klog.Error(err, "Error setting up InferencePoolReconciler")
109+
klog.Fatalf("Error setting up InferencePoolReconciler: %v", err)
128110
}
129111

130112
if err := (&backend.InferenceModelReconciler{
@@ -137,7 +119,7 @@ func main() {
137119
},
138120
Record: mgr.GetEventRecorderFor("InferenceModel"),
139121
}).SetupWithManager(mgr); err != nil {
140-
klog.Error(err, "Error setting up InferenceModelReconciler")
122+
klog.Fatalf("Error setting up InferenceModelReconciler: %v", err)
141123
}
142124

143125
if err := (&backend.EndpointSliceReconciler{
@@ -148,53 +130,122 @@ func main() {
148130
ServiceName: *serviceName,
149131
Zone: *zone,
150132
}).SetupWithManager(mgr); err != nil {
151-
klog.Error(err, "Error setting up EndpointSliceReconciler")
133+
klog.Fatalf("Error setting up EndpointSliceReconciler: %v", err)
134+
}
135+
136+
// Channel to handle error signals for goroutines
137+
errChan := make(chan error, 1)
138+
139+
// Start each component in its own goroutine
140+
startControllerManager(mgr, errChan)
141+
healthSvr := startHealthServer(mgr, errChan, *grpcHealthPort)
142+
extProcSvr := startExternalProcessorServer(
143+
errChan,
144+
datastore,
145+
*grpcPort,
146+
*refreshPodsInterval,
147+
*refreshMetricsInterval,
148+
*targetPodHeader,
149+
)
150+
151+
// Wait for first error from any goroutine
152+
err = <-errChan
153+
if err != nil {
154+
klog.Errorf("goroutine failed: %v", err)
155+
} else {
156+
klog.Infof("Manager exited gracefully")
152157
}
153158

154-
errChan := make(chan error)
159+
// Gracefully shutdown components
160+
if healthSvr != nil {
161+
klog.Info("Health server shutting down...")
162+
healthSvr.GracefulStop()
163+
}
164+
if extProcSvr != nil {
165+
klog.Info("Ext-proc server shutting down...")
166+
extProcSvr.GracefulStop()
167+
}
168+
169+
klog.Info("All components stopped gracefully")
170+
}
171+
172+
// startControllerManager runs the controller manager in a goroutine.
173+
func startControllerManager(mgr ctrl.Manager, errChan chan<- error) {
155174
go func() {
175+
// Blocking and will return when shutdown is complete.
156176
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
157-
klog.Error(err, "Error running manager")
158-
errChan <- err
177+
errChan <- fmt.Errorf("controller manager failed to start: %w", err)
159178
}
179+
// Manager exited gracefully
180+
klog.Info("Controller manager shutting down...")
181+
errChan <- nil
160182
}()
183+
}
161184

162-
s := grpc.NewServer()
185+
// startHealthServer starts the gRPC health probe server in a goroutine.
186+
func startHealthServer(mgr ctrl.Manager, errChan chan<- error, port int) *grpc.Server {
187+
healthSvr := grpc.NewServer()
188+
healthPb.RegisterHealthServer(healthSvr, &healthServer{Client: mgr.GetClient()})
163189

164-
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
165-
if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil {
166-
klog.Fatalf("failed to initialize: %v", err)
167-
}
168-
extProcPb.RegisterExternalProcessorServer(
169-
s,
170-
handlers.NewServer(
171-
pp,
172-
scheduling.NewScheduler(pp),
173-
*targetPodHeader,
174-
datastore))
175-
healthPb.RegisterHealthServer(s, &healthServer{})
176-
177-
klog.Infof("Starting gRPC server on port :%v", *port)
178-
179-
// shutdown
180-
var gracefulStop = make(chan os.Signal, 1)
181-
signal.Notify(gracefulStop, syscall.SIGTERM)
182-
signal.Notify(gracefulStop, syscall.SIGINT)
183190
go func() {
184-
select {
185-
case sig := <-gracefulStop:
186-
klog.Infof("caught sig: %+v", sig)
187-
os.Exit(0)
188-
case err := <-errChan:
189-
klog.Infof("caught error in controller: %+v", err)
190-
os.Exit(0)
191+
healthLis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
192+
if err != nil {
193+
errChan <- fmt.Errorf("health server failed to listen: %w", err)
191194
}
195+
klog.Infof("Health server listening on port: %d", port)
192196

197+
// Blocking and will return when shutdown is complete.
198+
if serveErr := healthSvr.Serve(healthLis); serveErr != nil && serveErr != grpc.ErrServerStopped {
199+
errChan <- fmt.Errorf("health server failed: %w", serveErr)
200+
}
193201
}()
202+
return healthSvr
203+
}
194204

195-
err = s.Serve(lis)
196-
if err != nil {
197-
klog.Fatalf("Ext-proc failed with the err: %v", err)
205+
// startExternalProcessorServer starts the Envoy external processor server in a goroutine.
206+
func startExternalProcessorServer(
207+
errChan chan<- error,
208+
datastore *backend.K8sDatastore,
209+
port int,
210+
refreshPodsInterval, refreshMetricsInterval time.Duration,
211+
targetPodHeader string,
212+
) *grpc.Server {
213+
extSvr := grpc.NewServer()
214+
go func() {
215+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
216+
if err != nil {
217+
errChan <- fmt.Errorf("ext-proc server failed to listen: %w", err)
218+
}
219+
klog.Infof("Ext-proc server listening on port: %d", port)
220+
221+
// Initialize backend provider
222+
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
223+
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
224+
errChan <- fmt.Errorf("failed to initialize backend provider: %w", err)
225+
}
226+
227+
// Register ext_proc handlers
228+
extProcPb.RegisterExternalProcessorServer(
229+
extSvr,
230+
handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore),
231+
)
232+
233+
// Blocking and will return when shutdown is complete.
234+
if serveErr := extSvr.Serve(lis); serveErr != nil && serveErr != grpc.ErrServerStopped {
235+
errChan <- fmt.Errorf("ext-proc server failed: %w", serveErr)
236+
}
237+
}()
238+
return extSvr
239+
}
240+
241+
func validateFlags() error {
242+
if *poolName == "" {
243+
return fmt.Errorf("required %q flag not set", "poolName")
244+
}
245+
246+
if *serviceName == "" {
247+
return fmt.Errorf("required %q flag not set", "serviceName")
198248
}
199249

250+
return nil
200251
}

pkg/manifests/ext_proc.yaml

+17-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ roleRef:
2828
kind: ClusterRole
2929
name: pod-read
3030
---
31-
3231
apiVersion: apps/v1
3332
kind: Deployment
3433
metadata:
@@ -57,8 +56,25 @@ spec:
5756
- "3"
5857
- -serviceName
5958
- "vllm-llama2-7b-pool"
59+
- -grpcPort
60+
- "9002"
61+
- -grpcHealthPort
62+
- "9003"
6063
ports:
6164
- containerPort: 9002
65+
- containerPort: 9003
66+
livenessProbe:
67+
grpc:
68+
port: 9003
69+
service: inference-extension
70+
initialDelaySeconds: 5
71+
periodSeconds: 10
72+
readinessProbe:
73+
grpc:
74+
port: 9003
75+
service: inference-extension
76+
initialDelaySeconds: 5
77+
periodSeconds: 10
6278
---
6379
apiVersion: v1
6480
kind: Service

0 commit comments

Comments
 (0)