Skip to content

Commit a6e11e8

Browse files
committed
Refactors health server to use data store
Signed-off-by: Daneyon Hansen <[email protected]>
1 parent 6fc3229 commit a6e11e8

File tree

4 files changed

+85
-87
lines changed

4 files changed

+85
-87
lines changed

pkg/ext-proc/backend/datastore.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) {
5252
func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
5353
ds.poolMu.RLock()
5454
defer ds.poolMu.RUnlock()
55-
if ds.inferencePool == nil {
56-
return nil, errors.New("InferencePool hasn't been initialized yet")
55+
if !ds.HasSynced() {
56+
return nil, errors.New("InferencePool is not initialized in data store")
5757
}
5858
return ds.inferencePool, nil
5959
}
@@ -75,6 +75,13 @@ func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.I
7575
return
7676
}
7777

78+
// HasSynced returns true if InferencePool is set in the data store.
79+
func (ds *K8sDatastore) HasSynced() bool {
80+
ds.poolMu.RLock()
81+
defer ds.poolMu.RUnlock()
82+
return ds.inferencePool != nil
83+
}
84+
7885
func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string {
7986
var weights int32
8087

pkg/ext-proc/backend/datastore_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,47 @@ import (
44
"testing"
55

66
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
7+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
78
)
89

10+
func TestHasSynced(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
inferencePool *v1alpha1.InferencePool
14+
hasSynced bool
15+
}{
16+
{
17+
name: "Ready when InferencePool exists in data store",
18+
inferencePool: &v1alpha1.InferencePool{
19+
ObjectMeta: v1.ObjectMeta{
20+
Name: "test-pool",
21+
Namespace: "default",
22+
},
23+
},
24+
hasSynced: true,
25+
},
26+
{
27+
name: "Not ready when InferencePool is nil in data store",
28+
inferencePool: nil,
29+
hasSynced: false,
30+
},
31+
}
32+
for _, tt := range tests {
33+
t.Run(tt.name, func(t *testing.T) {
34+
datastore := NewK8sDataStore()
35+
// Set the inference pool
36+
if tt.inferencePool != nil {
37+
datastore.setInferencePool(tt.inferencePool)
38+
}
39+
// Check if the data store has been initialized
40+
hasSynced := datastore.HasSynced()
41+
if hasSynced != tt.hasSynced {
42+
t.Errorf("IsInitialized() = %v, want %v", hasSynced, tt.hasSynced)
43+
}
44+
})
45+
}
46+
}
47+
948
func TestRandomWeightedDraw(t *testing.T) {
1049
tests := []struct {
1150
name string

pkg/ext-proc/health.go

+3-34
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,20 @@ package main
22

33
import (
44
"context"
5-
"fmt"
65

76
"google.golang.org/grpc/codes"
87
healthPb "google.golang.org/grpc/health/grpc_health_v1"
98
"google.golang.org/grpc/status"
10-
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
9+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1110
klog "k8s.io/klog/v2"
12-
"sigs.k8s.io/controller-runtime/pkg/client"
1311
)
1412

1513
type healthServer struct {
16-
client.Client
14+
datastore *backend.K8sDatastore
1715
}
1816

1917
func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
20-
if err := s.checkResources(); err != nil {
18+
if !s.datastore.HasSynced() {
2119
klog.Infof("gRPC health check not serving: %s", in.String())
2220
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
2321
}
@@ -28,32 +26,3 @@ func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckReques
2826
func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
2927
return status.Error(codes.Unimplemented, "Watch is not implemented")
3028
}
31-
32-
// checkResources uses a client to list all InferenceModels in the configured namespace
33-
// and gets the configured InferencePool by name and namespace. If any client calls fail,
34-
// no InferenceModels exist, or the InferencePool does not exist, an error is returned.
35-
func (s *healthServer) checkResources() error {
36-
ctx := context.Background()
37-
var infPool v1alpha1.InferencePool
38-
if err := s.Client.Get(
39-
ctx,
40-
client.ObjectKey{Name: *poolName, Namespace: *poolNamespace},
41-
&infPool,
42-
); err != nil {
43-
return fmt.Errorf("failed to get InferencePool %s/%s: %v", *poolNamespace, *poolName, err)
44-
}
45-
klog.Infof("Successfully retrieved InferencePool %s/%s", *poolNamespace, *poolName)
46-
47-
var modelList v1alpha1.InferenceModelList
48-
if err := s.Client.List(ctx, &modelList, client.InNamespace(*poolNamespace)); err != nil {
49-
return fmt.Errorf("failed to list InferenceModels in namespace %s: %v", *poolNamespace, err)
50-
}
51-
52-
// Ensure at least 1 InferenceModel
53-
if len(modelList.Items) == 0 {
54-
return fmt.Errorf("no InferenceModels exist in namespace %s", *poolNamespace)
55-
}
56-
klog.Infof("Found %d InferenceModels in namespace %s", len(modelList.Items), *poolNamespace)
57-
58-
return nil
59-
}

pkg/ext-proc/main.go

+34-51
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func main() {
7676

7777
// Validate flags
7878
if err := validateFlags(); err != nil {
79-
klog.Fatalf("flag validation failed: %v", err)
79+
klog.Fatalf("Failed to validate flags: %v", err)
8080
}
8181

8282
// Print all flag values
@@ -89,7 +89,7 @@ func main() {
8989
// Create a new manager to manage controllers
9090
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
9191
if err != nil {
92-
klog.Fatalf("failed to start manager: %v", err)
92+
klog.Fatalf("Failed to create controller manager: %v", err)
9393
}
9494

9595
// Create the data store used to cache watched resources
@@ -106,7 +106,7 @@ func main() {
106106
},
107107
Record: mgr.GetEventRecorderFor("InferencePool"),
108108
}).SetupWithManager(mgr); err != nil {
109-
klog.Fatalf("Error setting up InferencePoolReconciler: %v", err)
109+
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
110110
}
111111

112112
if err := (&backend.InferenceModelReconciler{
@@ -119,7 +119,7 @@ func main() {
119119
},
120120
Record: mgr.GetEventRecorderFor("InferenceModel"),
121121
}).SetupWithManager(mgr); err != nil {
122-
klog.Fatalf("Error setting up InferenceModelReconciler: %v", err)
122+
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
123123
}
124124

125125
if err := (&backend.EndpointSliceReconciler{
@@ -130,112 +130,95 @@ func main() {
130130
ServiceName: *serviceName,
131131
Zone: *zone,
132132
}).SetupWithManager(mgr); err != nil {
133-
klog.Fatalf("Error setting up EndpointSliceReconciler: %v", err)
133+
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
134134
}
135135

136-
// Channel to handle error signals for goroutines
137-
errChan := make(chan error, 1)
138-
139-
// Start each component in its own goroutine
140-
startControllerManager(mgr, errChan)
141-
healthSvr := startHealthServer(mgr, errChan, *grpcHealthPort)
136+
// Start health and ext-proc servers in goroutines
137+
healthSvr := startHealthServer(datastore, *grpcHealthPort)
142138
extProcSvr := startExternalProcessorServer(
143-
errChan,
144139
datastore,
145140
*grpcPort,
146141
*refreshPodsInterval,
147142
*refreshMetricsInterval,
148143
*targetPodHeader,
149144
)
150145

151-
// Wait for first error from any goroutine
152-
err = <-errChan
153-
if err != nil {
154-
klog.Errorf("goroutine failed: %v", err)
155-
} else {
156-
klog.Infof("Manager exited gracefully")
146+
// Start the controller manager. Blocking and will return when shutdown is complete.
147+
klog.Infof("Starting controller manager")
148+
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
149+
klog.Fatalf("Error starting controller manager: %v", err)
157150
}
151+
klog.Info("Controller manager shutting down")
158152

159-
// Gracefully shutdown components
153+
// Gracefully shutdown servers
160154
if healthSvr != nil {
161-
klog.Info("Health server shutting down...")
155+
klog.Info("Health server shutting down")
162156
healthSvr.GracefulStop()
163157
}
164158
if extProcSvr != nil {
165-
klog.Info("Ext-proc server shutting down...")
159+
klog.Info("Ext-proc server shutting down")
166160
extProcSvr.GracefulStop()
167161
}
168162

169-
klog.Info("All components stopped gracefully")
170-
}
171-
172-
// startControllerManager runs the controller manager in a goroutine.
173-
func startControllerManager(mgr ctrl.Manager, errChan chan<- error) {
174-
go func() {
175-
// Blocking and will return when shutdown is complete.
176-
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
177-
errChan <- fmt.Errorf("controller manager failed to start: %w", err)
178-
}
179-
// Manager exited gracefully
180-
klog.Info("Controller manager shutting down...")
181-
errChan <- nil
182-
}()
163+
klog.Info("All components shutdown")
183164
}
184165

185166
// startHealthServer starts the gRPC health probe server in a goroutine.
186-
func startHealthServer(mgr ctrl.Manager, errChan chan<- error, port int) *grpc.Server {
187-
healthSvr := grpc.NewServer()
188-
healthPb.RegisterHealthServer(healthSvr, &healthServer{Client: mgr.GetClient()})
167+
func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
168+
svr := grpc.NewServer()
169+
healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
189170

190171
go func() {
191-
healthLis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
172+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
192173
if err != nil {
193-
errChan <- fmt.Errorf("health server failed to listen: %w", err)
174+
klog.Fatalf("Health server failed to listen: %v", err)
194175
}
195176
klog.Infof("Health server listening on port: %d", port)
196177

197178
// Blocking and will return when shutdown is complete.
198-
if serveErr := healthSvr.Serve(healthLis); serveErr != nil && serveErr != grpc.ErrServerStopped {
199-
errChan <- fmt.Errorf("health server failed: %w", serveErr)
179+
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
180+
klog.Fatalf("Health server failed: %v", err)
200181
}
182+
klog.Info("Health server shutting down")
201183
}()
202-
return healthSvr
184+
return svr
203185
}
204186

205187
// startExternalProcessorServer starts the Envoy external processor server in a goroutine.
206188
func startExternalProcessorServer(
207-
errChan chan<- error,
208189
datastore *backend.K8sDatastore,
209190
port int,
210191
refreshPodsInterval, refreshMetricsInterval time.Duration,
211192
targetPodHeader string,
212193
) *grpc.Server {
213-
extSvr := grpc.NewServer()
194+
svr := grpc.NewServer()
195+
214196
go func() {
215197
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
216198
if err != nil {
217-
errChan <- fmt.Errorf("ext-proc server failed to listen: %w", err)
199+
klog.Fatalf("Ext-proc server failed to listen: %v", err)
218200
}
219201
klog.Infof("Ext-proc server listening on port: %d", port)
220202

221203
// Initialize backend provider
222204
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
223205
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
224-
errChan <- fmt.Errorf("failed to initialize backend provider: %w", err)
206+
klog.Fatalf("Failed to initialize backend provider: %v", err)
225207
}
226208

227209
// Register ext_proc handlers
228210
extProcPb.RegisterExternalProcessorServer(
229-
extSvr,
211+
svr,
230212
handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore),
231213
)
232214

233215
// Blocking and will return when shutdown is complete.
234-
if serveErr := extSvr.Serve(lis); serveErr != nil && serveErr != grpc.ErrServerStopped {
235-
errChan <- fmt.Errorf("ext-proc server failed: %w", serveErr)
216+
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
217+
klog.Fatalf("Ext-proc server failed: %v", err)
236218
}
219+
klog.Info("Ext-proc server shutting down")
237220
}()
238-
return extSvr
221+
return svr
239222
}
240223

241224
func validateFlags() error {

0 commit comments

Comments
 (0)