Skip to content

Commit 1b1d139

Browse files
authored
Adds Health gRPC Server and Refactors Main() (#148)
* Add health gRPC server and refactor main() - Introduced a health gRPC server to handle liveness and readiness probes. - Refactored main() to manage server goroutines. - Added graceful shutdown for servers and controller manager. - Improved logging consistency. - Added validation of CLI flags. Signed-off-by: Daneyon Hansen <[email protected]> * Refactor health server to use data store Signed-off-by: Daneyon Hansen <[email protected]> --------- Signed-off-by: Daneyon Hansen <[email protected]>
1 parent f1dda9f commit 1b1d139

File tree

5 files changed

+199
-75
lines changed

5 files changed

+199
-75
lines changed

pkg/ext-proc/backend/datastore.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) {
5252
func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
5353
ds.poolMu.RLock()
5454
defer ds.poolMu.RUnlock()
55-
if ds.inferencePool == nil {
56-
return nil, errors.New("InferencePool hasn't been initialized yet")
55+
if !ds.HasSynced() {
56+
return nil, errors.New("InferencePool is not initialized in data store")
5757
}
5858
return ds.inferencePool, nil
5959
}
@@ -75,6 +75,13 @@ func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.I
7575
return
7676
}
7777

78+
// HasSynced returns true if InferencePool is set in the data store.
79+
func (ds *K8sDatastore) HasSynced() bool {
80+
ds.poolMu.RLock()
81+
defer ds.poolMu.RUnlock()
82+
return ds.inferencePool != nil
83+
}
84+
7885
func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string {
7986
var weights int32
8087

pkg/ext-proc/backend/datastore_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,47 @@ import (
44
"testing"
55

66
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
7+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
78
)
89

10+
func TestHasSynced(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
inferencePool *v1alpha1.InferencePool
14+
hasSynced bool
15+
}{
16+
{
17+
name: "Ready when InferencePool exists in data store",
18+
inferencePool: &v1alpha1.InferencePool{
19+
ObjectMeta: v1.ObjectMeta{
20+
Name: "test-pool",
21+
Namespace: "default",
22+
},
23+
},
24+
hasSynced: true,
25+
},
26+
{
27+
name: "Not ready when InferencePool is nil in data store",
28+
inferencePool: nil,
29+
hasSynced: false,
30+
},
31+
}
32+
for _, tt := range tests {
33+
t.Run(tt.name, func(t *testing.T) {
34+
datastore := NewK8sDataStore()
35+
// Set the inference pool
36+
if tt.inferencePool != nil {
37+
datastore.setInferencePool(tt.inferencePool)
38+
}
39+
// Check if the data store has been initialized
40+
hasSynced := datastore.HasSynced()
41+
if hasSynced != tt.hasSynced {
42+
t.Errorf("IsInitialized() = %v, want %v", hasSynced, tt.hasSynced)
43+
}
44+
})
45+
}
46+
}
47+
948
func TestRandomWeightedDraw(t *testing.T) {
1049
tests := []struct {
1150
name string

pkg/ext-proc/health.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package main
2+
3+
import (
4+
"context"
5+
6+
"google.golang.org/grpc/codes"
7+
healthPb "google.golang.org/grpc/health/grpc_health_v1"
8+
"google.golang.org/grpc/status"
9+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
10+
klog "k8s.io/klog/v2"
11+
)
12+
13+
// healthServer serves gRPC health checks whose result depends on the data store.
type healthServer struct {
14+
// datastore is consulted through HasSynced when answering Check requests.
datastore *backend.K8sDatastore
15+
}
16+
17+
func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
18+
if !s.datastore.HasSynced() {
19+
klog.Infof("gRPC health check not serving: %s", in.String())
20+
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
21+
}
22+
klog.Infof("gRPC health check serving: %s", in.String())
23+
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
24+
}
25+
26+
func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
27+
return status.Error(codes.Unimplemented, "Watch is not implemented")
28+
}

pkg/ext-proc/main.go

+106-72
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
11
package main
22

33
import (
4-
"context"
54
"flag"
65
"fmt"
76
"net"
8-
"os"
9-
"os/signal"
10-
"syscall"
117
"time"
128

139
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1410
"google.golang.org/grpc"
15-
"google.golang.org/grpc/codes"
1611
healthPb "google.golang.org/grpc/health/grpc_health_v1"
17-
"google.golang.org/grpc/status"
1812
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
1913
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
2014
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
@@ -29,10 +23,14 @@ import (
2923
)
3024

3125
var (
32-
port = flag.Int(
33-
"port",
26+
grpcPort = flag.Int(
27+
"grpcPort",
3428
9002,
35-
"gRPC port")
29+
"The gRPC port used for communicating with Envoy proxy")
30+
grpcHealthPort = flag.Int(
31+
"grpcHealthPort",
32+
9003,
33+
"The port used for gRPC liveness and readiness probes")
3634
targetPodHeader = flag.String(
3735
"targetPodHeader",
3836
"target-pod",
@@ -65,55 +63,39 @@ var (
6563
scheme = runtime.NewScheme()
6664
)
6765

68-
type healthServer struct{}
69-
70-
func (s *healthServer) Check(
71-
ctx context.Context,
72-
in *healthPb.HealthCheckRequest,
73-
) (*healthPb.HealthCheckResponse, error) {
74-
klog.Infof("Handling grpc Check request + %s", in.String())
75-
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
76-
}
77-
78-
func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
79-
return status.Error(codes.Unimplemented, "Watch is not implemented")
80-
}
81-
8266
// init adds the clientgoscheme and v1alpha1 types to the package-level scheme.
// Each AddToScheme result is passed through utilruntime.Must.
func init() {
8367
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
8468
utilruntime.Must(v1alpha1.AddToScheme(scheme))
8569
}
8670

8771
func main() {
88-
8972
klog.InitFlags(nil)
9073
flag.Parse()
9174

9275
ctrl.SetLogger(klog.TODO())
9376

77+
// Validate flags
78+
if err := validateFlags(); err != nil {
79+
klog.Fatalf("Failed to validate flags: %v", err)
80+
}
81+
9482
// Print all flag values
9583
flags := "Flags: "
9684
flag.VisitAll(func(f *flag.Flag) {
9785
flags += fmt.Sprintf("%s=%v; ", f.Name, f.Value)
9886
})
9987
klog.Info(flags)
10088

101-
klog.Infof("Listening on %q", fmt.Sprintf(":%d", *port))
102-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
89+
// Create a new manager to manage controllers
90+
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
10391
if err != nil {
104-
klog.Fatalf("failed to listen: %v", err)
92+
klog.Fatalf("Failed to create controller manager: %v", err)
10593
}
10694

95+
// Create the data store used to cache watched resources
10796
datastore := backend.NewK8sDataStore()
10897

109-
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
110-
Scheme: scheme,
111-
})
112-
if err != nil {
113-
klog.Error(err, "unable to start manager")
114-
os.Exit(1)
115-
}
116-
98+
// Create the controllers and register them with the manager
11799
if err := (&backend.InferencePoolReconciler{
118100
Datastore: datastore,
119101
Scheme: mgr.GetScheme(),
@@ -124,7 +106,7 @@ func main() {
124106
},
125107
Record: mgr.GetEventRecorderFor("InferencePool"),
126108
}).SetupWithManager(mgr); err != nil {
127-
klog.Error(err, "Error setting up InferencePoolReconciler")
109+
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
128110
}
129111

130112
if err := (&backend.InferenceModelReconciler{
@@ -137,7 +119,7 @@ func main() {
137119
},
138120
Record: mgr.GetEventRecorderFor("InferenceModel"),
139121
}).SetupWithManager(mgr); err != nil {
140-
klog.Error(err, "Error setting up InferenceModelReconciler")
122+
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
141123
}
142124

143125
if err := (&backend.EndpointSliceReconciler{
@@ -148,53 +130,105 @@ func main() {
148130
ServiceName: *serviceName,
149131
Zone: *zone,
150132
}).SetupWithManager(mgr); err != nil {
151-
klog.Error(err, "Error setting up EndpointSliceReconciler")
133+
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
134+
}
135+
136+
// Start health and ext-proc servers in goroutines
137+
healthSvr := startHealthServer(datastore, *grpcHealthPort)
138+
extProcSvr := startExternalProcessorServer(
139+
datastore,
140+
*grpcPort,
141+
*refreshPodsInterval,
142+
*refreshMetricsInterval,
143+
*targetPodHeader,
144+
)
145+
146+
// Start the controller manager. Blocking and will return when shutdown is complete.
147+
klog.Infof("Starting controller manager")
148+
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
149+
klog.Fatalf("Error starting controller manager: %v", err)
150+
}
151+
klog.Info("Controller manager shutting down")
152+
153+
// Gracefully shutdown servers
154+
if healthSvr != nil {
155+
klog.Info("Health server shutting down")
156+
healthSvr.GracefulStop()
152157
}
158+
if extProcSvr != nil {
159+
klog.Info("Ext-proc server shutting down")
160+
extProcSvr.GracefulStop()
161+
}
162+
163+
klog.Info("All components shutdown")
164+
}
165+
166+
// startHealthServer starts the gRPC health probe server in a goroutine.
167+
func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
168+
svr := grpc.NewServer()
169+
healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
153170

154-
errChan := make(chan error)
155171
go func() {
156-
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
157-
klog.Error(err, "Error running manager")
158-
errChan <- err
172+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
173+
if err != nil {
174+
klog.Fatalf("Health server failed to listen: %v", err)
159175
}
176+
klog.Infof("Health server listening on port: %d", port)
177+
178+
// Blocking and will return when shutdown is complete.
179+
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
180+
klog.Fatalf("Health server failed: %v", err)
181+
}
182+
klog.Info("Health server shutting down")
160183
}()
184+
return svr
185+
}
161186

162-
s := grpc.NewServer()
187+
// startExternalProcessorServer starts the Envoy external processor server in a goroutine.
188+
func startExternalProcessorServer(
189+
datastore *backend.K8sDatastore,
190+
port int,
191+
refreshPodsInterval, refreshMetricsInterval time.Duration,
192+
targetPodHeader string,
193+
) *grpc.Server {
194+
svr := grpc.NewServer()
163195

164-
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
165-
if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil {
166-
klog.Fatalf("failed to initialize: %v", err)
167-
}
168-
extProcPb.RegisterExternalProcessorServer(
169-
s,
170-
handlers.NewServer(
171-
pp,
172-
scheduling.NewScheduler(pp),
173-
*targetPodHeader,
174-
datastore))
175-
healthPb.RegisterHealthServer(s, &healthServer{})
176-
177-
klog.Infof("Starting gRPC server on port :%v", *port)
178-
179-
// shutdown
180-
var gracefulStop = make(chan os.Signal, 1)
181-
signal.Notify(gracefulStop, syscall.SIGTERM)
182-
signal.Notify(gracefulStop, syscall.SIGINT)
183196
go func() {
184-
select {
185-
case sig := <-gracefulStop:
186-
klog.Infof("caught sig: %+v", sig)
187-
os.Exit(0)
188-
case err := <-errChan:
189-
klog.Infof("caught error in controller: %+v", err)
190-
os.Exit(0)
197+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
198+
if err != nil {
199+
klog.Fatalf("Ext-proc server failed to listen: %v", err)
191200
}
201+
klog.Infof("Ext-proc server listening on port: %d", port)
192202

203+
// Initialize backend provider
204+
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
205+
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
206+
klog.Fatalf("Failed to initialize backend provider: %v", err)
207+
}
208+
209+
// Register ext_proc handlers
210+
extProcPb.RegisterExternalProcessorServer(
211+
svr,
212+
handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore),
213+
)
214+
215+
// Blocking and will return when shutdown is complete.
216+
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
217+
klog.Fatalf("Ext-proc server failed: %v", err)
218+
}
219+
klog.Info("Ext-proc server shutting down")
193220
}()
221+
return svr
222+
}
194223

195-
err = s.Serve(lis)
196-
if err != nil {
197-
klog.Fatalf("Ext-proc failed with the err: %v", err)
224+
func validateFlags() error {
225+
if *poolName == "" {
226+
return fmt.Errorf("required %q flag not set", "poolName")
227+
}
228+
229+
if *serviceName == "" {
230+
return fmt.Errorf("required %q flag not set", "serviceName")
198231
}
199232

233+
return nil
200234
}

pkg/manifests/ext_proc.yaml

+17-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ roleRef:
2828
kind: ClusterRole
2929
name: pod-read
3030
---
31-
3231
apiVersion: apps/v1
3332
kind: Deployment
3433
metadata:
@@ -57,8 +56,25 @@ spec:
5756
- "3"
5857
- -serviceName
5958
- "vllm-llama2-7b-pool"
59+
- -grpcPort
60+
- "9002"
61+
- -grpcHealthPort
62+
- "9003"
6063
ports:
6164
- containerPort: 9002
65+
- containerPort: 9003
66+
livenessProbe:
67+
grpc:
68+
port: 9003
69+
service: inference-extension
70+
initialDelaySeconds: 5
71+
periodSeconds: 10
72+
readinessProbe:
73+
grpc:
74+
port: 9003
75+
service: inference-extension
76+
initialDelaySeconds: 5
77+
periodSeconds: 10
6278
---
6379
apiVersion: v1
6480
kind: Service

0 commit comments

Comments
 (0)