Skip to content

Revert "Refactor ext-proc Main with Server Package Add Hermetic Test with k8s API Client for EPP" #247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 104 additions & 26 deletions pkg/ext-proc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ import (
"net"
"net/http"
"strconv"
"time"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
healthPb "google.golang.org/grpc/health/grpc_health_v1"
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -33,7 +37,7 @@ const (
var (
grpcPort = flag.Int(
"grpcPort",
runserver.DefaultGrpcPort,
9002,
"The gRPC port used for communicating with Envoy proxy")
grpcHealthPort = flag.Int(
"grpcHealthPort",
Expand All @@ -43,31 +47,31 @@ var (
"metricsPort", 9090, "The metrics port")
targetPodHeader = flag.String(
"targetPodHeader",
runserver.DefaultTargetPodHeader,
"target-pod",
"Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
poolName = flag.String(
"poolName",
runserver.DefaultPoolName,
"",
"Name of the InferencePool this Endpoint Picker is associated with.")
poolNamespace = flag.String(
"poolNamespace",
runserver.DefaultPoolNamespace,
"default",
"Namespace of the InferencePool this Endpoint Picker is associated with.")
serviceName = flag.String(
"serviceName",
runserver.DefaultServiceName,
"",
"Name of the Service that will be used to read EndpointSlices from")
zone = flag.String(
"zone",
runserver.DefaultZone,
"",
"The zone that this instance is created in. Will be passed to the corresponding endpointSlice. ")
refreshPodsInterval = flag.Duration(
"refreshPodsInterval",
runserver.DefaultRefreshPodsInterval,
10*time.Second,
"interval to refresh pods")
refreshMetricsInterval = flag.Duration(
"refreshMetricsInterval",
runserver.DefaultRefreshMetricsInterval,
50*time.Millisecond,
"interval to refresh metrics")

scheme = runtime.NewScheme()
Expand Down Expand Up @@ -99,34 +103,71 @@ func main() {
})
klog.Info(flags)

// Create a new manager to manage controllers
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
if err != nil {
klog.Fatalf("Failed to create controller manager: %v", err)
}

// Create the data store used to cache watched resources
datastore := backend.NewK8sDataStore()

serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
TargetPodHeader: *targetPodHeader,
PoolName: *poolName,
PoolNamespace: *poolNamespace,
ServiceName: *serviceName,
Zone: *zone,
RefreshPodsInterval: *refreshPodsInterval,
RefreshMetricsInterval: *refreshMetricsInterval,
Scheme: scheme,
Config: ctrl.GetConfigOrDie(),
Datastore: datastore,
// Create the controllers and register them with the manager
if err := (&backend.InferencePoolReconciler{
Datastore: datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
PoolNamespacedName: types.NamespacedName{
Name: *poolName,
Namespace: *poolNamespace,
},
Record: mgr.GetEventRecorderFor("InferencePool"),
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
}

if err := (&backend.InferenceModelReconciler{
Datastore: datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
PoolNamespacedName: types.NamespacedName{
Name: *poolName,
Namespace: *poolNamespace,
},
Record: mgr.GetEventRecorderFor("InferenceModel"),
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
}

if err := (&backend.EndpointSliceReconciler{
Datastore: datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Record: mgr.GetEventRecorderFor("endpointslice"),
ServiceName: *serviceName,
Zone: *zone,
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
}
serverRunner.Setup()

// Start health and ext-proc servers in goroutines
healthSvr := startHealthServer(datastore, *grpcHealthPort)
extProcSvr := serverRunner.Start(
extProcSvr := startExternalProcessorServer(
datastore,
&vllm.PodMetricsClientImpl{},
*grpcPort,
*refreshPodsInterval,
*refreshMetricsInterval,
*targetPodHeader,
)
// Start metrics handler
metricsSvr := startMetricsHandler(*metricsPort, cfg)

// Start manager, blocking
serverRunner.StartManager()
// Start the controller manager. Blocking and will return when shutdown is complete.
klog.Infof("Starting controller manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
klog.Fatalf("Error starting controller manager: %v", err)
}
klog.Info("Controller manager shutting down")

// Gracefully shutdown servers
if healthSvr != nil {
Expand Down Expand Up @@ -168,6 +209,43 @@ func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
return svr
}

// startExternalProcessorServer creates the gRPC server that hosts the Envoy
// external processor and runs it on a background goroutine.
//
// The goroutine listens on the given TCP port, builds a backend provider on
// top of datastore (using the vLLM pod metrics client) and initializes it with
// the two refresh intervals, registers the ext_proc handler configured with
// targetPodHeader, and then serves until the server is stopped. Listen, init,
// and serve failures are reported through klog.Fatalf.
//
// The *grpc.Server is returned right away, without waiting for the listener,
// so the caller can hold on to it while the goroutine is still starting up.
func startExternalProcessorServer(
	datastore *backend.K8sDatastore,
	port int,
	refreshPodsInterval, refreshMetricsInterval time.Duration,
	targetPodHeader string,
) *grpc.Server {
	grpcSvr := grpc.NewServer()

	serve := func() {
		addr := fmt.Sprintf(":%d", port)
		listener, listenErr := net.Listen("tcp", addr)
		if listenErr != nil {
			klog.Fatalf("Ext-proc server failed to listen: %v", listenErr)
		}
		klog.Infof("Ext-proc server listening on port: %d", port)

		// The provider must be initialized before the handler is registered,
		// since both the handler and the scheduler are built on top of it.
		provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
		initErr := provider.Init(refreshPodsInterval, refreshMetricsInterval)
		if initErr != nil {
			klog.Fatalf("Failed to initialize backend provider: %v", initErr)
		}

		// Wire the ext_proc handler into the gRPC server.
		extProcHandler := handlers.NewServer(
			provider,
			scheduling.NewScheduler(provider),
			targetPodHeader,
			datastore,
		)
		extProcPb.RegisterExternalProcessorServer(grpcSvr, extProcHandler)

		// Serve blocks until the server is stopped; grpc.ErrServerStopped is
		// the expected outcome of a shutdown and is not treated as a failure.
		serveErr := grpcSvr.Serve(listener)
		if serveErr != nil && serveErr != grpc.ErrServerStopped {
			klog.Fatalf("Ext-proc server failed: %v", serveErr)
		}
		klog.Info("Ext-proc server shutting down")
	}
	go serve()

	return grpcSvr
}

func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
metrics.Register()

Expand Down
156 changes: 0 additions & 156 deletions pkg/ext-proc/server/runserver.go

This file was deleted.

Loading