From c79e37856cf0fab028cdb25c4365a0236b0a0d4a Mon Sep 17 00:00:00 2001 From: Ondrej Kupka Date: Thu, 13 Feb 2025 09:50:50 +0100 Subject: [PATCH 1/2] Use structured logging All logging calls are rewritten to use structured logging. --- pkg/ext-proc/backend/datastore.go | 2 +- pkg/ext-proc/backend/fake.go | 3 ++- .../backend/inferencemodel_reconciler.go | 18 +++++++++------ .../backend/inferencepool_reconciler.go | 8 ++++--- pkg/ext-proc/backend/provider.go | 19 ++++++++-------- pkg/ext-proc/backend/vllm/metrics.go | 14 ++++++------ pkg/ext-proc/handlers/request.go | 22 +++++++++---------- pkg/ext-proc/handlers/response.go | 8 +++---- pkg/ext-proc/handlers/server.go | 20 ++++++++--------- pkg/ext-proc/health.go | 5 +++-- pkg/ext-proc/main.go | 6 ++--- pkg/ext-proc/metrics/metrics.go | 12 ++++++---- pkg/ext-proc/scheduling/filter.go | 6 ++--- pkg/ext-proc/scheduling/scheduler.go | 7 +++--- pkg/ext-proc/test/benchmark/benchmark.go | 15 ++++++++++--- pkg/ext-proc/test/utils.go | 11 +++++----- pkg/ext-proc/util/logging/fatal.go | 11 ++++++++++ test/integration/hermetic_test.go | 6 ++--- 18 files changed, 113 insertions(+), 80 deletions(-) create mode 100644 pkg/ext-proc/util/logging/fatal.go diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go index be3c7f0b4..a54833bc1 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/backend/datastore.go @@ -98,7 +98,7 @@ func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string { for _, model := range model.Spec.TargetModels { weights += *model.Weight } - klog.V(logutil.VERBOSE).Infof("Weights for Model(%v) total to: %v", model.Name, weights) + klog.V(logutil.VERBOSE).InfoS("Weights for model computed", "model", model.Name, "weights", weights) randomVal := r.Int31n(weights) for _, model := range model.Spec.TargetModels { if randomVal < *model.Weight { diff --git a/pkg/ext-proc/backend/fake.go b/pkg/ext-proc/backend/fake.go index 8c028b776..7ab8a4640 100644 --- a/pkg/ext-proc/backend/fake.go +++ b/pkg/ext-proc/backend/fake.go @@ -5,6 +5,7 @@ import ( klog "k8s.io/klog/v2" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) type FakePodMetricsClient struct { @@ -16,7 +17,7 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi if err, ok := f.Err[pod]; ok { return nil, err } - klog.V(1).Infof("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod]) + klog.V(logutil.VERBOSE).InfoS("Fetching metrics for pod", "pod", pod, "existing", existing, "new", f.Res[pod]) return f.Res[pod], nil } diff --git a/pkg/ext-proc/backend/inferencemodel_reconciler.go b/pkg/ext-proc/backend/inferencemodel_reconciler.go index f0a139417..72ea063e9 100644 --- a/pkg/ext-proc/backend/inferencemodel_reconciler.go +++ b/pkg/ext-proc/backend/inferencemodel_reconciler.go @@ -26,19 +26,21 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque if req.Namespace != c.PoolNamespacedName.Namespace { return ctrl.Result{}, nil } - klog.V(1).Infof("Reconciling InferenceModel %v", req.NamespacedName) + + klogV := klog.V(logutil.DEFAULT) + klogV.InfoS("Reconciling InferenceModel", "name", req.NamespacedName) infModel := &v1alpha1.InferenceModel{} if err := c.Get(ctx, req.NamespacedName, infModel); err != nil { if errors.IsNotFound(err) { - klog.V(1).Infof("InferenceModel %v not found. Removing from datastore since object must be deleted", req.NamespacedName) + klogV.InfoS("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName) c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName) return ctrl.Result{}, nil } - klog.Error(err, "Unable to get InferenceModel") + klogV.ErrorS(err, "Unable to get InferenceModel", "name", req.NamespacedName) return ctrl.Result{}, err } else if !infModel.DeletionTimestamp.IsZero() { - klog.V(1).Infof("InferenceModel %v is marked for deletion. Removing from datastore", req.NamespacedName) + klogV.InfoS("InferenceModel is marked for deletion. Removing from datastore", "name", req.NamespacedName) c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName) return ctrl.Result{}, nil } @@ -48,13 +50,15 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque } func (c *InferenceModelReconciler) updateDatastore(infModel *v1alpha1.InferenceModel) { + klogV := klog.V(logutil.DEFAULT) + if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name { - klog.V(1).Infof("Incoming pool ref %v, server pool name: %v", infModel.Spec.PoolRef, c.PoolNamespacedName.Name) - klog.V(1).Infof("Adding/Updating InferenceModel: %v", infModel.Spec.ModelName) + klogV.InfoS("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName) + klogV.InfoS("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName) c.Datastore.InferenceModels.Store(infModel.Spec.ModelName, infModel) return } - klog.V(logutil.DEFAULT).Infof("Removing/Not adding InferenceModel: %v", infModel.Spec.ModelName) + klogV.InfoS("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName) // If we get here. The model is not relevant to this pool, remove. c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName) } diff --git a/pkg/ext-proc/backend/inferencepool_reconciler.go b/pkg/ext-proc/backend/inferencepool_reconciler.go index fd15ebc33..9504b4e0f 100644 --- a/pkg/ext-proc/backend/inferencepool_reconciler.go +++ b/pkg/ext-proc/backend/inferencepool_reconciler.go @@ -11,6 +11,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) // InferencePoolReconciler utilizes the controller runtime to reconcile Instance Gateway resources @@ -28,11 +29,12 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques if req.NamespacedName.Name != c.PoolNamespacedName.Name || req.NamespacedName.Namespace != c.PoolNamespacedName.Namespace { return ctrl.Result{}, nil } - klog.V(1).Info("reconciling InferencePool", req.NamespacedName) + klogV := klog.V(logutil.DEFAULT) + klogV.InfoS("Reconciling InferencePool", "name", req.NamespacedName) serverPool := &v1alpha1.InferencePool{} if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil { - klog.Error(err, ": unable to get InferencePool") + klogV.ErrorS(err, "Unable to get InferencePool", "name", req.NamespacedName) return ctrl.Result{}, err } if c.Datastore.inferencePool == nil || !reflect.DeepEqual(serverPool.Spec.Selector, c.Datastore.inferencePool.Spec.Selector) { @@ -49,7 +51,7 @@ func (c *InferencePoolReconciler) updateDatastore(serverPool *v1alpha1.Inference pool, _ := c.Datastore.getInferencePool() if pool == nil || serverPool.ObjectMeta.ResourceVersion != pool.ObjectMeta.ResourceVersion { - klog.Infof("Updating inference pool to %v/%v", serverPool.ObjectMeta.Namespace, serverPool.ObjectMeta.Name) + klog.V(logutil.DEFAULT).InfoS("Updating inference pool", "target", klog.KMetadata(&serverPool.ObjectMeta)) c.Datastore.setInferencePool(serverPool) } } diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go index 68043d93c..d64b80b30 100644 --- a/pkg/ext-proc/backend/provider.go +++ b/pkg/ext-proc/backend/provider.go @@ -63,10 +63,10 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm p.refreshPodsOnce() if err := p.refreshMetricsOnce(); err != nil { - klog.Errorf("Failed to init metrics: %v", err) + klog.ErrorS(err, "Failed to init metrics") } - klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics()) + klog.InfoS("Initialized pods and metrics", "metrics", p.AllPodMetrics()) // periodically refresh pods go func() { @@ -81,7 +81,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm for { time.Sleep(refreshMetricsInterval) if err := p.refreshMetricsOnce(); err != nil { - klog.V(logutil.TRACE).Infof("Failed to refresh metrics: %v", err) + klog.V(logutil.TRACE).ErrorS(err, "Failed to refresh metrics") } } }() @@ -95,11 +95,11 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm }() // Periodically print out the pods and metrics for DEBUGGING. - if klog.V(logutil.DEBUG).Enabled() { + if klogV := klog.V(logutil.DEBUG); klogV.Enabled() { go func() { for { time.Sleep(5 * time.Second) - klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics()) + klogV.InfoS("Current Pods and metrics gathered", "metrics", p.AllPodMetrics()) } }() } @@ -138,18 +138,19 @@ func (p *Provider) refreshPodsOnce() { } func (p *Provider) refreshMetricsOnce() error { + klogV := klog.V(logutil.TRACE) ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout) defer cancel() start := time.Now() defer func() { d := time.Since(start) // TODO: add a metric instead of logging - klog.V(logutil.TRACE).Infof("Refreshed metrics in %v", d) + klogV.InfoS("Metrics refreshed", "duration", d) }() var wg sync.WaitGroup errCh := make(chan error) processOnePod := func(key, value any) bool { - klog.V(logutil.TRACE).Infof("Processing pod %v and metric %v", key, value) + klogV.InfoS("Pod and metric being processed", "pod", key, "metric", value) pod := key.(Pod) existing := value.(*PodMetrics) wg.Add(1) @@ -161,7 +162,7 @@ func (p *Provider) refreshMetricsOnce() error { return } p.UpdatePodMetrics(pod, updated) - klog.V(logutil.TRACE).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics) + klogV.InfoS("Updated metrics for pod", "pod", pod, "metrics", updated.Metrics) }() return true } @@ -185,7 +186,7 @@ func (p *Provider) refreshMetricsOnce() error { } func (p *Provider) flushPrometheusMetricsOnce() { - klog.V(logutil.DEBUG).Infof("Flushing Prometheus Metrics") + klog.V(logutil.DEBUG).InfoS("Flushing Prometheus Metrics") pool, _ := p.datastore.getInferencePool() if pool == nil { diff --git a/pkg/ext-proc/backend/vllm/metrics.go b/pkg/ext-proc/backend/vllm/metrics.go index e36939600..4c3804ce5 100644 --- a/pkg/ext-proc/backend/vllm/metrics.go +++ b/pkg/ext-proc/backend/vllm/metrics.go @@ -32,8 +32,7 @@ const ( KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity" ) -type PodMetricsClientImpl struct { -} +type PodMetricsClientImpl struct{} // FetchMetrics fetches metrics from a given pod. func (p *PodMetricsClientImpl) FetchMetrics( @@ -46,11 +45,12 @@ func (p *PodMetricsClientImpl) FetchMetrics( url := fmt.Sprintf("http://%s/metrics", pod.Address) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { + klog.V(logutil.DEFAULT).ErrorS(err, "Failed create HTTP request", "method", http.MethodGet, "url", url) return nil, fmt.Errorf("failed to create request: %v", err) } resp, err := http.DefaultClient.Do(req) if err != nil { - klog.Errorf("failed to fetch metrics from %s: %v", pod, err) + klog.V(logutil.DEFAULT).ErrorS(err, "Failed to fetch metrics", "pod", pod) return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err) } defer func() { @@ -58,7 +58,7 @@ func (p *PodMetricsClientImpl) FetchMetrics( }() if resp.StatusCode != http.StatusOK { - klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode) + klog.V(logutil.DEFAULT).ErrorS(nil, "Unexpected status code returned", "pod", pod, "statusCode", resp.StatusCode) return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode) } @@ -138,7 +138,7 @@ func promToPodMetrics( func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, time.Time, error) { loraRequests, ok := metricFamilies[LoraRequestInfoMetricName] if !ok { - klog.Warningf("metric family %q not found", LoraRequestInfoMetricName) + klog.V(logutil.DEFAULT).ErrorS(nil, "Metric family not found", "name", LoraRequestInfoMetricName) return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName) } var latestTs float64 @@ -157,7 +157,7 @@ func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metr func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, error) { mf, ok := metricFamilies[metricName] if !ok { - klog.Warningf("metric family %q not found", metricName) + klog.V(logutil.DEFAULT).ErrorS(nil, "Metric family not found", "name", metricName) return nil, fmt.Errorf("metric family %q not found", metricName) } if len(mf.GetMetric()) == 0 { @@ -171,6 +171,6 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str latest = m } } - klog.V(logutil.TRACE).Infof("Got metric value %+v for metric %v", latest, metricName) + klog.V(logutil.TRACE).InfoS("Metric value selected", "value", latest, "metric", metricName) return latest, nil } diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go index 172780254..a36f7ae3b 100644 --- a/pkg/ext-proc/handlers/request.go +++ b/pkg/ext-proc/handlers/request.go @@ -19,23 +19,24 @@ import ( // parameter. // Envoy sends the request body to ext proc before sending the request to the backend server. func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { - klog.V(logutil.VERBOSE).Infof("Handling request body") + klogV := klog.V(logutil.VERBOSE) + klogV.InfoS("Handling request body") // Unmarshal request body (must be JSON). v := req.Request.(*extProcPb.ProcessingRequest_RequestBody) var rb map[string]interface{} if err := json.Unmarshal(v.RequestBody.Body, &rb); err != nil { - klog.Errorf("Error unmarshaling request body: %v", err) + klog.V(logutil.DEFAULT).ErrorS(err, "Error unmarshaling request body") return nil, fmt.Errorf("error unmarshaling request body: %v", err) } - klog.V(logutil.VERBOSE).Infof("Request body: %v", rb) + klogV.InfoS("Request body unmarshalled", "body", rb) // Resolve target models. model, ok := rb["model"].(string) if !ok { return nil, errors.New("model not found in request") } - klog.V(logutil.VERBOSE).Infof("Model requested: %v", model) + klogV.InfoS("Model requested", "model", model) modelName := model // NOTE: The nil checking for the modelObject means that we DO allow passthrough currently. @@ -56,7 +57,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces ResolvedTargetModel: modelName, Critical: backend.IsCritical(modelObj), } - klog.V(logutil.VERBOSE).Infof("LLM Request: %+v", llmReq) + klogV.InfoS("LLM request assembled", "request", llmReq) requestBody := v.RequestBody.Body var err error @@ -65,17 +66,17 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces rb["model"] = llmReq.ResolvedTargetModel requestBody, err = json.Marshal(rb) if err != nil { - klog.Errorf("Error marshaling request body: %v", err) + klog.V(logutil.DEFAULT).ErrorS(err, "Error marshaling request body") return nil, fmt.Errorf("error marshaling request body: %v", err) } - klog.V(logutil.VERBOSE).Infof("Updated body: %v", string(requestBody)) + klogV.InfoS("Updated request body marshalled", "body", string(requestBody)) } targetPod, err := s.scheduler.Schedule(llmReq) if err != nil { return nil, fmt.Errorf("failed to find target pod: %w", err) } - klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod) + klogV.InfoS("Target model and pod selected", "model", llmReq.ResolvedTargetModel, "pod", targetPod) reqCtx.Model = llmReq.Model reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel @@ -101,7 +102,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces } // Print headers for debugging for _, header := range headers { - klog.V(logutil.VERBOSE).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue) + klog.V(logutil.DEBUG).InfoS("Request body header", "key", header.Header.Key, "value", header.Header.RawValue) } resp := &extProcPb.ProcessingResponse{ @@ -136,10 +137,9 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces } func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse { - klog.V(logutil.VERBOSE).Info("Handling request headers ...") r := req.Request h := r.(*extProcPb.ProcessingRequest_RequestHeaders) - klog.V(logutil.VERBOSE).Infof("Headers: %+v\n", h) + klog.V(logutil.VERBOSE).InfoS("Handling request headers", "headers", h) resp := &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_RequestHeaders{ diff --git a/pkg/ext-proc/handlers/response.go b/pkg/ext-proc/handlers/response.go index 34a7219ae..012b0b8d6 100644 --- a/pkg/ext-proc/handlers/response.go +++ b/pkg/ext-proc/handlers/response.go @@ -12,9 +12,9 @@ import ( // HandleResponseHeaders processes response headers from the backend model server. func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { - klog.V(logutil.VERBOSE).Info("Processing ResponseHeaders") + klog.V(logutil.VERBOSE).InfoS("Processing ResponseHeaders") h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders) - klog.V(logutil.VERBOSE).Infof("Headers before: %+v\n", h) + klog.V(logutil.VERBOSE).InfoS("Headers before", "headers", h) resp := &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_ResponseHeaders{ @@ -66,7 +66,7 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr } }*/ func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { - klog.V(logutil.VERBOSE).Info("Processing HandleResponseBody") + klog.V(logutil.VERBOSE).InfoS("Processing HandleResponseBody") body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody) res := Response{} @@ -81,7 +81,7 @@ func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.Proce // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) // will add the processing for streaming case. reqCtx.ResponseComplete = true - klog.V(logutil.VERBOSE).Infof("Response: %+v", res) + klog.V(logutil.VERBOSE).InfoS("Response generated", "response", res) resp := &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_ResponseBody{ diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go index f27c9a153..a3cfcada5 100644 --- a/pkg/ext-proc/handlers/server.go +++ b/pkg/ext-proc/handlers/server.go @@ -51,7 +51,7 @@ type ModelDataStore interface { } func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { - klog.V(logutil.VERBOSE).Info("Processing") + klog.V(logutil.VERBOSE).InfoS("Processing") ctx := srv.Context() // Create request context to share states during life time of an HTTP request. // See https://github.com/envoyproxy/envoy/issues/17540. @@ -71,7 +71,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { if err != nil { // This error occurs very frequently, though it doesn't seem to have any impact. // TODO Figure out if we can remove this noise. - klog.V(logutil.VERBOSE).Infof("cannot receive stream request: %v", err) + klog.V(logutil.VERBOSE).ErrorS(err, "Cannot receive stream request") return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err) } @@ -80,17 +80,17 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { case *extProcPb.ProcessingRequest_RequestHeaders: reqCtx.RequestReceivedTimestamp = time.Now() resp = HandleRequestHeaders(reqCtx, req) - klog.V(logutil.VERBOSE).Infof("Request context after HandleRequestHeaders: %+v", reqCtx) + klog.V(logutil.VERBOSE).InfoS("Request context after HandleRequestHeaders", "context", reqCtx) case *extProcPb.ProcessingRequest_RequestBody: resp, err = s.HandleRequestBody(reqCtx, req) if err == nil { metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel) metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize) } - klog.V(logutil.VERBOSE).Infof("Request context after HandleRequestBody: %+v", reqCtx) + klog.V(logutil.VERBOSE).InfoS("Request context after HandleRequestBody", "context", reqCtx) case *extProcPb.ProcessingRequest_ResponseHeaders: resp, err = s.HandleResponseHeaders(reqCtx, req) - klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseHeaders: %+v", reqCtx) + klog.V(logutil.VERBOSE).InfoS("Request context after HandleResponseHeaders", "context", reqCtx) case *extProcPb.ProcessingRequest_ResponseBody: resp, err = s.HandleResponseBody(reqCtx, req) if err == nil && reqCtx.ResponseComplete { @@ -100,13 +100,13 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens) metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens) } - klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseBody: %+v", reqCtx) + klog.V(logutil.VERBOSE).InfoS("Request context after HandleResponseBody", "context", reqCtx) default: - klog.Errorf("Unknown Request type %+v", v) + klog.V(logutil.DEFAULT).ErrorS(nil, "Unknown Request type", "request", v) return status.Error(codes.Unknown, "unknown request type") } if err != nil { - klog.Errorf("failed to process request: %v", err) + klog.V(logutil.DEFAULT).ErrorS(err, "Failed to process request", "request", req) switch status.Code(err) { // This code can be returned by scheduler when there is no capacity for sheddable // requests. @@ -125,9 +125,9 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { } } - klog.V(logutil.VERBOSE).Infof("response: %v", resp) + klog.V(logutil.VERBOSE).InfoS("Response generated", "response", resp) if err := srv.Send(resp); err != nil { - klog.Errorf("send error %v", err) + klog.V(logutil.DEFAULT).ErrorS(err, "Send failed") return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) } } diff --git a/pkg/ext-proc/health.go b/pkg/ext-proc/health.go index 764992b29..aabb150d9 100644 --- a/pkg/ext-proc/health.go +++ b/pkg/ext-proc/health.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc/status" klog "k8s.io/klog/v2" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) type healthServer struct { @@ -16,10 +17,10 @@ type healthServer struct { func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { if !s.datastore.HasSynced() { - klog.Infof("gRPC health check not serving: %s", in.String()) + klog.V(logutil.VERBOSE).InfoS("gRPC health check not serving", "service", in.Service) return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil } - klog.Infof("gRPC health check serving: %s", in.String()) + klog.V(logutil.VERBOSE).InfoS("gRPC health check serving", "service", in.Service) return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil } diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index 968d09f5a..06c77af3e 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -102,11 +102,11 @@ func run() error { } // Print all flag values - flags := "Flags: " + flags := make(map[string]any) flag.VisitAll(func(f *flag.Flag) { - flags += fmt.Sprintf("%s=%v; ", f.Name, f.Value) + flags[f.Name] = f.Value }) - klog.Info(flags) + klog.InfoS("Flags processed", "flags", flags) datastore := backend.NewK8sDataStore() diff --git a/pkg/ext-proc/metrics/metrics.go b/pkg/ext-proc/metrics/metrics.go index 7bdc8436e..1412af6e7 100644 --- a/pkg/ext-proc/metrics/metrics.go +++ b/pkg/ext-proc/metrics/metrics.go @@ -7,6 +7,7 @@ import ( compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" klog "k8s.io/klog/v2" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) const ( @@ -31,8 +32,10 @@ var ( Subsystem: InferenceModelComponent, Name: "request_duration_seconds", Help: "Inference model response latency distribution in seconds for each model and target model.", - Buckets: []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3, - 4, 5, 6, 8, 10, 15, 20, 30, 45, 60, 120, 180, 240, 300, 360, 480, 600, 900, 1200, 1800, 2700, 3600}, + Buckets: []float64{ + 0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3, + 4, 5, 6, 8, 10, 15, 20, 30, 45, 60, 120, 180, 240, 300, 360, 480, 600, 900, 1200, 1800, 2700, 3600, + }, StabilityLevel: compbasemetrics.ALPHA, }, []string{"model_name", "target_model_name"}, @@ -140,10 +143,11 @@ func RecordRequestSizes(modelName, targetModelName string, reqSize int) { requestSizes.WithLabelValues(modelName, targetModelName).Observe(float64(reqSize)) } -// RecordRequstLatencies records duration of request. +// RecordRequestLatencies records duration of request. func RecordRequestLatencies(modelName, targetModelName string, received time.Time, complete time.Time) bool { if !complete.After(received) { - klog.Errorf("request latency value error for model name %v, target model name %v: complete time %v is before received time %v", modelName, targetModelName, complete, received) + klog.V(logutil.DEFAULT).ErrorS(nil, "Request latency values are invalid", + "modelName", modelName, "targetModelName", targetModelName, "completeTime", complete, "receivedTime", received) return false } elapsedSeconds := complete.Sub(received).Seconds() diff --git a/pkg/ext-proc/scheduling/filter.go b/pkg/ext-proc/scheduling/filter.go index fc0168825..ac7a287ce 100644 --- a/pkg/ext-proc/scheduling/filter.go +++ b/pkg/ext-proc/scheduling/filter.go @@ -42,7 +42,7 @@ func (f *filter) Name() string { } func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { - klog.V(logutil.VERBOSE).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods)) + klog.V(logutil.VERBOSE).InfoS("Running a filter", "name", f.Name(), "request", req, "podCount", len(pods)) filtered, err := f.filter(req, pods) @@ -55,7 +55,7 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend if f.nextOnSuccess != nil { next = f.nextOnSuccess } - klog.V(logutil.VERBOSE).Infof("onSuccess %q -> %q, filtered: %v", f.name, next.Name(), len(filtered)) + klog.V(logutil.VERBOSE).InfoS("Filter succeeded", "filter", f.Name(), "next", next.Name(), "filteredPodCount", len(filtered)) // On success, pass the filtered result to the next filter. return next.Filter(req, filtered) } else { @@ -66,7 +66,7 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend if f.nextOnFailure != nil { next = f.nextOnFailure } - klog.V(logutil.VERBOSE).Infof("onFailure %q -> %q", f.name, next.Name()) + klog.V(logutil.VERBOSE).InfoS("Filter failed", "filter", f.Name(), "next", next.Name()) // On failure, pass the initial set of pods to the next filter. return next.Filter(req, pods) } diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index ca896c5a5..505648989 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -83,7 +83,7 @@ var ( nextOnFailure: &filter{ name: "drop request", filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { - klog.Infof("Dropping request %v", req) + klog.V(logutil.DEFAULT).InfoS("Request dropped", "request", req) return []*backend.PodMetrics{}, status.Errorf( codes.ResourceExhausted, "dropping request due to limited backend resources") }, @@ -92,7 +92,6 @@ var ( ) func NewScheduler(pmp PodMetricsProvider) *Scheduler { - return &Scheduler{ podMetricsProvider: pmp, filter: defaultFilter, @@ -112,13 +111,13 @@ type PodMetricsProvider interface { // Schedule finds the target pod based on metrics and the requested lora adapter. func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) { - klog.V(logutil.VERBOSE).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics()) + klog.V(logutil.VERBOSE).InfoS("Scheduling a request", "request", req, "metrics", s.podMetricsProvider.AllPodMetrics()) pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics()) if err != nil || len(pods) == 0 { return backend.Pod{}, fmt.Errorf( "failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err) } - klog.V(logutil.VERBOSE).Infof("Going to randomly select a pod from the candidates: %+v", pods) + klog.V(logutil.VERBOSE).InfoS("Selecting a random pod from the candidates", "candidatePods", pods) i := rand.Intn(len(pods)) return pods[i].Pod, nil } diff --git a/pkg/ext-proc/test/benchmark/benchmark.go b/pkg/ext-proc/test/benchmark/benchmark.go index f18782d6f..c83dbcb91 100644 --- a/pkg/ext-proc/test/benchmark/benchmark.go +++ b/pkg/ext-proc/test/benchmark/benchmark.go @@ -15,6 +15,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/test" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) var ( @@ -34,13 +35,19 @@ const ( ) func main() { + if err := run(); err != nil { + os.Exit(1) + } +} + +func run() error { klog.InitFlags(nil) flag.Parse() if *localServer { test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, *refreshPrometheusMetricsInterval, fakePods(), fakeModels()) time.Sleep(time.Second) // wait until server is up - klog.Info("Server started") + klog.InfoS("Server started") } report, err := runner.Run( @@ -51,7 +58,8 @@ func main() { runner.WithTotalRequests(uint(*totalRequests)), ) if err != nil { - klog.Fatal(err) + klog.ErrorS(err, "Runner failed") + return err } printer := printer.ReportPrinter{ @@ -60,6 +68,7 @@ func main() { } printer.Print("summary") + return nil } func generateRequest(mtd *desc.MethodDescriptor, callData *runner.CallData) []byte { @@ -67,7 +76,7 @@ func generateRequest(mtd *desc.MethodDescriptor, callData *runner.CallData) []by req := test.GenerateRequest(modelName(int(callData.RequestNumber) % numModels)) data, err := proto.Marshal(req) if err != nil { - klog.Fatal("marshaling error: ", err) + logutil.Fatal(err, "Failed to marshal request", "request", req) } return data } diff --git a/pkg/ext-proc/test/utils.go b/pkg/ext-proc/test/utils.go index b91672fa5..4c0007220 100644 --- a/pkg/ext-proc/test/utils.go +++ b/pkg/ext-proc/test/utils.go @@ -14,6 +14,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server { @@ -26,7 +27,7 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refresh pmc := &backend.FakePodMetricsClient{Res: pms} pp := backend.NewProvider(pmc, backend.NewK8sDataStore(backend.WithPods(pods))) if err := pp.Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval); err != nil { - klog.Fatalf("failed to initialize: %v", err) + logutil.Fatal(err, "Failed to initialize") } return startExtProc(port, pp, models) } @@ -35,19 +36,19 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refresh func startExtProc(port int, pp *backend.Provider, models map[string]*v1alpha1.InferenceModel) *grpc.Server { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { - klog.Fatalf("failed to listen: %v", err) + logutil.Fatal(err, "Failed to listen", "port", port) } s := grpc.NewServer() extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(pp, scheduling.NewScheduler(pp), "target-pod", &backend.FakeDataStore{Res: models})) - klog.Infof("Starting gRPC server on port :%v", port) + klog.InfoS("gRPC server starting", "port", port) reflection.Register(s) go func() { err := s.Serve(lis) if err != nil { - klog.Fatalf("Ext-proc failed with the err: %v", err) + logutil.Fatal(err, "Ext-proc failed with the err") } }() return s @@ -63,7 +64,7 @@ func GenerateRequest(model string) *extProcPb.ProcessingRequest { llmReq, err := json.Marshal(j) if err != nil { - klog.Fatal(err) + logutil.Fatal(err, "Failed to unmarshal LLM request") } req := &extProcPb.ProcessingRequest{ Request: &extProcPb.ProcessingRequest_RequestBody{ diff --git a/pkg/ext-proc/util/logging/fatal.go b/pkg/ext-proc/util/logging/fatal.go new file mode 100644 index 000000000..65926824d --- /dev/null +++ b/pkg/ext-proc/util/logging/fatal.go @@ -0,0 +1,11 @@ +package logging + +import "k8s.io/klog/v2" + +// Fatal calls klog.ErrorS followed by klog.FlushAndExit(1). +// +// This is a utility function and should not be used in production code! +func Fatal(err error, msg string, keysAndValues ...interface{}) { + klog.ErrorS(err, msg, keysAndValues...) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) +} diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index ff018f286..6d03413be 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -491,7 +491,7 @@ func BeforeSuit() { } }() - klog.Info("Setting up hermetic ExtProc server") + klog.InfoS("Setting up hermetic ExtProc server") klog.InitFlags(nil) flag.Parse() // Configure klog verbosity levels to print ext proc logs. @@ -510,7 +510,7 @@ func BeforeSuit() { log.Fatalf("Can't unmarshal object: %v", doc) } if inferenceModel.Kind == "InferenceModel" { - klog.Infof("Creating inference model: %+v", inferenceModel) + klog.InfoS("Creating inference model", "model", inferenceModel) if err := k8sClient.Create(context.Background(), inferenceModel); err != nil { log.Fatalf("unable to create inferenceModel %v: %v", inferenceModel.Name, err) } @@ -522,7 +522,7 @@ func BeforeSuit() { log.Fatalf("Can't unmarshal object: %v", doc) } if inferencePool.Kind == "InferencePool" { - klog.Infof("Creating inference pool: %+v", inferencePool) + klog.InfoS("Creating inference pool", "pool", inferencePool) if err := k8sClient.Create(context.Background(), inferencePool); err != nil { log.Fatalf("unable to create inferencePool %v: %v", inferencePool.Name, err) } From 68a1e201af415deec25d5e27b538e1ee4f7e0f63 Mon Sep 17 00:00:00 2001 From: Ondrej Kupka Date: Thu, 13 Feb 2025 21:38:58 +0100 Subject: [PATCH 2/2] test/integration: Use logutil.Fatal --- test/integration/hermetic_test.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index 6d03413be..13cddfdf7 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -9,7 +9,6 @@ import ( "flag" "fmt" "io" - "log" "os" "path/filepath" "testing" @@ -35,6 +34,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server" extprocutils "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/test" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" "sigs.k8s.io/yaml" ) @@ -420,7 +420,7 @@ func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalP if err := serverRunner.AsRunnable( backend.NewK8sDataStore(backend.WithPods(pods)), pmc, ).Start(serverCtx); err != nil { - log.Fatalf("Failed to start ext-proc server: %v", err) + logutil.Fatal(err, "Failed to start ext-proc server") } }() @@ -431,13 +431,13 @@ func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalP // Create a grpc connection conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - log.Fatalf("Failed to connect to %v: %v", address, err) + logutil.Fatal(err, "Failed to connect", "address", address) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx) if err != nil { - log.Fatalf("Failed to create client: %v", err) + logutil.Fatal(err, "Failed to create client") } return client, func() { cancel() @@ -455,7 +455,7 @@ func BeforeSuit() { } cfg, err := testEnv.Start() if err != nil { - log.Fatalf("Failed to start test environment, cfg: %v error: %v", cfg, err) + logutil.Fatal(err, "Failed to start test environment", "config", cfg) } utilruntime.Must(clientgoscheme.AddToScheme(scheme)) @@ -463,16 +463,15 @@ func BeforeSuit() { k8sClient, err = k8sclient.New(cfg, k8sclient.Options{Scheme: scheme}) if err != nil { - log.Fatalf("Failed to start k8s Client: %v", err) + logutil.Fatal(err, "Failed to start k8s Client") } else if k8sClient == nil { - log.Fatalf("No error, but returned kubernetes client is nil, cfg: %v", cfg) + logutil.Fatal(nil, "No error, but returned kubernetes client is nil", "config", cfg) } // Init runtime. mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme}) if err != nil { - klog.ErrorS(err, "Failed to create controller manager") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) + logutil.Fatal(err, "Failed to create controller manager") } serverRunner = runserver.NewDefaultExtProcServerRunner() @@ -481,13 +480,13 @@ func BeforeSuit() { serverRunner.Datastore = backend.NewK8sDataStore() if err := serverRunner.SetupWithManager(mgr); err != nil { - log.Fatalf("Failed to start server runner: %v", err) + logutil.Fatal(err, "Failed to setup server runner") } // Start the controller manager in go routine, not blocking go func() { if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - log.Fatalf("Failed to start manager: %v", err) + logutil.Fatal(err, "Failed to start manager") } }() @@ -501,30 +500,30 @@ func BeforeSuit() { manifestsPath := filepath.Join("..", "testdata", "inferencepool-with-model-hermetic.yaml") docs, err := readDocuments(manifestsPath) if err != nil { - log.Fatalf("Can't read object manifests at path %v, %v", manifestsPath, err) + logutil.Fatal(err, "Can't read object manifests", "path", manifestsPath) } for _, doc := range docs { inferenceModel := &v1alpha1.InferenceModel{} if err = yaml.Unmarshal(doc, inferenceModel); err != nil { - log.Fatalf("Can't unmarshal object: %v", doc) + logutil.Fatal(err, "Can't unmarshal object", "document", doc) } if inferenceModel.Kind == "InferenceModel" { klog.InfoS("Creating inference model", "model", inferenceModel) if err := k8sClient.Create(context.Background(), inferenceModel); err != nil { - log.Fatalf("unable to create inferenceModel %v: %v", inferenceModel.Name, err) + logutil.Fatal(err, "Unable to create inferenceModel", "modelName", inferenceModel.Name) } } } for _, doc := range docs { inferencePool := &v1alpha1.InferencePool{} if err = yaml.Unmarshal(doc, inferencePool); err != nil { - log.Fatalf("Can't unmarshal object: %v", doc) + logutil.Fatal(err, "Can't unmarshal object", "document", doc) } if inferencePool.Kind == "InferencePool" { klog.InfoS("Creating inference pool", "pool", inferencePool) if err := k8sClient.Create(context.Background(), inferencePool); err != nil { - log.Fatalf("unable to create inferencePool %v: %v", inferencePool.Name, err) + logutil.Fatal(err, "Unable to create inferencePool", "poolName", inferencePool.Name) } } }