diff --git a/examples/poc/README.md b/examples/poc/README.md
new file mode 100644
index 00000000..b859fe50
--- /dev/null
+++ b/examples/poc/README.md
@@ -0,0 +1,68 @@
+# Envoy Ext Proc Gateway with LoRA Integration
+
+This project sets up an Envoy gateway that routes inference requests to vLLM replicas serving LoRA (Low-Rank Adaptation) adapters. Routing decisions are made by an Envoy external processing (ext-proc) gRPC server, using custom headers and load-balancing rules. The setup includes Kubernetes services and deployments for both the ext-proc gRPC server and the vllm-lora application.
+
+## Requirements
+- A vLLM-based deployment (using the custom image described below) with LoRA adapters
+- Kubernetes cluster
+- Envoy Gateway v1.1 installed on your cluster: https://gateway.envoyproxy.io/v1.1/tasks/quickstart/
+- `kubectl` command-line tool
+- Go (for local development)
+
+## vLLM
+***This PoC uses a modified vLLM fork; the public image of the fork is `ghcr.io/tomatillo-and-multiverse/vllm:demo`.***
+
+The fork is here: https://github.com/kaushikmitr/vllm.
+
+The changes relative to standard vLLM are:
+- Active/registered LoRA adapters are returned as a response header (used for LoRA-aware routing)
+- Queue size is returned as a response header
+- Active/registered LoRA adapters are emitted as metrics (for out-of-band scraping during low-traffic periods)
+
+
+## Overview
+
+This project contains the necessary configurations and code to set up and deploy a service using Kubernetes, Envoy, and Go. The service routes requests based on the model specified (using the OpenAI API format), collects metrics, and balances load across the vLLM replicas.
+
+![Envoy gateway bootstrap](./envoy-gateway-bootstrap.png)
+
+
+## Quickstart
+
+### Steps
+
+1. **Apply Kubernetes Manifests**
+   ```bash
+   cd manifests
+   kubectl apply -f ext-proc.yaml
+   kubectl apply -f vllm/vllm-lora-service.yaml
+   kubectl apply -f vllm/vllm-lora-deployment.yaml
+   ```
+
+2. **Update `ext-proc.yaml`**
+   - Ensure `ext-proc.yaml` is updated with the pod names and internal IP addresses of the vLLM replicas. This is required for correct header-based routing of requests.
+
+3. **Update and apply `gateway.yaml`**
+   - Ensure `gateway.yaml` is updated with the internal IP address of the ext-proc service, then apply it:
+   ```bash
+   cd manifests
+   kubectl apply -f gateway.yaml
+   ```
+
+### Monitoring and Metrics
+
+- The Go application collects metrics and saves the latest response headers in memory.
+- Ensure Envoy is configured to route based on the metrics collected from the `/metrics` endpoint of the individual vLLM pods.
+
+## Contributing
+
+1. Fork the repository.
+2. Create a new branch.
+3. Make your changes.
+4. Open a pull request.
+
+## License
+
+This project is licensed under the MIT License.
+
+---
\ No newline at end of file
diff --git a/examples/poc/envoy-gateway-bootstrap.png b/examples/poc/envoy-gateway-bootstrap.png
new file mode 100644
index 00000000..aecfcd1d
Binary files /dev/null and b/examples/poc/envoy-gateway-bootstrap.png differ
diff --git a/examples/poc/ext-proc/Dockerfile b/examples/poc/ext-proc/Dockerfile
new file mode 100644
index 00000000..3a35c1cc
--- /dev/null
+++ b/examples/poc/ext-proc/Dockerfile
@@ -0,0 +1,19 @@
+## Multistage build
+FROM golang:1.22.5-alpine AS build
+ENV CGO_ENABLED=0
+ENV GOOS=linux
+ENV GOARCH=amd64
+
+WORKDIR /src
+COPY . .
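+# Download module dependencies and compile a static ext-proc binary for the distroless runtime stage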
+RUN go mod download
+RUN go build -o /ext-proc
+
+## Multistage deploy
+FROM gcr.io/distroless/base-debian10
+
+WORKDIR /
+COPY --from=build /ext-proc /ext-proc
+
+ENTRYPOINT ["/ext-proc"]
diff --git a/examples/poc/ext-proc/cache/cache.go b/examples/poc/ext-proc/cache/cache.go
new file mode 100644
index 00000000..aeb2e545
--- /dev/null
+++ b/examples/poc/ext-proc/cache/cache.go
@@ -0,0 +1,91 @@
+package cache
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"github.com/coocood/freecache"
+)
+
+type ActiveLoraModelMetrics struct {
+	Date                    string
+	PodName                 string
+	ModelName               string
+	NumberOfPendingRequests int
+}
+
+type PendingRequestActiveAdaptersMetrics struct {
+	Date                   string
+	PodName                string
+	PendingRequests        int
+	NumberOfActiveAdapters int
+}
+
+func SetCacheActiveLoraModel(cache *freecache.Cache, metric ActiveLoraModelMetrics) error {
+	cacheKey := fmt.Sprintf("%s:%s", metric.PodName, metric.ModelName)
+	cacheValue, err := json.Marshal(metric)
+	if err != nil {
+		return fmt.Errorf("error marshaling ActiveLoraModelMetrics for key %s: %v", cacheKey, err)
+	}
+	err = cache.Set([]byte(cacheKey), cacheValue, 0)
+	if err != nil {
+		return fmt.Errorf("error setting cacheActiveLoraModel for key %s: %v", cacheKey, err)
+	}
+	fmt.Printf("Set cacheActiveLoraModel - Key: %s, Value: %s\n", cacheKey, cacheValue)
+	return nil
+}
+
+func SetCachePendingRequestActiveAdapters(cache *freecache.Cache, metric PendingRequestActiveAdaptersMetrics) error {
+	cacheKey := fmt.Sprintf("%s:", metric.PodName)
+	cacheValue, err := json.Marshal(metric)
+	if err != nil {
+		return fmt.Errorf("error marshaling PendingRequestActiveAdaptersMetrics for key %s: %v", cacheKey, err)
+	}
+	err = cache.Set([]byte(cacheKey), cacheValue, 0)
+	if err != nil {
+		return fmt.Errorf("error setting cachePendingRequestActiveAdapters for key %s: %v", cacheKey, err)
+	}
+	fmt.Printf("Set cachePendingRequestActiveAdapters - Key: %s, Value: %s\n", cacheKey, cacheValue)
+	return nil
+}
+
+func GetCacheActiveLoraModel(cache *freecache.Cache, podName, modelName string) (*ActiveLoraModelMetrics, error) {
+	cacheKey := fmt.Sprintf("%s:%s", podName, modelName)
+
+	value, err := cache.Get([]byte(cacheKey))
+	if err != nil {
+		if err == freecache.ErrNotFound {
+			// Return the sentinel error unchanged so callers can distinguish a plain cache miss.
+			return nil, err
+		}
+		return nil, fmt.Errorf("error fetching cacheActiveLoraModel for key %s: %v", cacheKey, err)
+	}
+	var metric ActiveLoraModelMetrics
+	err = json.Unmarshal(value, &metric)
+	if err != nil {
+		return nil, fmt.Errorf("error unmarshaling ActiveLoraModelMetrics for key %s: %v", cacheKey, err)
+	}
+	fmt.Printf("Got cacheActiveLoraModel - Key: %s, Value: %s\n", cacheKey, value)
+	return &metric, nil
+}
+
+func GetCachePendingRequestActiveAdapters(cache *freecache.Cache, podName string) (*PendingRequestActiveAdaptersMetrics, error) {
+	cacheKey := fmt.Sprintf("%s:", podName)
+
+	value, err := cache.Get([]byte(cacheKey))
+	if err != nil {
+		if err == freecache.ErrNotFound {
+			// Return the sentinel error unchanged so callers can distinguish a plain cache miss.
+			return nil, err
+		}
+		return nil, fmt.Errorf("error fetching cachePendingRequestActiveAdapters for key %s: %v", cacheKey, err)
+	}
+	var metric PendingRequestActiveAdaptersMetrics
+	err = json.Unmarshal(value, &metric)
+	if err != nil {
+		return nil, fmt.Errorf("error unmarshaling PendingRequestActiveAdaptersMetrics for key %s: %v", cacheKey, err)
+	}
+	fmt.Printf("Got cachePendingRequestActiveAdapters - Key: %s, Value: %s\n", cacheKey, value)
+	return &metric, nil
+}
+
+type PodCache struct {
+	PodIPMap map[string]string
+	IpPodMap map[string]string
+}
+
+// SetPodCache is a placeholder for persisting the pod list in the cache; it is not implemented or used yet.
+func SetPodCache(cache *freecache.Cache, pods []string) {
+}
diff --git 
a/examples/poc/ext-proc/go.mod b/examples/poc/ext-proc/go.mod new file mode 100644 index 00000000..31f844a7 --- /dev/null +++ b/examples/poc/ext-proc/go.mod @@ -0,0 +1,24 @@ +module ext-proc + +go 1.21 + +require ( + github.com/coocood/freecache v1.2.4 + github.com/envoyproxy/go-control-plane v0.12.0 + github.com/prometheus/client_model v0.6.1 + github.com/prometheus/common v0.55.0 + google.golang.org/grpc v1.65.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect + github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) diff --git a/examples/poc/ext-proc/go.sum b/examples/poc/ext-proc/go.sum new file mode 100644 index 00000000..6d27a35c --- /dev/null +++ b/examples/poc/ext-proc/go.sum @@ -0,0 +1,33 @@ +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= +github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/coocood/freecache v1.2.4 h1:UdR6Yz/X1HW4fZOuH0Z94KwG851GWOSknua5VUbb/5M= +github.com/coocood/freecache v1.2.4/go.mod h1:RBUWa/Cy+OHdfTGFEhEuE1pMCMX51Ncizj7rthiQ3vk= +github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= +github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +golang.org/x/net v0.26.0 
h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= diff --git a/examples/poc/ext-proc/handlers/handlers.go b/examples/poc/ext-proc/handlers/handlers.go new file mode 100644 index 00000000..0191ebd6 --- /dev/null +++ b/examples/poc/ext-proc/handlers/handlers.go @@ -0,0 +1,392 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "io" + "log" + "strconv" + "strings" + "time" + + "ext-proc/cache" + "ext-proc/metrics" + "ext-proc/scheduling" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + filterPb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" + + "github.com/coocood/freecache" +) + +type Server struct { + Pods []string + PodIPMap map[string]string + IpPodMap map[string]string + CacheActiveLoraModel *freecache.Cache + CachePendingRequestActiveAdapters *freecache.Cache + TokenCache *scheduling.TokenCache + EnforceFairness bool +} + +func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { + log.Println(" ") + log.Println(" ") + log.Println("Started process: --> ") + + ctx := srv.Context() + targetPodIP := "" + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + req, err := srv.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err) + } + + log.Println(" ") + log.Println(" ") + log.Println("Got stream: --> ") + + resp := &extProcPb.ProcessingResponse{} + switch v := req.Request.(type) { + case *extProcPb.ProcessingRequest_RequestHeaders: + resp, targetPodIP = s.HandleRequestHeaders(req, targetPodIP) + case *extProcPb.ProcessingRequest_RequestBody: + resp, targetPodIP = s.HandleRequestBody(req, targetPodIP) + case *extProcPb.ProcessingRequest_ResponseHeaders: + resp, targetPodIP = s.HandleResponseHeaders(req, targetPodIP) + default: + log.Printf("Unknown Request type %+v\n", v) + } + + if err := srv.Send(resp); err != nil { + log.Printf("send error %v", err) + } + } +} + +func valueExists(m map[string]string, valueToFind string) bool { + for _, value := range m { + if value == valueToFind { + return true + } + } + return false +} + +func (s *Server) HandleRequestBody(req *extProcPb.ProcessingRequest, targetPodIP string) (*extProcPb.ProcessingResponse, string) 
{ + log.Println("--- In RequestBody processing") + var requestBody map[string]interface{} + v := req.Request.(*extProcPb.ProcessingRequest_RequestBody) + if err := json.Unmarshal(v.RequestBody.Body, &requestBody); err != nil { + log.Printf("Error unmarshaling request body: %v", err) + return nil, targetPodIP + } + + loraAdapterRequested, ok := requestBody["model"].(string) + if !ok { + log.Println("model/lora-adapter not found in request body") + return nil, targetPodIP + } + + threshold := 100000 + thresholdValue, ok := requestBody["threshold"].(float64) + if ok { + threshold = int(thresholdValue) + } + targetPod := "" + + if targetPodIP == "" { + // Retrieve metrics from cache + var loraMetrics []cache.ActiveLoraModelMetrics + var requestMetrics []cache.PendingRequestActiveAdaptersMetrics + + for _, pod := range s.Pods { + loraMetric, err := cache.GetCacheActiveLoraModel(s.CacheActiveLoraModel, pod, loraAdapterRequested) + if err == nil { + loraMetrics = append(loraMetrics, *loraMetric) + } else if err != freecache.ErrNotFound { + log.Printf("Error fetching cacheActiveLoraModel for pod %s and lora_adapter_requested %s: %v", pod, loraAdapterRequested, err) + } + + requestMetric, err := cache.GetCachePendingRequestActiveAdapters(s.CachePendingRequestActiveAdapters, pod) + if err == nil { + requestMetrics = append(requestMetrics, *requestMetric) + } else if err != freecache.ErrNotFound { + log.Printf("Error fetching cachePendingRequestActiveAdapters for pod %s: %v", pod, err) + break + } + } + + fmt.Printf("Fetched loraMetrics: %+v\n", loraMetrics) + fmt.Printf("Fetched requestMetrics: %+v\n", requestMetrics) + + targetPod = metrics.FindTargetPod(loraMetrics, requestMetrics, loraAdapterRequested, threshold) + targetPodIP = s.PodIPMap[targetPod] + fmt.Printf("Selected target pod: %s\n", targetPod) + fmt.Printf("Selected target pod IP: %s\n", targetPodIP) + } else { + targetPod = s.IpPodMap[targetPodIP] + fmt.Printf("Pre-selected target pod: %s\n", targetPod) + fmt.Printf("Pre-selected target pod IP: %s\n", targetPodIP) + } + + var resp *extProcPb.ProcessingResponse + if s.EnforceFairness && !s.TokenCache.IsFairRequest(loraAdapterRequested) { + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_TooManyRequests, + }, + }, + }, + } + } else if !metrics.Contains(s.Pods, targetPod) { + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_NotFound, + }, + }, + }, + } + } else { + headers := []*configPb.HeaderValueOption{ + { + Header: &configPb.HeaderValue{ + Key: "x-went-into-req-body", + RawValue: []byte("true"), + }, + }, + { + Header: &configPb.HeaderValue{ + Key: "target-pod", + RawValue: []byte(targetPodIP), + }, + }, + } + + // Print headers + for _, header := range headers { + fmt.Printf("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue) + } + + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestBody{ + RequestBody: &extProcPb.BodyResponse{ + Response: &extProcPb.CommonResponse{ + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: headers, + }, + }, + }, + }, + } + } + return resp, targetPodIP +} + +func (s *Server) HandleResponseHeaders(req *extProcPb.ProcessingRequest, targetPodIP 
string) (*extProcPb.ProcessingResponse, string) {
+	log.Println("--- In ResponseHeaders processing")
+	r := req.Request
+	h := r.(*extProcPb.ProcessingRequest_ResponseHeaders)
+
+	log.Printf("Headers: %+v\n", h)
+
+	var loraMetrics []cache.ActiveLoraModelMetrics
+	var requestMetrics []cache.PendingRequestActiveAdaptersMetrics
+	var modelNames map[string]int
+	var totalTokens int
+	var model string
+	var err error
+	currentTime := time.Now().Unix()
+	pendingQueueSize := -1
+	podAdapterMap := make(map[string]int)
+	targetPod := s.IpPodMap[targetPodIP]
+	for _, header := range h.ResponseHeaders.Headers.Headers {
+		switch header.Key {
+		case "active_lora_adapters":
+			err = json.Unmarshal([]byte(header.RawValue), &modelNames)
+			if err != nil {
+				log.Printf("Error parsing model_names: %v", err)
+			}
+		case "pending_queue_size":
+			var err error
+			pendingQueueSize, err = strconv.Atoi(string(header.RawValue))
+			if err != nil {
+				log.Printf("Error converting pending_queue_size: %v", err)
+			}
+		case "model":
+			model = string(header.RawValue)
+		case "total_tokens":
+			totalTokens, err = strconv.Atoi(string(header.RawValue))
+			if err != nil {
+				log.Printf("Error parsing total_tokens: %v", err)
+			}
+		}
+	}
+	if modelNames != nil {
+		for modelName, numberOfPendingRequests := range modelNames {
+			metric := cache.ActiveLoraModelMetrics{
+				Date:                    time.Now().Format(time.RFC3339),
+				PodName:                 targetPod,
+				ModelName:               modelName,
+				NumberOfPendingRequests: numberOfPendingRequests,
+			}
+			podAdapterMap[metric.PodName]++
+			loraMetrics = append(loraMetrics, metric)
+		}
+		// Update cache with parsed values
+		for _, metric := range loraMetrics {
+			if err := cache.SetCacheActiveLoraModel(s.CacheActiveLoraModel, metric); err != nil {
+				log.Printf("Error setting cache in Response Header: %v", err)
+			}
+		}
+	}
+	if pendingQueueSize >= 0 {
+		requestMetric := cache.PendingRequestActiveAdaptersMetrics{
+			Date:                   time.Now().Format(time.RFC3339),
+			PodName:                targetPod,
+			PendingRequests:        pendingQueueSize,
+			NumberOfActiveAdapters: podAdapterMap[targetPod],
+		}
+		requestMetrics = append(requestMetrics, requestMetric)
+		for _, metric := range requestMetrics {
+			if err := cache.SetCachePendingRequestActiveAdapters(s.CachePendingRequestActiveAdapters, metric); err != nil {
+				log.Printf("Error setting cache in Response Header: %v", err)
+			}
+		}
+	}
+	log.Printf("Model Value: %v", model)
+	log.Printf("Total Tokens: %v", totalTokens)
+	if model != "" {
+		s.TokenCache.StoreResponseInfo(model, currentTime, totalTokens)
+	}
+	s.TokenCache.AdapterMap.Range(func(k, v any) bool {
+		log.Printf("Adapter: %+v Entries: %+v", k, v)
+		return true
+	})
+
+	resp := &extProcPb.ProcessingResponse{
+		Response: &extProcPb.ProcessingResponse_ResponseHeaders{
+			ResponseHeaders: &extProcPb.HeadersResponse{
+				Response: &extProcPb.CommonResponse{
+					HeaderMutation: &extProcPb.HeaderMutation{
+						SetHeaders: []*configPb.HeaderValueOption{
+							{
+								Header: &configPb.HeaderValue{
+									Key:      "x-went-into-resp-headers",
+									RawValue: []byte("true"),
+								},
+							},
+							{
+								Header: &configPb.HeaderValue{
+									Key:      "target-pod",
+									RawValue: []byte(targetPod),
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	return resp, targetPodIP
+}
+
+func (s *Server) HandleRequestHeaders(req *extProcPb.ProcessingRequest, targetPodIP string) (*extProcPb.ProcessingResponse, string) {
+	log.Println("--- In RequestHeaders processing ...")
+	r := req.Request
+	h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
+
+	log.Printf("Headers: %+v\n", h)
+	log.Printf("EndOfStream: %v\n", h.RequestHeaders.EndOfStream)
+	for 
_, n := range h.RequestHeaders.Headers.Headers { + if strings.ToLower(n.Key) == "target-pod" { + targetPodIP = string(n.RawValue) + } + } + + var resp *extProcPb.ProcessingResponse + if targetPodIP == "" { + bodyMode := filterPb.ProcessingMode_BUFFERED + + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extProcPb.HeadersResponse{ + Response: &extProcPb.CommonResponse{ + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: []*configPb.HeaderValueOption{ + { + Header: &configPb.HeaderValue{ + Key: "x-went-into-req-headers", + RawValue: []byte("true"), + }, + }, + }, + }, + ClearRouteCache: true, + }, + }, + }, + ModeOverride: &filterPb.ProcessingMode{ + ResponseHeaderMode: filterPb.ProcessingMode_SEND, + RequestBodyMode: bodyMode, + }, + } + } else { + bodyMode := filterPb.ProcessingMode_NONE + + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extProcPb.HeadersResponse{ + Response: &extProcPb.CommonResponse{ + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: []*configPb.HeaderValueOption{ + { + Header: &configPb.HeaderValue{ + Key: "x-went-into-req-headers", + RawValue: []byte("true"), + }, + }, + { + Header: &configPb.HeaderValue{ + Key: "target-pod", + RawValue: []byte(targetPodIP), + }, + }, + }, + }, + ClearRouteCache: true, + }, + }, + }, + ModeOverride: &filterPb.ProcessingMode{ + ResponseHeaderMode: filterPb.ProcessingMode_SEND, + RequestBodyMode: bodyMode, + }, + } + } + // Print final headers being sent + fmt.Println("[request_header]Final headers being sent:") + for _, header := range resp.GetRequestHeaders().GetResponse().GetHeaderMutation().GetSetHeaders() { + fmt.Printf("%s: %s\n", header.GetHeader().Key, header.GetHeader().RawValue) + } + return resp, targetPodIP +} diff --git a/examples/poc/ext-proc/main.go b/examples/poc/ext-proc/main.go new file mode 100644 index 00000000..2c7addaf --- /dev/null +++ b/examples/poc/ext-proc/main.go @@ -0,0 +1,138 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "net" + "os" + "os/signal" + "runtime/debug" + "strings" + "syscall" + "time" + + "ext-proc/handlers" + "ext-proc/metrics" + "ext-proc/scheduling" + + "github.com/coocood/freecache" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + healthPb "google.golang.org/grpc/health/grpc_health_v1" +) + +type extProcServer struct{} +type server struct{} + +var ( + port int + certPath string + enforeFairness bool + cacheActiveLoraModel *freecache.Cache + cachePendingRequestActiveAdapters *freecache.Cache + pods []string + podIPMap map[string]string + ipPodMap map[string]string + interval = 30 * time.Second // Update interval for fetching metrics + TTL = int64(7) +) + +type healthServer struct{} + +func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { + log.Printf("Handling grpc Check request + %s", in.String()) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil +} + +func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error { + return status.Error(codes.Unimplemented, "Watch is not implemented") +} + +func main() { + flag.IntVar(&port, "port", 9002, "gRPC port") + flag.StringVar(&certPath, "certPath", "", "path to extProcServer certificate and 
private key") + enforceFairness := flag.Bool("enable-fairness", false, "flag to enable fairness enforcement over the KV-Cache") + podsFlag := flag.String("pods", "", "Comma-separated list of pod addresses") + podIPsFlag := flag.String("podIPs", "", "Comma-separated list of pod IPs") + flag.Parse() + + if *podsFlag == "" || *podIPsFlag == "" { + log.Fatal("No pods or pod IPs provided. Use the -pods and -podIPs flags to specify comma-separated lists of pod addresses and pod IPs.") + } + + pods = strings.Split(*podsFlag, ",") + podIPs := strings.Split(*podIPsFlag, ",") + + if len(pods) != len(podIPs) { + log.Fatal("The number of pod addresses and pod IPs must match.") + } + + podIPMap = make(map[string]string) + for i := range pods { + podIPMap[pods[i]] = podIPs[i] + } + ipPodMap = make(map[string]string) + for i := range podIPs { + ipPodMap[podIPs[i]] = pods[i] + } + + // cache init + cacheActiveLoraModel = freecache.NewCache(1024) + cachePendingRequestActiveAdapters = freecache.NewCache(1024) + debug.SetGCPercent(20) + + // Start the periodic metrics fetching in a separate goroutine + + go metrics.FetchMetricsPeriodically(pods, podIPMap, cacheActiveLoraModel, cachePendingRequestActiveAdapters, interval) + + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + + // creates the in-cluster config + config, err := rest.InClusterConfig() + if err != nil { + panic(err.Error()) + } + // creates the clientset + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + panic(err.Error()) + } + + s := grpc.NewServer() + + extProcPb.RegisterExternalProcessorServer(s, &handlers.Server{ + Pods: pods, + PodIPMap: podIPMap, + IpPodMap: ipPodMap, + CacheActiveLoraModel: cacheActiveLoraModel, + CachePendingRequestActiveAdapters: cachePendingRequestActiveAdapters, + TokenCache: scheduling.CreateNewTokenCache(TTL), + EnforceFairness: *enforceFairness, + }) + healthPb.RegisterHealthServer(s, &healthServer{}) + + log.Println("Starting gRPC server on port :9002") + + // shutdown + var gracefulStop = make(chan os.Signal) + signal.Notify(gracefulStop, syscall.SIGTERM) + signal.Notify(gracefulStop, syscall.SIGINT) + go func() { + sig := <-gracefulStop + log.Printf("caught sig: %+v", sig) + log.Println("Wait for 1 second to finish processing") + time.Sleep(1 * time.Second) + os.Exit(0) + }() + + s.Serve(lis) + +} diff --git a/examples/poc/ext-proc/metrics/metrics.go b/examples/poc/ext-proc/metrics/metrics.go new file mode 100644 index 00000000..219014b5 --- /dev/null +++ b/examples/poc/ext-proc/metrics/metrics.go @@ -0,0 +1,327 @@ +package metrics + +import ( + "fmt" + "log" + "math" + "math/rand" + "net/http" + "strings" + "sync" + "time" + + "github.com/coocood/freecache" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + + "ext-proc/cache" +) + +// Contains checks if a slice contains a specific element +func Contains(slice []string, item string) bool { + for _, s := range slice { + if s == item { + return true + } + } + return false +} + +// FetchLoraMetricsFromPod fetches metrics from a given pod and sends them to a channel +func FetchLoraMetricsFromPod(pod string, podIPMap map[string]string, ch chan<- []cache.ActiveLoraModelMetrics, wg *sync.WaitGroup) { + defer wg.Done() + ip, exists := podIPMap[pod] + if !exists { + log.Printf("pod %s has no corresponding ip defined", pod) + return + } + url := fmt.Sprintf("http://%s/metrics", ip) + resp, err := 
http.Get(url) + if err != nil { + log.Printf("failed to fetch metrics from %s: %v", pod, err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("unexpected status code from %s: %v", pod, resp.StatusCode) + return + } + + parser := expfmt.TextParser{} + metricFamilies, err := parser.TextToMetricFamilies(resp.Body) + if err != nil { + log.Printf("failed to parse metrics from %s: %v", pod, err) + return + } + + var loraMetrics []cache.ActiveLoraModelMetrics + var adapterList []string + modelsDict := make(map[string]int) + + for name, mf := range metricFamilies { + if name == "vllm:active_lora_adapters" { + for _, m := range mf.GetMetric() { + modelName := GetLabelValue(m, "active_lora_adapters") + numberOfPendingRequests := int(m.GetGauge().GetValue()) + modelsDict[modelName] = numberOfPendingRequests + } + } + if name == "vllm:info_active_adapters_info" { + for _, metric := range mf.GetMetric() { + for _, label := range metric.GetLabel() { + if label.GetName() == "active_adapters" { + if label.GetValue() != "" { + adapterList = strings.Split(label.GetValue(), ",") + } + } + } + } + } + } + + for modelName, numberOfPendingRequests := range modelsDict { + if !Contains(adapterList, modelName) { + continue + } + loraMetric := cache.ActiveLoraModelMetrics{ + Date: time.Now().Format(time.RFC3339), + PodName: pod, + ModelName: modelName, + NumberOfPendingRequests: numberOfPendingRequests, + } + loraMetrics = append(loraMetrics, loraMetric) + } + + ch <- loraMetrics +} + +// FetchRequestMetricsFromPod fetches request metrics from a given pod and sends them to a channel +func FetchRequestMetricsFromPod(pod string, podIPMap map[string]string, ch chan<- []cache.PendingRequestActiveAdaptersMetrics, wg *sync.WaitGroup) { + defer wg.Done() + + ip, exists := podIPMap[pod] + if !exists { + log.Printf("pod %s has no corresponding ip defined", pod) + return + } + url := fmt.Sprintf("http://%s/metrics", ip) + resp, err := http.Get(url) + if err != nil { + log.Printf("failed to fetch metrics from %s: %v", pod, err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("unexpected status code from %s: %v", pod, resp.StatusCode) + return + } + + parser := expfmt.TextParser{} + metricFamilies, err := parser.TextToMetricFamilies(resp.Body) + if err != nil { + log.Printf("failed to parse metrics from %s: %v", pod, err) + return + } + + var requestMetrics []cache.PendingRequestActiveAdaptersMetrics + pendingRequests := 0 + adapterCount := 0 + + for name, mf := range metricFamilies { + switch name { + case "vllm:num_requests_waiting": + for _, m := range mf.GetMetric() { + pendingRequests += int(m.GetGauge().GetValue()) + } + case "vllm:num_requests_running": + for _, m := range mf.GetMetric() { + pendingRequests += int(m.GetGauge().GetValue()) + } + case "vllm:info_active_adapters_info": + for _, metric := range mf.GetMetric() { + for _, label := range metric.GetLabel() { + if label.GetName() == "active_adapters" { + if label.GetValue() != "" { + adapterCount = len(strings.Split(label.GetValue(), ",")) + } + } + } + } + } + } + + requestMetric := cache.PendingRequestActiveAdaptersMetrics{ + Date: time.Now().Format(time.RFC3339), + PodName: pod, + PendingRequests: pendingRequests, + NumberOfActiveAdapters: adapterCount, + } + requestMetrics = append(requestMetrics, requestMetric) + + ch <- requestMetrics +} + +// FetchMetrics fetches metrics from all pods and returns them +func FetchMetrics(pods []string, podIPMap map[string]string) 
([]cache.ActiveLoraModelMetrics, []cache.PendingRequestActiveAdaptersMetrics) { + ch := make(chan []cache.ActiveLoraModelMetrics) + ch2 := make(chan []cache.PendingRequestActiveAdaptersMetrics) + var wg sync.WaitGroup + var wg2 sync.WaitGroup + + for _, pod := range pods { + wg.Add(1) + go FetchLoraMetricsFromPod(pod, podIPMap, ch, &wg) + } + + for _, pod := range pods { + wg2.Add(1) + go FetchRequestMetricsFromPod(pod, podIPMap, ch2, &wg2) + } + + go func() { + wg.Wait() + close(ch) + }() + + go func() { + wg2.Wait() + close(ch2) + }() + + var allLoraMetrics []cache.ActiveLoraModelMetrics + var allRequestMetrics []cache.PendingRequestActiveAdaptersMetrics + for loraMetrics := range ch { + allLoraMetrics = append(allLoraMetrics, loraMetrics...) + } + for requestMetrics := range ch2 { + allRequestMetrics = append(allRequestMetrics, requestMetrics...) + } + return allLoraMetrics, allRequestMetrics +} + +// GetLabelValue returns the value of a label from a Prometheus metric +func GetLabelValue(m *io_prometheus_client.Metric, label string) string { + for _, l := range m.GetLabel() { + if l.GetName() == label { + return l.GetValue() + } + } + return "" +} + +// FindTargetPod finds the target pod based on metrics and the requested lora adapter +func FindTargetPod(loraMetrics []cache.ActiveLoraModelMetrics, requestMetrics []cache.PendingRequestActiveAdaptersMetrics, loraAdapterRequested string, threshold int) string { + var targetPod string + bestAlternativePod := "" + minAltRequests := math.MaxInt + + fmt.Println("Searching for the best pod...") + + // Filter metrics for the requested model + for _, reqMetric := range requestMetrics { + if reqMetric.PendingRequests < minAltRequests { + minAltRequests = reqMetric.PendingRequests + bestAlternativePod = reqMetric.PodName + } + } + + if loraAdapterRequested == "" { + targetPod = bestAlternativePod + if targetPod == "" { + fmt.Println("Error: No pod found") + } else { + fmt.Printf("Selected the best alternative pod: %s with %d pending requests\n", targetPod, minAltRequests) + } + return targetPod + } + + var relevantMetrics []cache.ActiveLoraModelMetrics + for _, metric := range loraMetrics { + if metric.ModelName == loraAdapterRequested { + relevantMetrics = append(relevantMetrics, metric) + } + } + + // If no metrics found for the requested model, choose the pod with the least active adapters randomly + if len(relevantMetrics) == 0 { + minActiveAdapters := math.MaxInt + var podsWithLeastAdapters []cache.PendingRequestActiveAdaptersMetrics + for _, reqMetric := range requestMetrics { + if reqMetric.NumberOfActiveAdapters < minActiveAdapters { + minActiveAdapters = reqMetric.NumberOfActiveAdapters + podsWithLeastAdapters = []cache.PendingRequestActiveAdaptersMetrics{} + } + if reqMetric.NumberOfActiveAdapters == minActiveAdapters { + podsWithLeastAdapters = append(podsWithLeastAdapters, reqMetric) + } + } + + if len(podsWithLeastAdapters) == 0 { + fmt.Println("Error: No pod with min adapter found") + } else { + rand.Seed(time.Now().UnixNano()) + targetPod = podsWithLeastAdapters[rand.Intn(len(podsWithLeastAdapters))].PodName + fmt.Printf("Selected pod with the least active adapters: %s\n", targetPod) + } + return targetPod + } + + // Find the pod with the max lora requests among the relevant metrics + maxNumberOfPendingRequests := -1 + var bestPods []cache.ActiveLoraModelMetrics + for _, metric := range relevantMetrics { + if metric.ModelName == loraAdapterRequested { + if metric.NumberOfPendingRequests > maxNumberOfPendingRequests { + 
maxNumberOfPendingRequests = metric.NumberOfPendingRequests
+				bestPods = []cache.ActiveLoraModelMetrics{}
+			}
+			if metric.NumberOfPendingRequests == maxNumberOfPendingRequests {
+				bestPods = append(bestPods, metric)
+			}
+		}
+	}
+
+	if len(bestPods) > 0 {
+		rand.Seed(time.Now().UnixNano())
+		targetPod = bestPods[rand.Intn(len(bestPods))].PodName
+		fmt.Printf("Selected pod with the highest NumberOfPendingRequests: %s\n", targetPod)
+	} else {
+		fmt.Printf("No pods match the requested model: %s\n", loraAdapterRequested)
+	}
+
+	// If the selected pod's pending requests exceed the threshold, fall back to the pod with the fewest pending requests overall
+	if maxNumberOfPendingRequests > threshold && bestAlternativePod != "" {
+		targetPod = bestAlternativePod
+		fmt.Printf("Selected pod's pending requests exceed the threshold, selecting the best alternative pod: %s with %d pending requests\n", targetPod, minAltRequests)
+	}
+
+	if targetPod == "" {
+		fmt.Println("Error: No pod found")
+	}
+
+	return targetPod
+}
+
+// FetchMetricsPeriodically fetches metrics periodically and updates the cache
+func FetchMetricsPeriodically(pods []string, podIPMap map[string]string, cacheActiveLoraModel *freecache.Cache, cachePendingRequestActiveAdapters *freecache.Cache, interval time.Duration) {
+	for {
+		loraMetrics, requestMetrics := FetchMetrics(pods, podIPMap)
+		fmt.Printf("fetchMetricsPeriodically requestMetrics: %+v\n", requestMetrics)
+		fmt.Printf("fetchMetricsPeriodically loraMetrics: %+v\n", loraMetrics)
+		cacheActiveLoraModel.Clear()
+		cachePendingRequestActiveAdapters.Clear()
+		for _, metric := range loraMetrics {
+			if err := cache.SetCacheActiveLoraModel(cacheActiveLoraModel, metric); err != nil {
+				log.Printf("Error setting cache: %v", err)
+			}
+		}
+		for _, metric := range requestMetrics {
+			if err := cache.SetCachePendingRequestActiveAdapters(cachePendingRequestActiveAdapters, metric); err != nil {
+				log.Printf("Error setting cache: %v", err)
+			}
+		}
+		time.Sleep(interval)
+	}
+}
diff --git a/examples/poc/ext-proc/scheduling/fairness.go b/examples/poc/ext-proc/scheduling/fairness.go
new file mode 100644
index 00000000..2ea9a464
--- /dev/null
+++ b/examples/poc/ext-proc/scheduling/fairness.go
@@ -0,0 +1,72 @@
+package scheduling
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+type TokenCache struct {
+	AdapterMap sync.Map
+	TTL        int64
+}
+type TokenRepsonseData struct {
+	Time       int64
+	TokenCount int
+}
+
+func CreateNewTokenCache(ttl int64) *TokenCache {
+	return &TokenCache{AdapterMap: sync.Map{}, TTL: ttl}
+}
+
+func (c *TokenCache) IsFairRequest(adapter string) bool {
+	current := time.Now().Unix()
+	adapterTokens := 0
+	total := 0
+	adapterCount := 0
+	fmt.Println("IsFairRequest for adapter:", adapter)
+	c.AdapterMap.Range(func(k, v any) bool {
+		local_sum := 0
+		for _, entry := range v.([]TokenRepsonseData) {
+			if entry.Time > current-c.TTL {
+				local_sum += entry.TokenCount
+			}
+		}
+		if local_sum > 0 {
+			adapterCount++
+		}
+		if k.(string) == adapter {
+			adapterTokens = local_sum
+		} else {
+			fmt.Println("k adapter:", k, adapter)
+		}
+		total += local_sum
+		return true
+	})
+	if adapterCount == 0 {
+		fmt.Println("No adapter usage recorded within the TTL window")
+		return true
+	}
+	fairShare := total / adapterCount
+	fmt.Printf("adapter tokens: %d; fair share: %d\n", adapterTokens, fairShare)
+	if adapterTokens > fairShare {
+		return false
+	}
+	return true
+}
+
+func (c *TokenCache) StoreResponseInfo(model string, currentTime int64, totalTokens int) {
+	tokenData := TokenRepsonseData{Time: currentTime, TokenCount: 
totalTokens} + if v, ok := c.AdapterMap.Load(model); ok { + val := v.([]TokenRepsonseData) + newArr := []TokenRepsonseData{} + for _, entry := range val { + if entry.Time >= int64(currentTime-c.TTL) { + newArr = append(newArr, entry) + } + } + c.AdapterMap.Store(model, append(newArr, tokenData)) + } else { + c.AdapterMap.Store(model, []TokenRepsonseData{tokenData}) + } +} diff --git a/examples/poc/manifests/ext-proc.yaml b/examples/poc/manifests/ext-proc.yaml new file mode 100644 index 00000000..a40576bb --- /dev/null +++ b/examples/poc/manifests/ext-proc.yaml @@ -0,0 +1,68 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: grpc-server-deployment + labels: + app: grpc-server +spec: + replicas: 1 + selector: + matchLabels: + app: grpc-server + template: + metadata: + labels: + app: grpc-server + spec: + containers: + - name: grpc-server + image: # Image built from the Dockerfile in ./ext-proc + args: + #TODO: specify label selector and dynamically update pods + - -pods + - "vllm-575d76dbfc-l4w5z" + - -podIPs + - "10.100.0.7:8000" + - -enable-fairness + - "true" + ports: + - containerPort: 9002 + - name: curl + image: curlimages/curl + command: ["sleep", "3600"] +--- +apiVersion: v1 +kind: Service +metadata: + name: grpc-server-service +spec: + selector: + app: grpc-server + ports: + - protocol: TCP + port: 9002 + targetPort: 9002 + type: ClusterIP + +#TODO: specify label selector and dynamically update pods +# --- +# kind: ClusterRole +# apiVersion: rbac.authorization.k8s.io/v1 +# metadata: +# name: pod-read +# rules: +# - apiGroups: [""] +# resources: ["pods"] +# verbs: ["get", "watch", "list"] +# --- +# kind: ClusterRoleBinding +# apiVersion: rbac.authorization.k8s.io/v1 +# metadata: +# name: pod-read-binding +# subjects: +# - kind: ServiceAccount +# name: default +# namespace: default +# roleRef: +# kind: ClusterRole +# name: pod-read diff --git a/examples/poc/manifests/gateway.yaml b/examples/poc/manifests/gateway.yaml new file mode 100644 index 00000000..f2136304 --- /dev/null +++ b/examples/poc/manifests/gateway.yaml @@ -0,0 +1,115 @@ +--- +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: EnvoyProxy +metadata: + name: custom-proxy-config + namespace: envoy-gateway-system +spec: + provider: + type: Kubernetes + kubernetes: + envoyDeployment: + container: + image: envoyproxy/envoy:v1.31-latest + envoyService: + patch: + type: StrategicMerge + value: + spec: + ports: + - name: http-8081 + port: 8081 + protocol: TCP + targetPort: 8081 + bootstrap: + type: Merge + value: | + static_resources: + listeners: + - name: listener_0 + address: + socket_address: + address: 0.0.0.0 + port_value: 8081 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: http + codec_type: AUTO + route_config: + name: local_route + virtual_hosts: + - name: backend + domains: ["*"] + routes: + - match: + prefix: "/" + route: + cluster: original_destination_cluster + timeout: 1000s # Increase route timeout + http_filters: + - name: envoy.filters.http.ext_proc + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor + failure_mode_allow: false + grpc_service: + envoy_grpc: + cluster_name: ext_proc_cluster + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + request_body_mode: "BUFFERED" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + 
response_trailer_mode: "SKIP" + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + clusters: + - name: original_destination_cluster + type: ORIGINAL_DST + original_dst_lb_config: + use_http_header: true + http_header_name: "target-pod" + connect_timeout: 6s + lb_policy: CLUSTER_PROVIDED + dns_lookup_family: V4_ONLY + - name: ext_proc_cluster + connect_timeout: 1000s + type: STATIC + http2_protocol_options: {} + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: ext_proc_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 34.118.231.147 + port_value: 9002 +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: GatewayClass +metadata: + name: inference-gateway +spec: + controllerName: gateway.envoyproxy.io/gatewayclass-controller + parametersRef: + group: gateway.envoyproxy.io + kind: EnvoyProxy + name: custom-proxy-config + namespace: envoy-gateway-system +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: inference-gateway +spec: + gatewayClassName: inference-gateway + listeners: + - name: http + protocol: HTTP + port: 8080 diff --git a/examples/poc/manifests/vllm/vllm-lora-deployment.yaml b/examples/poc/manifests/vllm/vllm-lora-deployment.yaml new file mode 100644 index 00000000..dcd729b0 --- /dev/null +++ b/examples/poc/manifests/vllm/vllm-lora-deployment.yaml @@ -0,0 +1,124 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: vllm + namespace: default +spec: + replicas: 6 + selector: + matchLabels: + app: vllm + template: + metadata: + labels: + app: vllm + spec: + containers: + - name: lora + image: "ghcr.io/tomatillo-and-multiverse/vllm:demo" + imagePullPolicy: Always + command: ["python3", "-m", "vllm.entrypoints.openai.api_server"] + args: + - "--model" + - "meta-llama/Llama-2-7b-hf" + - "--tensor-parallel-size" + - "1" + - "--port" + - "8000" + - "--disable-log-requests" + - "--enable-lora" + - "--max-loras" + - "4" + - "--max-cpu-loras" + - "12" + - "--lora-modules" + - "sql-lora=/adapters/hub/models--yard1--llama-2-7b-sql-lora-test/snapshots/0dfa347e8877a4d4ed19ee56c140fa518470028c/" + - "tweet-summary=/adapters/hub/models--vineetsharma--qlora-adapter-Llama-2-7b-hf-TweetSumm/snapshots/796337d8e866318c59e38f16416e3ecd11fe5403" + - 'sql-lora-0=/adapters/yard1/llama-2-7b-sql-lora-test_0' + - 'sql-lora-1=/adapters/yard1/llama-2-7b-sql-lora-test_1' + - 'sql-lora-2=/adapters/yard1/llama-2-7b-sql-lora-test_2' + - 'sql-lora-3=/adapters/yard1/llama-2-7b-sql-lora-test_3' + - 'sql-lora-4=/adapters/yard1/llama-2-7b-sql-lora-test_4' + - 'tweet-summary-0=/adapters/vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm_0' + - 'tweet-summary-1=/adapters/vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm_1' + - 'tweet-summary-2=/adapters/vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm_2' + - 'tweet-summary-3=/adapters/vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm_3' + - 'tweet-summary-4=/adapters/vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm_4' + env: + - name: PORT + value: "8000" + - name: HUGGING_FACE_HUB_TOKEN + valueFrom: + secretKeyRef: + name: hf-token + key: token + ports: + - containerPort: 8000 + name: http + protocol: TCP + livenessProbe: + failureThreshold: 240 + httpGet: + path: /health + port: http + scheme: HTTP + initialDelaySeconds: 5 + periodSeconds: 5 + successThreshold: 1 + timeoutSeconds: 1 + readinessProbe: + failureThreshold: 600 + httpGet: + path: /health + port: http + scheme: HTTP + initialDelaySeconds: 
5 + periodSeconds: 5 + successThreshold: 1 + timeoutSeconds: 1 + resources: + limits: + nvidia.com/gpu: 2 + requests: + nvidia.com/gpu: 2 + volumeMounts: + - mountPath: /data + name: data + - mountPath: /dev/shm + name: shm + - name: adapters + mountPath: "/adapters" + initContainers: + - name: adapter-loader + image: ghcr.io/tomatillo-and-multiverse/adapter-puller:demo + command: ["python"] + args: + - ./pull_adapters.py + - --adapter + - yard1/llama-2-7b-sql-lora-test + - --adapter + - vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm + - --duplicate-count + - "5" + env: + - name: HF_TOKEN + valueFrom: + secretKeyRef: + name: hf-token + key: token + - name: HF_HOME + value: /adapters + volumeMounts: + - name: adapters + mountPath: "/adapters" + restartPolicy: Always + schedulerName: default-scheduler + terminationGracePeriodSeconds: 30 + volumes: + - name: data + emptyDir: {} + - name: shm + emptyDir: + medium: Memory + - name: adapters + emptyDir: {} diff --git a/examples/poc/manifests/vllm/vllm-lora-service.yaml b/examples/poc/manifests/vllm/vllm-lora-service.yaml new file mode 100644 index 00000000..9a529bae --- /dev/null +++ b/examples/poc/manifests/vllm/vllm-lora-service.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: vllm-lora + namespace: default +spec: + clusterIP: None + selector: + app: vllm + ports: + - protocol: TCP + port: 8000 + targetPort: 8000 \ No newline at end of file