Skip to content

Refactor ext-proc Main with Server Package Add Hermetic Test with k8s API Client for EPP #222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 29, 2025
130 changes: 26 additions & 104 deletions pkg/ext-proc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@ import (
"net"
"net/http"
"strconv"
"time"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
healthPb "google.golang.org/grpc/health/grpc_health_v1"
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -37,7 +33,7 @@ const (
var (
grpcPort = flag.Int(
"grpcPort",
9002,
runserver.DefaultGrpcPort,
"The gRPC port used for communicating with Envoy proxy")
grpcHealthPort = flag.Int(
"grpcHealthPort",
Expand All @@ -47,31 +43,31 @@ var (
"metricsPort", 9090, "The metrics port")
targetPodHeader = flag.String(
"targetPodHeader",
"target-pod",
runserver.DefaultTargetPodHeader,
"Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
poolName = flag.String(
"poolName",
"",
runserver.DefaultPoolName,
"Name of the InferencePool this Endpoint Picker is associated with.")
poolNamespace = flag.String(
"poolNamespace",
"default",
runserver.DefaultPoolNamespace,
"Namespace of the InferencePool this Endpoint Picker is associated with.")
serviceName = flag.String(
"serviceName",
"",
runserver.DefaultServiceName,
"Name of the Service that will be used to read EndpointSlices from")
zone = flag.String(
"zone",
"",
runserver.DefaultZone,
"The zone that this instance is created in. Will be passed to the corresponding endpointSlice. ")
refreshPodsInterval = flag.Duration(
"refreshPodsInterval",
10*time.Second,
runserver.DefaultRefreshPodsInterval,
"interval to refresh pods")
refreshMetricsInterval = flag.Duration(
"refreshMetricsInterval",
50*time.Millisecond,
runserver.DefaultRefreshMetricsInterval,
"interval to refresh metrics")

scheme = runtime.NewScheme()
Expand Down Expand Up @@ -103,71 +99,34 @@ func main() {
})
klog.Info(flags)

// Create a new manager to manage controllers
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
if err != nil {
klog.Fatalf("Failed to create controller manager: %v", err)
}

// Create the data store used to cache watched resources
datastore := backend.NewK8sDataStore()

// Create the controllers and register them with the manager
if err := (&backend.InferencePoolReconciler{
Datastore: datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
PoolNamespacedName: types.NamespacedName{
Name: *poolName,
Namespace: *poolNamespace,
},
Record: mgr.GetEventRecorderFor("InferencePool"),
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
}

if err := (&backend.InferenceModelReconciler{
Datastore: datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
PoolNamespacedName: types.NamespacedName{
Name: *poolName,
Namespace: *poolNamespace,
},
Record: mgr.GetEventRecorderFor("InferenceModel"),
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
}

if err := (&backend.EndpointSliceReconciler{
Datastore: datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Record: mgr.GetEventRecorderFor("endpointslice"),
ServiceName: *serviceName,
Zone: *zone,
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
TargetPodHeader: *targetPodHeader,
PoolName: *poolName,
PoolNamespace: *poolNamespace,
ServiceName: *serviceName,
Zone: *zone,
RefreshPodsInterval: *refreshPodsInterval,
RefreshMetricsInterval: *refreshMetricsInterval,
Scheme: scheme,
Config: ctrl.GetConfigOrDie(),
Datastore: datastore,
}
serverRunner.Setup()

// Start health and ext-proc servers in goroutines
healthSvr := startHealthServer(datastore, *grpcHealthPort)
extProcSvr := startExternalProcessorServer(
extProcSvr := serverRunner.Start(
datastore,
*grpcPort,
*refreshPodsInterval,
*refreshMetricsInterval,
*targetPodHeader,
&vllm.PodMetricsClientImpl{},
)
// Start metrics handler
metricsSvr := startMetricsHandler(*metricsPort, cfg)

// Start the controller manager. Blocking and will return when shutdown is complete.
klog.Infof("Starting controller manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
klog.Fatalf("Error starting controller manager: %v", err)
}
klog.Info("Controller manager shutting down")
// Start manager, blocking
serverRunner.StartManager()

// Gracefully shutdown servers
if healthSvr != nil {
Expand Down Expand Up @@ -209,43 +168,6 @@ func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
return svr
}

// startExternalProcessorServer starts the Envoy external processor server in a goroutine.
func startExternalProcessorServer(
datastore *backend.K8sDatastore,
port int,
refreshPodsInterval, refreshMetricsInterval time.Duration,
targetPodHeader string,
) *grpc.Server {
svr := grpc.NewServer()

go func() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
klog.Fatalf("Ext-proc server failed to listen: %v", err)
}
klog.Infof("Ext-proc server listening on port: %d", port)

// Initialize backend provider
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
klog.Fatalf("Failed to initialize backend provider: %v", err)
}

// Register ext_proc handlers
extProcPb.RegisterExternalProcessorServer(
svr,
handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore),
)

// Blocking and will return when shutdown is complete.
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
klog.Fatalf("Ext-proc server failed: %v", err)
}
klog.Info("Ext-proc server shutting down")
}()
return svr
}

func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
metrics.Register()

Expand Down
156 changes: 156 additions & 0 deletions pkg/ext-proc/server/runserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package server

import (
"fmt"
"net"
"time"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/grpc"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
klog "k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
)

// ExtProcServerRunner provides methods to manage an external process server.
type ExtProcServerRunner struct {
GrpcPort int
TargetPodHeader string
PoolName string
PoolNamespace string
ServiceName string
Zone string
RefreshPodsInterval time.Duration
RefreshMetricsInterval time.Duration
Scheme *runtime.Scheme
Config *rest.Config
Datastore *backend.K8sDatastore
manager ctrl.Manager
}

// Default values for CLI flags in main
const (
DefaultGrpcPort = 9002 // default for --grpcPort
DefaultTargetPodHeader = "target-pod" // default for --targetPodHeader
DefaultPoolName = "" // required but no default
DefaultPoolNamespace = "default" // default for --poolNamespace
DefaultServiceName = "" // required but no default
DefaultZone = "" // default for --zone
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
)

func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
return &ExtProcServerRunner{
GrpcPort: DefaultGrpcPort,
TargetPodHeader: DefaultTargetPodHeader,
PoolName: DefaultPoolName,
PoolNamespace: DefaultPoolNamespace,
ServiceName: DefaultServiceName,
Zone: DefaultZone,
RefreshPodsInterval: DefaultRefreshPodsInterval,
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
// Scheme, Config, and Datastore can be assigned later.
}
}

// Setup creates the reconcilers for pools, models, and endpointSlices and starts the manager.
func (r *ExtProcServerRunner) Setup() {
// Create a new manager to manage controllers
mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme})
if err != nil {
klog.Fatalf("Failed to create controller manager: %v", err)
}
r.manager = mgr

// Create the controllers and register them with the manager
if err := (&backend.InferencePoolReconciler{
Datastore: r.Datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
PoolNamespacedName: types.NamespacedName{
Name: r.PoolName,
Namespace: r.PoolNamespace,
},
Record: mgr.GetEventRecorderFor("InferencePool"),
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
}

if err := (&backend.InferenceModelReconciler{
Datastore: r.Datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
PoolNamespacedName: types.NamespacedName{
Name: r.PoolName,
Namespace: r.PoolNamespace,
},
Record: mgr.GetEventRecorderFor("InferenceModel"),
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
}

if err := (&backend.EndpointSliceReconciler{
Datastore: r.Datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Record: mgr.GetEventRecorderFor("endpointslice"),
ServiceName: r.ServiceName,
Zone: r.Zone,
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
}
}

// Start starts the Envoy external processor server in a goroutine.
func (r *ExtProcServerRunner) Start(
podDatastore *backend.K8sDatastore,
podMetricsClient backend.PodMetricsClient,
) *grpc.Server {
svr := grpc.NewServer()

go func() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
if err != nil {
klog.Fatalf("Ext-proc server failed to listen: %v", err)
}
klog.Infof("Ext-proc server listening on port: %d", r.GrpcPort)

// Initialize backend provider
pp := backend.NewProvider(podMetricsClient, podDatastore)
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
klog.Fatalf("Failed to initialize backend provider: %v", err)
}

// Register ext_proc handlers
extProcPb.RegisterExternalProcessorServer(
svr,
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetPodHeader, r.Datastore),
)

// Blocking and will return when shutdown is complete.
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
klog.Fatalf("Ext-proc server failed: %v", err)
}
klog.Info("Ext-proc server shutting down")
}()
return svr
}

func (r *ExtProcServerRunner) StartManager() {
if r.manager == nil {
klog.Fatalf("Runner has no manager setup to run: %v", r)
}
// Start the controller manager. Blocking and will return when shutdown is complete.
klog.Infof("Starting controller manager")
mgr := r.manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
klog.Fatalf("Error starting controller manager: %v", err)
}
klog.Info("Controller manager shutting down")
}
Loading