From f3c34bd2ebf9b0c0fd87d27aa2bfd0954364aad6 Mon Sep 17 00:00:00 2001 From: jeffluoo Date: Fri, 31 Jan 2025 18:10:09 +0000 Subject: [PATCH] [Metrics] Add request error metrics This change defines some general errors, the list might grow in the future if more finer error types are needed. --- pkg/ext-proc/handlers/request.go | 14 ++-- pkg/ext-proc/handlers/response.go | 40 +++++++++- pkg/ext-proc/handlers/server.go | 58 ++++++++++++-- pkg/ext-proc/metrics/README.md | 1 + pkg/ext-proc/metrics/metrics.go | 18 +++++ pkg/ext-proc/metrics/metrics_test.go | 77 +++++++++++++++++-- .../testdata/request_error_total_metric | 5 ++ pkg/ext-proc/scheduling/scheduler.go | 7 +- pkg/ext-proc/util/error/error.go | 34 ++++++++ 9 files changed, 229 insertions(+), 25 deletions(-) create mode 100644 pkg/ext-proc/metrics/testdata/request_error_total_metric create mode 100644 pkg/ext-proc/util/error/error.go diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go index 7f6178d6a..34db206de 100644 --- a/pkg/ext-proc/handlers/request.go +++ b/pkg/ext-proc/handlers/request.go @@ -19,7 +19,6 @@ package handlers import ( "context" "encoding/json" - "errors" "fmt" "strconv" @@ -29,6 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -49,14 +49,14 @@ func (s *Server) HandleRequestBody( var rb map[string]interface{} if err := json.Unmarshal(v.RequestBody.Body, &rb); err != nil { logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body") - return nil, fmt.Errorf("error unmarshaling request body: %v", err) + return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("error unmarshaling request body: %v", err)} } loggerVerbose.Info("Request body 
unmarshalled", "body", rb) // Resolve target models. model, ok := rb["model"].(string) if !ok { - return nil, errors.New("model not found in request") + return nil, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"} } loggerVerbose.Info("Model requested", "model", model) modelName := model @@ -66,12 +66,12 @@ func (s *Server) HandleRequestBody( // are able to be requested by using their distinct name. modelObj, exist := s.datastore.ModelGet(model) if !exist { - return nil, fmt.Errorf("error finding a model object in InferenceModel for input %v", model) + return nil, errutil.Error{Code: errutil.BadConfiguration, Msg: fmt.Sprintf("error finding a model object in InferenceModel for input %v", model)} } if len(modelObj.Spec.TargetModels) > 0 { modelName = datastore.RandomWeightedDraw(logger, modelObj, 0) if modelName == "" { - return nil, fmt.Errorf("error getting target model name for model %v", modelObj.Name) + return nil, errutil.Error{Code: errutil.BadConfiguration, Msg: fmt.Sprintf("error getting target model name for model %v", modelObj.Name)} } } llmReq := &scheduling.LLMRequest{ @@ -89,14 +89,14 @@ func (s *Server) HandleRequestBody( requestBody, err = json.Marshal(rb) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Error marshaling request body") - return nil, fmt.Errorf("error marshaling request body: %v", err) + return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)} } loggerVerbose.Info("Updated request body marshalled", "body", string(requestBody)) } targetPod, err := s.scheduler.Schedule(ctx, llmReq) if err != nil { - return nil, fmt.Errorf("failed to find target pod: %w", err) + return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()} } logger.V(logutil.DEFAULT).Info("Request handled", diff --git a/pkg/ext-proc/handlers/response.go b/pkg/ext-proc/handlers/response.go index afe7549b3..ed3082c51 
100644 --- a/pkg/ext-proc/handlers/response.go +++ b/pkg/ext-proc/handlers/response.go @@ -24,6 +24,7 @@ import ( configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "sigs.k8s.io/controller-runtime/pkg/log" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -38,6 +39,43 @@ func (s *Server) HandleResponseHeaders( h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders) loggerVerbose.Info("Headers before", "headers", h) + // Example header + // { + // "ResponseHeaders": { + // "headers": [ + // { + // "key": ":status", + // "raw_value": "200" + // }, + // { + // "key": "date", + // "raw_value": "Thu, 30 Jan 2025 18:50:48 GMT" + // }, + // { + // "key": "server", + // "raw_value": "uvicorn" + // }, + // { + // "key": "content-type", + // "raw_value": "text/event-stream; charset=utf-8" + // }, + // { + // "key": "transfer-encoding", + // "raw_value": "chunked" + // } + // ] + // } + // } + for _, header := range h.ResponseHeaders.Headers.GetHeaders() { + if header.Key == "status" { + code := header.RawValue[0] + if string(code) != "200" { + reqCtx.ResponseStatusCode = errutil.ModelServerError + } + break + } + } + resp := &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_ResponseHeaders{ ResponseHeaders: &extProcPb.HeadersResponse{ @@ -99,7 +137,7 @@ func (s *Server) HandleResponseBody( res := Response{} if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil { - return nil, fmt.Errorf("unmarshaling response body: %v", err) + return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)} } reqCtx.Response = res reqCtx.ResponseSize = len(body.ResponseBody.Body) diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go index 
a52742752..506eaa97d 100644 --- a/pkg/ext-proc/handlers/server.go +++ b/pkg/ext-proc/handlers/server.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -65,6 +66,18 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { // See https://github.com/envoyproxy/envoy/issues/17540. reqCtx := &RequestContext{} + // Create variable for error handling as each request should only report once for + // error metric. This doesn't cover the error "Cannot receive stream request" because + // such error might happen even the response is processed. + var err error + defer func(error) { + if reqCtx.ResponseStatusCode != "" { + metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseStatusCode) + } else if err != nil { + metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, errutil.CanonicalCode(err)) + } + }(err) + for { select { case <-ctx.Done(): @@ -72,11 +85,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { default: } - req, err := srv.Recv() - if err == io.EOF || errors.Is(err, context.Canceled) { + req, recvErr := srv.Recv() + if recvErr == io.EOF || errors.Is(recvErr, context.Canceled) { return nil } - if err != nil { + if recvErr != nil { // This error occurs very frequently, though it doesn't seem to have any impact. // TODO Figure out if we can remove this noise. 
loggerVerbose.Error(err, "Cannot receive stream request") @@ -113,12 +126,13 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v) return status.Error(codes.Unknown, "unknown request type") } + if err != nil { logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req) - switch status.Code(err) { + switch errutil.CanonicalCode(err) { // This code can be returned by scheduler when there is no capacity for sheddable // requests. - case codes.ResourceExhausted: + case errutil.InferencePoolResourceExhausted: resp = &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_ImmediateResponse{ ImmediateResponse: &extProcPb.ImmediateResponse{ @@ -128,6 +142,38 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { }, }, } + // This code can be returned by when EPP processes the request and run into server-side errors. + case errutil.Internal: + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_InternalServerError, + }, + }, + }, + } + // This code can be returned when users provide invalid json request. 
+ case errutil.BadRequest: + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_BadRequest, + }, + }, + }, + } + case errutil.BadConfiguration: + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_NotFound, + }, + }, + }, + } default: return status.Errorf(status.Code(err), "failed to handle request: %v", err) } @@ -139,6 +185,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) } } + } // RequestContext stores context information during the life time of an HTTP request. @@ -153,4 +200,5 @@ type RequestContext struct { Response Response ResponseSize int ResponseComplete bool + ResponseStatusCode string } diff --git a/pkg/ext-proc/metrics/README.md b/pkg/ext-proc/metrics/README.md index 8adfd94e9..1f68a0bdb 100644 --- a/pkg/ext-proc/metrics/README.md +++ b/pkg/ext-proc/metrics/README.md @@ -41,6 +41,7 @@ spec: | Metric name | Metric Type | Description | Labels | Status | | ------------|--------------| ----------- | ------ | ------ | | inference_model_request_total | Counter | The counter of requests broken out for each model. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA | +| inference_model_request_error_total | Counter | The counter of request errors broken out for each model. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA | | inference_model_request_duration_seconds | Distribution | Distribution of response latency. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA | | inference_model_request_sizes | Distribution | Distribution of request size in bytes. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA | | inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA | diff --git a/pkg/ext-proc/metrics/metrics.go b/pkg/ext-proc/metrics/metrics.go index a396f4aef..cc21d531b 100644 --- a/pkg/ext-proc/metrics/metrics.go +++ b/pkg/ext-proc/metrics/metrics.go @@ -44,6 +44,16 @@ var ( []string{"model_name", "target_model_name"}, ) + requestErrCounter = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Subsystem: InferenceModelComponent, + Name: "request_error_total", + Help: "Counter of inference model requests errors broken out for each model and target model.", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"model_name", "target_model_name", "error_code"}, + ) + requestLatencies = compbasemetrics.NewHistogramVec( &compbasemetrics.HistogramOpts{ Subsystem: InferenceModelComponent, @@ -139,6 +149,7 @@ var registerMetrics sync.Once func Register() { registerMetrics.Do(func() { legacyregistry.MustRegister(requestCounter) + legacyregistry.MustRegister(requestErrCounter) legacyregistry.MustRegister(requestLatencies) legacyregistry.MustRegister(requestSizes) legacyregistry.MustRegister(responseSizes) @@ -155,6 +166,13 @@ func RecordRequestCounter(modelName, targetModelName string) { requestCounter.WithLabelValues(modelName, targetModelName).Inc() } +// RecordRequestErrCounter records the number of error requests. +func RecordRequestErrCounter(modelName, targetModelName string, code string) { + if code != "" { + requestErrCounter.WithLabelValues(modelName, targetModelName, code).Inc() + } +} + // RecordRequestSizes records the request sizes. 
func RecordRequestSizes(modelName, targetModelName string, reqSize int) { requestSizes.WithLabelValues(modelName, targetModelName).Observe(float64(reqSize)) diff --git a/pkg/ext-proc/metrics/metrics_test.go b/pkg/ext-proc/metrics/metrics_test.go index cf638b93c..2e891066b 100644 --- a/pkg/ext-proc/metrics/metrics_test.go +++ b/pkg/ext-proc/metrics/metrics_test.go @@ -24,18 +24,20 @@ import ( "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) const ( - RequestTotalMetric = InferenceModelComponent + "_request_total" - RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds" - RequestSizesMetric = InferenceModelComponent + "_request_sizes" - ResponseSizesMetric = InferenceModelComponent + "_response_sizes" - InputTokensMetric = InferenceModelComponent + "_input_tokens" - OutputTokensMetric = InferenceModelComponent + "_output_tokens" - KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization" - QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size" + RequestTotalMetric = InferenceModelComponent + "_request_total" + RequestErrorTotalMetric = InferenceModelComponent + "_request_error_total" + RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds" + RequestSizesMetric = InferenceModelComponent + "_request_sizes" + ResponseSizesMetric = InferenceModelComponent + "_response_sizes" + InputTokensMetric = InferenceModelComponent + "_input_tokens" + OutputTokensMetric = InferenceModelComponent + "_output_tokens" + KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization" + QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size" ) func TestRecordRequestCounterandSizes(t *testing.T) { @@ -107,6 +109,65 @@ func TestRecordRequestCounterandSizes(t *testing.T) { } } +func 
TestRecordRequestErrorCounter(t *testing.T) { + type requests struct { + modelName string + targetModelName string + error string + } + scenarios := []struct { + name string + reqs []requests + invalid bool + }{{ + name: "multiple requests", + reqs: []requests{ + { + modelName: "m10", + targetModelName: "t10", + error: errutil.Internal, + }, + { + modelName: "m10", + targetModelName: "t10", + error: errutil.Internal, + }, + { + modelName: "m10", + targetModelName: "t11", + error: errutil.ModelServerError, + }, + { + modelName: "m20", + targetModelName: "t20", + error: errutil.InferencePoolResourceExhausted, + }, + }, + }, + } + Register() + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + for _, req := range scenario.reqs { + RecordRequestErrCounter(req.modelName, req.targetModelName, req.error) + } + + wantRequestErrorCounter, err := os.Open("testdata/request_error_total_metric") + defer func() { + if err := wantRequestErrorCounter.Close(); err != nil { + t.Error(err) + } + }() + if err != nil { + t.Fatal(err) + } + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestErrorCounter, RequestErrorTotalMetric); err != nil { + t.Error(err) + } + }) + } +} + func TestRecordRequestLatencies(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) timeBaseline := time.Now() diff --git a/pkg/ext-proc/metrics/testdata/request_error_total_metric b/pkg/ext-proc/metrics/testdata/request_error_total_metric new file mode 100644 index 000000000..31036eb60 --- /dev/null +++ b/pkg/ext-proc/metrics/testdata/request_error_total_metric @@ -0,0 +1,5 @@ +# HELP inference_model_request_error_total [ALPHA] Counter of inference model requests errors broken out for each model and target model. 
+# TYPE inference_model_request_error_total counter +inference_model_request_error_total{error_code="Internal", model_name="m10",target_model_name="t10"} 2 +inference_model_request_error_total{error_code="ModelServerError", model_name="m10",target_model_name="t11"} 1 +inference_model_request_error_total{error_code="InferencePoolResourceExhausted", model_name="m20",target_model_name="t20"} 1 diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index 49402fb33..b5f2f4f23 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -23,10 +23,9 @@ import ( "math/rand" "github.com/go-logr/logr" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -102,8 +101,8 @@ var ( name: "drop request", filter: func(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) { logger.V(logutil.DEFAULT).Info("Request dropped", "request", req) - return []*datastore.PodMetrics{}, status.Errorf( - codes.ResourceExhausted, "dropping request due to limited backend resources") + return []*datastore.PodMetrics{}, errutil.Error{ + Code: errutil.InferencePoolResourceExhausted, Msg: "dropping request due to limited backend resources"} }, }, } diff --git a/pkg/ext-proc/util/error/error.go b/pkg/ext-proc/util/error/error.go new file mode 100644 index 000000000..2f9c992c8 --- /dev/null +++ b/pkg/ext-proc/util/error/error.go @@ -0,0 +1,34 @@ +package error + +import ( + "fmt" +) + +// Error is an error struct for errors returned by the epp server. 
+type Error struct { + Code string + Msg string +} + +const ( + Unknown = "Unknown" + BadRequest = "BadRequest" + Internal = "Internal" + ModelServerError = "ModelServerError" + BadConfiguration = "BadConfiguration" + InferencePoolResourceExhausted = "InferencePoolResourceExhausted" +) + +// Error returns a string version of the error. +func (e Error) Error() string { + return fmt.Sprintf("inference gateway: %s - %s", e.Code, e.Msg) +} + +// CanonicalCode returns the error's ErrorCode. +func CanonicalCode(err error) string { + e, ok := err.(Error) + if ok { + return e.Code + } + return Unknown +}