1
1
package main
2
2
3
3
import (
4
- "context"
5
4
"flag"
6
5
"fmt"
7
6
"net"
8
- "os"
9
- "os/signal"
10
- "syscall"
11
7
"time"
12
8
13
9
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
14
10
"google.golang.org/grpc"
15
- "google.golang.org/grpc/codes"
16
11
healthPb "google.golang.org/grpc/health/grpc_health_v1"
17
- "google.golang.org/grpc/status"
18
12
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
19
13
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
20
14
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
@@ -29,10 +23,14 @@ import (
29
23
)
30
24
31
25
var (
32
- port = flag .Int (
33
- "port " ,
26
+ grpcPort = flag .Int (
27
+ "grpcPort " ,
34
28
9002 ,
35
- "gRPC port" )
29
+ "The gRPC port used for communicating with Envoy proxy" )
30
+ grpcHealthPort = flag .Int (
31
+ "grpcHealthPort" ,
32
+ 9003 ,
33
+ "The port used for gRPC liveness and readiness probes" )
36
34
targetPodHeader = flag .String (
37
35
"targetPodHeader" ,
38
36
"target-pod" ,
@@ -65,55 +63,39 @@ var (
65
63
scheme = runtime .NewScheme ()
66
64
)
67
65
68
- type healthServer struct {}
69
-
70
- func (s * healthServer ) Check (
71
- ctx context.Context ,
72
- in * healthPb.HealthCheckRequest ,
73
- ) (* healthPb.HealthCheckResponse , error ) {
74
- klog .Infof ("Handling grpc Check request + %s" , in .String ())
75
- return & healthPb.HealthCheckResponse {Status : healthPb .HealthCheckResponse_SERVING }, nil
76
- }
77
-
78
- func (s * healthServer ) Watch (in * healthPb.HealthCheckRequest , srv healthPb.Health_WatchServer ) error {
79
- return status .Error (codes .Unimplemented , "Watch is not implemented" )
80
- }
81
-
82
66
func init () {
83
67
utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
84
68
utilruntime .Must (v1alpha1 .AddToScheme (scheme ))
85
69
}
86
70
87
71
func main () {
88
-
89
72
klog .InitFlags (nil )
90
73
flag .Parse ()
91
74
92
75
ctrl .SetLogger (klog .TODO ())
93
76
77
+ // Validate flags
78
+ if err := validateFlags (); err != nil {
79
+ klog .Fatalf ("Failed to validate flags: %v" , err )
80
+ }
81
+
94
82
// Print all flag values
95
83
flags := "Flags: "
96
84
flag .VisitAll (func (f * flag.Flag ) {
97
85
flags += fmt .Sprintf ("%s=%v; " , f .Name , f .Value )
98
86
})
99
87
klog .Info (flags )
100
88
101
- klog . Infof ( "Listening on %q" , fmt . Sprintf ( ":%d" , * port ))
102
- lis , err := net . Listen ( "tcp" , fmt . Sprintf ( ":%d" , * port ) )
89
+ // Create a new manager to manage controllers
90
+ mgr , err := ctrl . NewManager ( ctrl . GetConfigOrDie (), ctrl. Options { Scheme : scheme } )
103
91
if err != nil {
104
- klog .Fatalf ("failed to listen : %v" , err )
92
+ klog .Fatalf ("Failed to create controller manager : %v" , err )
105
93
}
106
94
95
+ // Create the data store used to cache watched resources
107
96
datastore := backend .NewK8sDataStore ()
108
97
109
- mgr , err := ctrl .NewManager (ctrl .GetConfigOrDie (), ctrl.Options {
110
- Scheme : scheme ,
111
- })
112
- if err != nil {
113
- klog .Error (err , "unable to start manager" )
114
- os .Exit (1 )
115
- }
116
-
98
+ // Create the controllers and register them with the manager
117
99
if err := (& backend.InferencePoolReconciler {
118
100
Datastore : datastore ,
119
101
Scheme : mgr .GetScheme (),
@@ -124,7 +106,7 @@ func main() {
124
106
},
125
107
Record : mgr .GetEventRecorderFor ("InferencePool" ),
126
108
}).SetupWithManager (mgr ); err != nil {
127
- klog .Error ( err , "Error setting up InferencePoolReconciler" )
109
+ klog .Fatalf ( "Failed setting up InferencePoolReconciler: %v" , err )
128
110
}
129
111
130
112
if err := (& backend.InferenceModelReconciler {
@@ -137,7 +119,7 @@ func main() {
137
119
},
138
120
Record : mgr .GetEventRecorderFor ("InferenceModel" ),
139
121
}).SetupWithManager (mgr ); err != nil {
140
- klog .Error ( err , "Error setting up InferenceModelReconciler" )
122
+ klog .Fatalf ( "Failed setting up InferenceModelReconciler: %v" , err )
141
123
}
142
124
143
125
if err := (& backend.EndpointSliceReconciler {
@@ -148,53 +130,105 @@ func main() {
148
130
ServiceName : * serviceName ,
149
131
Zone : * zone ,
150
132
}).SetupWithManager (mgr ); err != nil {
151
- klog .Error (err , "Error setting up EndpointSliceReconciler" )
133
+ klog .Fatalf ("Failed setting up EndpointSliceReconciler: %v" , err )
134
+ }
135
+
136
+ // Start health and ext-proc servers in goroutines
137
+ healthSvr := startHealthServer (datastore , * grpcHealthPort )
138
+ extProcSvr := startExternalProcessorServer (
139
+ datastore ,
140
+ * grpcPort ,
141
+ * refreshPodsInterval ,
142
+ * refreshMetricsInterval ,
143
+ * targetPodHeader ,
144
+ )
145
+
146
+ // Start the controller manager. Blocking and will return when shutdown is complete.
147
+ klog .Infof ("Starting controller manager" )
148
+ if err := mgr .Start (ctrl .SetupSignalHandler ()); err != nil {
149
+ klog .Fatalf ("Error starting controller manager: %v" , err )
150
+ }
151
+ klog .Info ("Controller manager shutting down" )
152
+
153
+ // Gracefully shutdown servers
154
+ if healthSvr != nil {
155
+ klog .Info ("Health server shutting down" )
156
+ healthSvr .GracefulStop ()
152
157
}
158
+ if extProcSvr != nil {
159
+ klog .Info ("Ext-proc server shutting down" )
160
+ extProcSvr .GracefulStop ()
161
+ }
162
+
163
+ klog .Info ("All components shutdown" )
164
+ }
165
+
166
+ // startHealthServer starts the gRPC health probe server in a goroutine.
167
+ func startHealthServer (ds * backend.K8sDatastore , port int ) * grpc.Server {
168
+ svr := grpc .NewServer ()
169
+ healthPb .RegisterHealthServer (svr , & healthServer {datastore : ds })
153
170
154
- errChan := make (chan error )
155
171
go func () {
156
- if err := mgr . Start ( ctrl . SetupSignalHandler ()); err != nil {
157
- klog . Error ( err , "Error running manager" )
158
- errChan <- err
172
+ lis , err := net . Listen ( "tcp" , fmt . Sprintf ( ":%d" , port ))
173
+ if err != nil {
174
+ klog . Fatalf ( "Health server failed to listen: %v" , err )
159
175
}
176
+ klog .Infof ("Health server listening on port: %d" , port )
177
+
178
+ // Blocking and will return when shutdown is complete.
179
+ if err := svr .Serve (lis ); err != nil && err != grpc .ErrServerStopped {
180
+ klog .Fatalf ("Health server failed: %v" , err )
181
+ }
182
+ klog .Info ("Health server shutting down" )
160
183
}()
184
+ return svr
185
+ }
161
186
162
- s := grpc .NewServer ()
187
+ // startExternalProcessorServer starts the Envoy external processor server in a goroutine.
188
+ func startExternalProcessorServer (
189
+ datastore * backend.K8sDatastore ,
190
+ port int ,
191
+ refreshPodsInterval , refreshMetricsInterval time.Duration ,
192
+ targetPodHeader string ,
193
+ ) * grpc.Server {
194
+ svr := grpc .NewServer ()
163
195
164
- pp := backend .NewProvider (& vllm.PodMetricsClientImpl {}, datastore )
165
- if err := pp .Init (* refreshPodsInterval , * refreshMetricsInterval ); err != nil {
166
- klog .Fatalf ("failed to initialize: %v" , err )
167
- }
168
- extProcPb .RegisterExternalProcessorServer (
169
- s ,
170
- handlers .NewServer (
171
- pp ,
172
- scheduling .NewScheduler (pp ),
173
- * targetPodHeader ,
174
- datastore ))
175
- healthPb .RegisterHealthServer (s , & healthServer {})
176
-
177
- klog .Infof ("Starting gRPC server on port :%v" , * port )
178
-
179
- // shutdown
180
- var gracefulStop = make (chan os.Signal , 1 )
181
- signal .Notify (gracefulStop , syscall .SIGTERM )
182
- signal .Notify (gracefulStop , syscall .SIGINT )
183
196
go func () {
184
- select {
185
- case sig := <- gracefulStop :
186
- klog .Infof ("caught sig: %+v" , sig )
187
- os .Exit (0 )
188
- case err := <- errChan :
189
- klog .Infof ("caught error in controller: %+v" , err )
190
- os .Exit (0 )
197
+ lis , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , port ))
198
+ if err != nil {
199
+ klog .Fatalf ("Ext-proc server failed to listen: %v" , err )
191
200
}
201
+ klog .Infof ("Ext-proc server listening on port: %d" , port )
192
202
203
+ // Initialize backend provider
204
+ pp := backend .NewProvider (& vllm.PodMetricsClientImpl {}, datastore )
205
+ if err := pp .Init (refreshPodsInterval , refreshMetricsInterval ); err != nil {
206
+ klog .Fatalf ("Failed to initialize backend provider: %v" , err )
207
+ }
208
+
209
+ // Register ext_proc handlers
210
+ extProcPb .RegisterExternalProcessorServer (
211
+ svr ,
212
+ handlers .NewServer (pp , scheduling .NewScheduler (pp ), targetPodHeader , datastore ),
213
+ )
214
+
215
+ // Blocking and will return when shutdown is complete.
216
+ if err := svr .Serve (lis ); err != nil && err != grpc .ErrServerStopped {
217
+ klog .Fatalf ("Ext-proc server failed: %v" , err )
218
+ }
219
+ klog .Info ("Ext-proc server shutting down" )
193
220
}()
221
+ return svr
222
+ }
194
223
195
- err = s .Serve (lis )
196
- if err != nil {
197
- klog .Fatalf ("Ext-proc failed with the err: %v" , err )
224
+ func validateFlags () error {
225
+ if * poolName == "" {
226
+ return fmt .Errorf ("required %q flag not set" , "poolName" )
227
+ }
228
+
229
+ if * serviceName == "" {
230
+ return fmt .Errorf ("required %q flag not set" , "serviceName" )
198
231
}
199
232
233
+ return nil
200
234
}
0 commit comments