diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go
index 77ee5b2a5..cb60f32d9 100644
--- a/pkg/ext-proc/main.go
+++ b/pkg/ext-proc/main.go
@@ -7,16 +7,20 @@ import (
 	"net"
 	"net/http"
 	"strconv"
+	"time"
+
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"google.golang.org/grpc"
 	healthPb "google.golang.org/grpc/health/grpc_health_v1"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
+	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
-	runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
+	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
@@ -33,7 +37,7 @@ const (
 var (
 	grpcPort = flag.Int(
 		"grpcPort",
-		runserver.DefaultGrpcPort,
+		9002,
 		"The gRPC port used for communicating with Envoy proxy")
 	grpcHealthPort = flag.Int(
 		"grpcHealthPort",
@@ -43,31 +47,31 @@ var (
 		"metricsPort", 9090, "The metrics port")
 	targetPodHeader = flag.String(
 		"targetPodHeader",
-		runserver.DefaultTargetPodHeader,
+		"target-pod",
 		"Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
 	poolName = flag.String(
 		"poolName",
-		runserver.DefaultPoolName,
+		"",
 		"Name of the InferencePool this Endpoint Picker is associated with.")
 	poolNamespace = flag.String(
 		"poolNamespace",
-		runserver.DefaultPoolNamespace,
+		"default",
 		"Namespace of the InferencePool this Endpoint Picker is associated with.")
 	serviceName = flag.String(
 		"serviceName",
-		runserver.DefaultServiceName,
+		"",
 		"Name of the Service that will be used to read EndpointSlices from")
 	zone = flag.String(
 		"zone",
-		runserver.DefaultZone,
+		"",
 		"The zone that this instance is created in. Will be passed to the corresponding endpointSlice. ")
 	refreshPodsInterval = flag.Duration(
 		"refreshPodsInterval",
-		runserver.DefaultRefreshPodsInterval,
+		10*time.Second,
 		"interval to refresh pods")
 	refreshMetricsInterval = flag.Duration(
 		"refreshMetricsInterval",
-		runserver.DefaultRefreshMetricsInterval,
+		50*time.Millisecond,
 		"interval to refresh metrics")
 
 	scheme = runtime.NewScheme()
@@ -99,34 +103,71 @@ func main() {
 	})
 	klog.Info(flags)
 
+	// Create a new manager to manage controllers
+	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
+	if err != nil {
+		klog.Fatalf("Failed to create controller manager: %v", err)
+	}
+
+	// Create the data store used to cache watched resources
 	datastore := backend.NewK8sDataStore()
 
-	serverRunner := &runserver.ExtProcServerRunner{
-		GrpcPort:               *grpcPort,
-		TargetPodHeader:        *targetPodHeader,
-		PoolName:               *poolName,
-		PoolNamespace:          *poolNamespace,
-		ServiceName:            *serviceName,
-		Zone:                   *zone,
-		RefreshPodsInterval:    *refreshPodsInterval,
-		RefreshMetricsInterval: *refreshMetricsInterval,
-		Scheme:                 scheme,
-		Config:                 ctrl.GetConfigOrDie(),
-		Datastore:              datastore,
+	// Create the controllers and register them with the manager
+	if err := (&backend.InferencePoolReconciler{
+		Datastore: datastore,
+		Scheme:    mgr.GetScheme(),
+		Client:    mgr.GetClient(),
+		PoolNamespacedName: types.NamespacedName{
+			Name:      *poolName,
+			Namespace: *poolNamespace,
+		},
+		Record: mgr.GetEventRecorderFor("InferencePool"),
+	}).SetupWithManager(mgr); err != nil {
+		klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
+	}
+
+	if err := (&backend.InferenceModelReconciler{
+		Datastore: datastore,
+		Scheme:    mgr.GetScheme(),
+		Client:    mgr.GetClient(),
+		PoolNamespacedName: types.NamespacedName{
+			Name:      *poolName,
+			Namespace: *poolNamespace,
+		},
+		Record: mgr.GetEventRecorderFor("InferenceModel"),
+	}).SetupWithManager(mgr); err != nil {
+		klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
+	}
+
+	if err := (&backend.EndpointSliceReconciler{
+		Datastore:   datastore,
+		Scheme:      mgr.GetScheme(),
+		Client:      mgr.GetClient(),
+		Record:      mgr.GetEventRecorderFor("endpointslice"),
+		ServiceName: *serviceName,
+		Zone:        *zone,
+	}).SetupWithManager(mgr); err != nil {
+		klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
 	}
-	serverRunner.Setup()
 
 	// Start health and ext-proc servers in goroutines
 	healthSvr := startHealthServer(datastore, *grpcHealthPort)
-	extProcSvr := serverRunner.Start(
+	extProcSvr := startExternalProcessorServer(
 		datastore,
-		&vllm.PodMetricsClientImpl{},
+		*grpcPort,
+		*refreshPodsInterval,
+		*refreshMetricsInterval,
+		*targetPodHeader,
 	)
 
 	// Start metrics handler
 	metricsSvr := startMetricsHandler(*metricsPort, cfg)
 
-	// Start manager, blocking
-	serverRunner.StartManager()
+	// Start the controller manager. Blocking and will return when shutdown is complete.
+	klog.Infof("Starting controller manager")
+	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+		klog.Fatalf("Error starting controller manager: %v", err)
+	}
+	klog.Info("Controller manager shutting down")
 
 	// Gracefully shutdown servers
 	if healthSvr != nil {
@@ -168,6 +209,43 @@ func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
 	return svr
 }
 
+// startExternalProcessorServer starts the Envoy external processor server in a goroutine.
+func startExternalProcessorServer(
+	datastore *backend.K8sDatastore,
+	port int,
+	refreshPodsInterval, refreshMetricsInterval time.Duration,
+	targetPodHeader string,
+) *grpc.Server {
+	svr := grpc.NewServer()
+
+	go func() {
+		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+		if err != nil {
+			klog.Fatalf("Ext-proc server failed to listen: %v", err)
+		}
+		klog.Infof("Ext-proc server listening on port: %d", port)
+
+		// Initialize backend provider
+		pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
+		if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
+			klog.Fatalf("Failed to initialize backend provider: %v", err)
+		}
+
+		// Register ext_proc handlers
+		extProcPb.RegisterExternalProcessorServer(
+			svr,
+			handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore),
+		)
+
+		// Blocking and will return when shutdown is complete.
+		if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
+			klog.Fatalf("Ext-proc server failed: %v", err)
+		}
+		klog.Info("Ext-proc server shutting down")
+	}()
+	return svr
+}
+
 func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
 	metrics.Register()
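Note on the function above: net.Listen runs inside the goroutine, so a bad port surfaces as a process exit via klog.Fatalf rather than as an error the caller can handle. If that ever needs to change, one variant (a sketch under that assumption, with handler registration elided; not part of this diff) listens synchronously before spawning the goroutine:

    // Sketch: listen first so the caller receives listen errors
    // instead of a process exit.
    func startExtProcOrErr(port int) (*grpc.Server, error) {
        lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
        if err != nil {
            return nil, err
        }
        svr := grpc.NewServer()
        // Handler registration elided for brevity.
        go func() {
            if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
                klog.Fatalf("Ext-proc server failed: %v", err)
            }
        }()
        return svr, nil
    }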
diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go
deleted file mode 100644
index 94c6078c8..000000000
--- a/pkg/ext-proc/server/runserver.go
+++ /dev/null
@@ -1,156 +0,0 @@
-package server
-
-import (
-	"fmt"
-	"net"
-	"time"
-
-	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
-	"google.golang.org/grpc"
-	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
-	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
-	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/rest"
-	klog "k8s.io/klog/v2"
-	ctrl "sigs.k8s.io/controller-runtime"
-)
-
-// ExtProcServerRunner provides methods to manage an external process server.
-type ExtProcServerRunner struct {
-	GrpcPort               int
-	TargetPodHeader        string
-	PoolName               string
-	PoolNamespace          string
-	ServiceName            string
-	Zone                   string
-	RefreshPodsInterval    time.Duration
-	RefreshMetricsInterval time.Duration
-	Scheme                 *runtime.Scheme
-	Config                 *rest.Config
-	Datastore              *backend.K8sDatastore
-	manager                ctrl.Manager
-}
-
-// Default values for CLI flags in main
-const (
-	DefaultGrpcPort               = 9002                  // default for --grpcPort
-	DefaultTargetPodHeader        = "target-pod"          // default for --targetPodHeader
-	DefaultPoolName               = ""                    // required but no default
-	DefaultPoolNamespace          = "default"             // default for --poolNamespace
-	DefaultServiceName            = ""                    // required but no default
-	DefaultZone                   = ""                    // default for --zone
-	DefaultRefreshPodsInterval    = 10 * time.Second      // default for --refreshPodsInterval
-	DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
-)
-
-func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
-	return &ExtProcServerRunner{
-		GrpcPort:               DefaultGrpcPort,
-		TargetPodHeader:        DefaultTargetPodHeader,
-		PoolName:               DefaultPoolName,
-		PoolNamespace:          DefaultPoolNamespace,
-		ServiceName:            DefaultServiceName,
-		Zone:                   DefaultZone,
-		RefreshPodsInterval:    DefaultRefreshPodsInterval,
-		RefreshMetricsInterval: DefaultRefreshMetricsInterval,
-		// Scheme, Config, and Datastore can be assigned later.
-	}
-}
-
-// Setup creates the reconcilers for pools, models, and endpointSlices and starts the manager.
-func (r *ExtProcServerRunner) Setup() {
-	// Create a new manager to manage controllers
-	mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme})
-	if err != nil {
-		klog.Fatalf("Failed to create controller manager: %v", err)
-	}
-	r.manager = mgr
-
-	// Create the controllers and register them with the manager
-	if err := (&backend.InferencePoolReconciler{
-		Datastore: r.Datastore,
-		Scheme:    mgr.GetScheme(),
-		Client:    mgr.GetClient(),
-		PoolNamespacedName: types.NamespacedName{
-			Name:      r.PoolName,
-			Namespace: r.PoolNamespace,
-		},
-		Record: mgr.GetEventRecorderFor("InferencePool"),
-	}).SetupWithManager(mgr); err != nil {
-		klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
-	}
-
-	if err := (&backend.InferenceModelReconciler{
-		Datastore: r.Datastore,
-		Scheme:    mgr.GetScheme(),
-		Client:    mgr.GetClient(),
-		PoolNamespacedName: types.NamespacedName{
-			Name:      r.PoolName,
-			Namespace: r.PoolNamespace,
-		},
-		Record: mgr.GetEventRecorderFor("InferenceModel"),
-	}).SetupWithManager(mgr); err != nil {
-		klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
-	}
-
-	if err := (&backend.EndpointSliceReconciler{
-		Datastore:   r.Datastore,
-		Scheme:      mgr.GetScheme(),
-		Client:      mgr.GetClient(),
-		Record:      mgr.GetEventRecorderFor("endpointslice"),
-		ServiceName: r.ServiceName,
-		Zone:        r.Zone,
-	}).SetupWithManager(mgr); err != nil {
-		klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
-	}
-}
-
-// Start starts the Envoy external processor server in a goroutine.
-func (r *ExtProcServerRunner) Start(
-	podDatastore *backend.K8sDatastore,
-	podMetricsClient backend.PodMetricsClient,
-) *grpc.Server {
-	svr := grpc.NewServer()
-
-	go func() {
-		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
-		if err != nil {
-			klog.Fatalf("Ext-proc server failed to listen: %v", err)
-		}
-		klog.Infof("Ext-proc server listening on port: %d", r.GrpcPort)
-
-		// Initialize backend provider
-		pp := backend.NewProvider(podMetricsClient, podDatastore)
-		if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
-			klog.Fatalf("Failed to initialize backend provider: %v", err)
-		}
-
-		// Register ext_proc handlers
-		extProcPb.RegisterExternalProcessorServer(
-			svr,
-			handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetPodHeader, r.Datastore),
-		)
-
-		// Blocking and will return when shutdown is complete.
-		if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
-			klog.Fatalf("Ext-proc server failed: %v", err)
-		}
-		klog.Info("Ext-proc server shutting down")
-	}()
-	return svr
-}
-
-func (r *ExtProcServerRunner) StartManager() {
-	if r.manager == nil {
-		klog.Fatalf("Runner has no manager setup to run: %v", r)
-	}
-	// Start the controller manager. Blocking and will return when shutdown is complete.
-	klog.Infof("Starting controller manager")
-	mgr := r.manager
-	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
-		klog.Fatalf("Error starting controller manager: %v", err)
-	}
-	klog.Info("Controller manager shutting down")
-}
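For anyone tracking the migration: callers of the deleted runner followed a three-step lifecycle, reconstructed here from the removed code above and the removed integration test further down (the pool name is illustrative):

    runner := runserver.NewDefaultExtProcServerRunner()
    runner.PoolName = "vllm-llama2-7b-pool" // adjust from defaults
    runner.Scheme = scheme
    runner.Config = cfg
    runner.Datastore = backend.NewK8sDataStore()

    runner.Setup()        // register the three reconcilers with the manager
    svr := runner.Start(  // serve ext-proc in a goroutine
        runner.Datastore, &vllm.PodMetricsClientImpl{})
    runner.StartManager() // block until shutdown
    _ = svr

main now performs the equivalent steps inline, with package-level flags in place of runner fields.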
diff --git a/pkg/ext-proc/test/hermetic_test.go b/pkg/ext-proc/test/hermetic_test.go
new file mode 100644
index 000000000..acbd74a94
--- /dev/null
+++ b/pkg/ext-proc/test/hermetic_test.go
@@ -0,0 +1,178 @@
+// Package test contains e2e tests for the ext proc while faking the backend pods.
+package test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/protobuf/testing/protocmp"
+	"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
+	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
+)
+
+const (
+	port = 9002
+)
+
+func TestHandleRequestBody(t *testing.T) {
+	tests := []struct {
+		name        string
+		req         *extProcPb.ProcessingRequest
+		pods        []*backend.PodMetrics
+		models      map[string]*v1alpha1.InferenceModel
+		wantHeaders []*configPb.HeaderValueOption
+		wantBody    []byte
+		wantErr     bool
+	}{
+		{
+			name: "success",
+			req:  GenerateRequest("my-model"),
+			models: map[string]*v1alpha1.InferenceModel{
+				"my-model": {
+					Spec: v1alpha1.InferenceModelSpec{
+						ModelName: "my-model",
+						TargetModels: []v1alpha1.TargetModel{
+							{
+								Name:   "my-model-v1",
+								Weight: pointer(100),
+							},
+						},
+					},
+				},
+			},
+			// pod-1 will be picked because it has relatively low queue size, with the requested
+			// model being active, and has low KV cache.
+			pods: []*backend.PodMetrics{
+				{
+					Pod: FakePod(0),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    0,
+						KVCacheUsagePercent: 0.2,
+						ActiveModels: map[string]int{
+							"foo": 1,
+							"bar": 1,
+						},
+					},
+				},
+				{
+					Pod: FakePod(1),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    0,
+						KVCacheUsagePercent: 0.1,
+						ActiveModels: map[string]int{
+							"foo":         1,
+							"my-model-v1": 1,
+						},
+					},
+				},
+				{
+					Pod: FakePod(2),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    10,
+						KVCacheUsagePercent: 0.2,
+						ActiveModels: map[string]int{
+							"foo": 1,
+						},
+					},
+				},
+			},
+			wantHeaders: []*configPb.HeaderValueOption{
+				{
+					Header: &configPb.HeaderValue{
+						Key:      "target-pod",
+						RawValue: []byte("address-1"),
+					},
+				},
+				{
+					Header: &configPb.HeaderValue{
+						Key:      "Content-Length",
+						RawValue: []byte("73"),
+					},
+				},
+			},
+			wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-v1\",\"prompt\":\"hello\",\"temperature\":0}"),
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			client, cleanup := setUpServer(t, test.pods, test.models)
+			t.Cleanup(cleanup)
+			want := &extProcPb.ProcessingResponse{
+				Response: &extProcPb.ProcessingResponse_RequestBody{
+					RequestBody: &extProcPb.BodyResponse{
+						Response: &extProcPb.CommonResponse{
+							HeaderMutation: &extProcPb.HeaderMutation{
+								SetHeaders: test.wantHeaders,
+							},
+							BodyMutation: &extProcPb.BodyMutation{
+								Mutation: &extProcPb.BodyMutation_Body{
+									Body: test.wantBody,
+								},
+							},
+						},
+					},
+				},
+			}
+			res, err := sendRequest(t, client, test.req)
+
+			if (err != nil) != test.wantErr {
+				t.Fatalf("Unexpected error, got %v, want %v", err, test.wantErr)
+			}
+
+			if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" {
+				t.Errorf("Unexpected response, (-want +got): %v", diff)
+			}
+		})
+	}
+
+}
+
+func setUpServer(t *testing.T, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
+	server := StartExtProc(port, time.Second, time.Second, pods, models)
+
+	address := fmt.Sprintf("localhost:%v", port)
+	// Create a grpc connection
+	conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("Failed to connect to %v: %v", address, err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx)
+	if err != nil {
+		t.Fatalf("Failed to create client: %v", err)
+	}
+	return client, func() {
+		cancel()
+		conn.Close()
+		server.GracefulStop()
+	}
+}
+
+func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
+	t.Logf("Sending request: %v", req)
+	if err := client.Send(req); err != nil {
+		t.Logf("Failed to send request %+v: %v", req, err)
+		return nil, err
+	}
+
+	res, err := client.Recv()
+	if err != nil {
+		t.Logf("Failed to receive: %v", err)
+		return nil, err
+	}
+	t.Logf("Received response %+v", res)
+	return res, err
+}
+
+func pointer(v int32) *int32 {
+	return &v
+}
diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go
deleted file mode 100644
index 1379285f3..000000000
--- a/test/integration/hermetic_test.go
+++ /dev/null
@@ -1,435 +0,0 @@
-// Package test contains e2e tests for the ext proc while faking the backend pods.
-package integration
-
-import (
-	"bufio"
-	"bytes"
-	"context"
-	"errors"
-	"flag"
-	"fmt"
-	"io"
-	"log"
-	"os"
-	"path/filepath"
-	"testing"
-	"time"
-
-	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
-	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
-	"github.com/google/go-cmp/cmp"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/credentials/insecure"
-	"google.golang.org/protobuf/testing/protocmp"
-	"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
-	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
-	runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
-	extprocutils "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/test"
-	"k8s.io/apimachinery/pkg/runtime"
-	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	k8syaml "k8s.io/apimachinery/pkg/util/yaml"
-	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
-	klog "k8s.io/klog/v2"
-	k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/envtest"
-	"sigs.k8s.io/yaml"
-)
-
-const (
-	port = runserver.DefaultGrpcPort
-)
-
-var (
-	serverRunner *runserver.ExtProcServerRunner
-	k8sClient    k8sclient.Client
-	testEnv      *envtest.Environment
-	scheme       = runtime.NewScheme()
-)
-
-func SKIPTestHandleRequestBody(t *testing.T) {
-	tests := []struct {
-		name        string
-		req         *extProcPb.ProcessingRequest
-		pods        []*backend.PodMetrics
-		models      map[string]*v1alpha1.InferenceModel
-		wantHeaders []*configPb.HeaderValueOption
-		wantBody    []byte
-		wantErr     bool
-	}{
-		{
-			name: "success",
-			req:  extprocutils.GenerateRequest("my-model"),
-			models: map[string]*v1alpha1.InferenceModel{
-				"my-model": {
-					Spec: v1alpha1.InferenceModelSpec{
-						ModelName: "my-model",
-						TargetModels: []v1alpha1.TargetModel{
-							{
-								Name:   "my-model-v1",
-								Weight: pointer(100),
-							},
-						},
-					},
-				},
-			},
-			// pod-1 will be picked because it has relatively low queue size, with the requested
-			// model being active, and has low KV cache.
-			pods: []*backend.PodMetrics{
-				{
-					Pod: extprocutils.FakePod(0),
-					Metrics: backend.Metrics{
-						WaitingQueueSize:    0,
-						KVCacheUsagePercent: 0.2,
-						ActiveModels: map[string]int{
-							"foo": 1,
-							"bar": 1,
-						},
-					},
-				},
-				{
-					Pod: extprocutils.FakePod(1),
-					Metrics: backend.Metrics{
-						WaitingQueueSize:    0,
-						KVCacheUsagePercent: 0.1,
-						ActiveModels: map[string]int{
-							"foo":         1,
-							"my-model-v1": 1,
-						},
-					},
-				},
-				{
-					Pod: extprocutils.FakePod(2),
-					Metrics: backend.Metrics{
-						WaitingQueueSize:    10,
-						KVCacheUsagePercent: 0.2,
-						ActiveModels: map[string]int{
-							"foo": 1,
-						},
-					},
-				},
-			},
-			wantHeaders: []*configPb.HeaderValueOption{
-				{
-					Header: &configPb.HeaderValue{
-						Key:      "target-pod",
-						RawValue: []byte("address-1"),
-					},
-				},
-				{
-					Header: &configPb.HeaderValue{
-						Key:      "Content-Length",
-						RawValue: []byte("73"),
-					},
-				},
-			},
-			wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-v1\",\"prompt\":\"hello\",\"temperature\":0}"),
-		},
-	}
-
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
-			client, cleanup := setUpServer(t, test.pods, test.models)
-			t.Cleanup(cleanup)
-			want := &extProcPb.ProcessingResponse{
-				Response: &extProcPb.ProcessingResponse_RequestBody{
-					RequestBody: &extProcPb.BodyResponse{
-						Response: &extProcPb.CommonResponse{
-							HeaderMutation: &extProcPb.HeaderMutation{
-								SetHeaders: test.wantHeaders,
-							},
-							BodyMutation: &extProcPb.BodyMutation{
-								Mutation: &extProcPb.BodyMutation_Body{
-									Body: test.wantBody,
-								},
-							},
-						},
-					},
-				},
-			}
-			res, err := sendRequest(t, client, test.req)
-
-			if (err != nil) != test.wantErr {
-				t.Fatalf("Unexpected error, got %v, want %v", err, test.wantErr)
-			}
-
-			if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" {
-				t.Errorf("Unexpected response, (-want +got): %v", diff)
-			}
-		})
-	}
-
-}
-
-func TestKubeInferenceModelRequest(t *testing.T) {
-	tests := []struct {
-		name        string
-		req         *extProcPb.ProcessingRequest
-		wantHeaders []*configPb.HeaderValueOption
-		wantBody    []byte
-		wantErr     bool
-	}{
-		{
-			name: "success",
-			req:  extprocutils.GenerateRequest("sql-lora"),
-			// pod-1 will be picked because it has relatively low queue size, with the requested
-			// model being active, and has low KV cache.
-			wantHeaders: []*configPb.HeaderValueOption{
-				{
-					Header: &configPb.HeaderValue{
-						Key:      "target-pod",
-						RawValue: []byte("address-1"),
-					},
-				},
-				{
-					Header: &configPb.HeaderValue{
-						Key:      "Content-Length",
-						RawValue: []byte("76"),
-					},
-				},
-			},
-			wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"),
-			wantErr:  false,
-		},
-	}
-
-	pods := []*backend.PodMetrics{
-		{
-			Pod: extprocutils.FakePod(0),
-			Metrics: backend.Metrics{
-				WaitingQueueSize:    0,
-				KVCacheUsagePercent: 0.2,
-				ActiveModels: map[string]int{
-					"foo": 1,
-					"bar": 1,
-				},
-			},
-		},
-		{
-			Pod: extprocutils.FakePod(1),
-			Metrics: backend.Metrics{
-				WaitingQueueSize:    0,
-				KVCacheUsagePercent: 0.1,
-				ActiveModels: map[string]int{
-					"foo":            1,
-					"sql-lora-1fdg2": 1,
-				},
-			},
-		},
-		{
-			Pod: extprocutils.FakePod(2),
-			Metrics: backend.Metrics{
-				WaitingQueueSize:    10,
-				KVCacheUsagePercent: 0.2,
-				ActiveModels: map[string]int{
-					"foo": 1,
-				},
-			},
-		},
-	}
-
-	// Set up global k8sclient and extproc server runner with test environment config
-	BeforeSuit()
-
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
-			client, cleanup := setUpHermeticServer(t, pods)
-			t.Cleanup(cleanup)
-			want := &extProcPb.ProcessingResponse{
-				Response: &extProcPb.ProcessingResponse_RequestBody{
-					RequestBody: &extProcPb.BodyResponse{
-						Response: &extProcPb.CommonResponse{
-							HeaderMutation: &extProcPb.HeaderMutation{
-								SetHeaders: test.wantHeaders,
-							},
-							BodyMutation: &extProcPb.BodyMutation{
-								Mutation: &extProcPb.BodyMutation_Body{
-									Body: test.wantBody,
-								},
-							},
-						},
-					},
-				},
-			}
-			res, err := sendRequest(t, client, test.req)
-
-			if err != nil {
-				if !test.wantErr {
-					t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr)
-				}
-			} else if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" {
-				t.Errorf("Unexpected response, (-want +got): %v", diff)
-			}
-		})
-	}
-}
-
-func setUpServer(t *testing.T, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
-	t.Logf("Setting up ExtProc server")
-	server := extprocutils.StartExtProc(port, time.Second, time.Second, pods, models)
-
-	address := fmt.Sprintf("localhost:%v", port)
-	// Create a grpc connection
-	conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
-	if err != nil {
-		log.Fatalf("Failed to connect to %v: %v", address, err)
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx)
-	if err != nil {
-		log.Fatalf("Failed to create client: %v", err)
-	}
-	return client, func() {
-		cancel()
-		conn.Close()
-		server.GracefulStop()
-	}
-}
-
-func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
-	t.Logf("Setting up hermetic ExtProc server")
-	klog.InitFlags(nil)
-	flag.Parse()
-	// Configure klog verbosity levels to print ext proc logs.
-	_ = flag.Lookup("v").Value.Set("3")
-
-	// Unmarshal CRDs from file into structs
-	manifestsPath := filepath.Join("..", "testdata", "inferencepool-with-model-hermetic.yaml")
-	docs, err := readDocuments(manifestsPath)
-	if err != nil {
-		log.Fatalf("Can't read object manifests at path %v, %v", manifestsPath, err)
-	}
-
-	inferenceModels := make([]*v1alpha1.InferenceModel, 0)
-	for _, doc := range docs {
-		inferenceModel := &v1alpha1.InferenceModel{}
-		if err = yaml.Unmarshal(doc, inferenceModel); err != nil {
-			log.Fatalf("Can't unmarshal object: %v", doc)
-		}
-		if inferenceModel.Kind != "InferenceModel" {
-			continue
-		}
-		inferenceModels = append(inferenceModels, inferenceModel)
-	}
-	t.Logf("Inference models to add: %+v", inferenceModels)
-	for _, model := range inferenceModels {
-		t.Logf("Creating inference model: %+v", model)
-		if err := k8sClient.Create(context.Background(), model); err != nil {
-			log.Fatalf("unable to create inferenceModel %v: %v", model.GetName(), err)
-		}
-	}
-
-	ps := make(backend.PodSet)
-	pms := make(map[backend.Pod]*backend.PodMetrics)
-	for _, pod := range pods {
-		ps[pod.Pod] = true
-		pms[pod.Pod] = pod
-	}
-	pmc := &backend.FakePodMetricsClient{Res: pms}
-
-	server := serverRunner.Start(backend.NewK8sDataStore(backend.WithPods(pods)), pmc)
-	if err != nil {
-		log.Fatalf("Ext-proc failed with the err: %v", err)
-	}
-
-	// Wait the reconciler to populate the datastore.
-	time.Sleep(10 * time.Second)
-
-	address := fmt.Sprintf("localhost:%v", port)
-	// Create a grpc connection
-	conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
-	if err != nil {
-		log.Fatalf("Failed to connect to %v: %v", address, err)
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx)
-	if err != nil {
-		log.Fatalf("Failed to create client: %v", err)
-	}
-	return client, func() {
-		cancel()
-		conn.Close()
-		server.GracefulStop()
-	}
-}
-
-// Sets up a test environment and returns the runner struct
-func BeforeSuit() {
-	// Set up mock k8s API Client
-	testEnv = &envtest.Environment{
-		CRDDirectoryPaths:     []string{filepath.Join("..", "..", "config", "crd", "bases")},
-		ErrorIfCRDPathMissing: true,
-	}
-	cfg, err := testEnv.Start()
-
-	if err != nil {
-		log.Fatalf("Failed to start test environment, cfg: %v error: %v", cfg, err)
-	}
-
-	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
-	utilruntime.Must(v1alpha1.AddToScheme(scheme))
-
-	k8sClient, err = k8sclient.New(cfg, k8sclient.Options{Scheme: scheme})
-	if err != nil {
-		log.Fatalf("Failed to start k8s Client: %v", err)
-	} else if k8sClient == nil {
-		log.Fatalf("No error, but returned kubernetes client is nil, cfg: %v", cfg)
-	}
-
-	serverRunner = runserver.NewDefaultExtProcServerRunner()
-	// Adjust from defaults
-	serverRunner.PoolName = "vllm-llama2-7b-pool"
-	serverRunner.Scheme = scheme
-	serverRunner.Config = cfg
-	serverRunner.Datastore = backend.NewK8sDataStore()
-
-	serverRunner.Setup()
-
-	// Start the controller manager in go routine, not blocking
-	go func() {
-		serverRunner.StartManager()
-	}()
-}
-
-func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
-	t.Logf("Sending request: %v", req)
-	if err := client.Send(req); err != nil {
-		t.Logf("Failed to send request %+v: %v", req, err)
-		return nil, err
-	}
-
-	res, err := client.Recv()
-	if err != nil {
-		t.Logf("Failed to receive: %v", err)
-		return nil, err
-	}
-	t.Logf("Received request %+v", res)
-	return res, err
-}
-
-// readDocuments reads documents from file.
-func readDocuments(fp string) ([][]byte, error) {
-	b, err := os.ReadFile(fp)
-	if err != nil {
-		return nil, err
-	}
-
-	docs := [][]byte{}
-	reader := k8syaml.NewYAMLReader(bufio.NewReader(bytes.NewReader(b)))
-	for {
-		// Read document
-		doc, err := reader.Read()
-		if err != nil {
-			if errors.Is(err, io.EOF) {
-				break
-			}
-			return nil, err
-		}
-		docs = append(docs, doc)
-	}
-	return docs, nil
-}
-func pointer(v int32) *int32 {
-	return &v
-}
diff --git a/test/testdata/inferencepool-with-model-hermetic.yaml b/test/testdata/inferencepool-with-model-hermetic.yaml
deleted file mode 100644
index 8703c37af..000000000
--- a/test/testdata/inferencepool-with-model-hermetic.yaml
+++ /dev/null
@@ -1,30 +0,0 @@
-apiVersion: inference.networking.x-k8s.io/v1alpha1
-kind: InferencePool
-metadata:
-  labels:
-    name: vllm-llama2-7b-pool
-spec:
-  targetPortNumber: 8000
-  selector:
-    app: vllm-llama2-7b-pool
----
-apiVersion: inference.networking.x-k8s.io/v1alpha1
-kind: InferenceModel
-metadata:
-  labels:
-    app.kubernetes.io/name: api
-    app.kubernetes.io/managed-by: kustomize
-  name: inferencemodel-sample
-  namespace: default
-spec:
-  modelName: sql-lora
-  criticality: Critical
-  poolRef:
-    # this is the default val:
-    group: inference.networking.x-k8s.io
-    # this is the default val:
-    kind: InferencePool
-    name: vllm-llama2-7b-pool
-  targetModels:
-  - name: sql-lora-1fdg2
-    weight: 100
\ No newline at end of file