Skip to content

Use structured logging #330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ext-proc/backend/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string {
for _, model := range model.Spec.TargetModels {
weights += *model.Weight
}
klog.V(logutil.VERBOSE).Infof("Weights for Model(%v) total to: %v", model.Name, weights)
klog.V(logutil.VERBOSE).InfoS("Weights for model computed", "model", model.Name, "weights", weights)
randomVal := r.Int31n(weights)
for _, model := range model.Spec.TargetModels {
if randomVal < *model.Weight {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ext-proc/backend/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

klog "k8s.io/klog/v2"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

type FakePodMetricsClient struct {
Expand All @@ -16,7 +17,7 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
if err, ok := f.Err[pod]; ok {
return nil, err
}
klog.V(1).Infof("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a VERBOSE message to me, but I'm not sure.

klog.V(logutil.VERBOSE).InfoS("Fetching metrics for pod", "pod", pod, "existing", existing, "new", f.Res[pod])
return f.Res[pod], nil
}

Expand Down
18 changes: 11 additions & 7 deletions pkg/ext-proc/backend/inferencemodel_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,21 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
if req.Namespace != c.PoolNamespacedName.Namespace {
return ctrl.Result{}, nil
}
klog.V(1).Infof("Reconciling InferenceModel %v", req.NamespacedName)

klogV := klog.V(logutil.DEFAULT)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this pattern, but what happens when I want to use a non-default log level? Mixing the logging objects might be odd. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, you can always just do klog.V with another level when needed. I think that there is a function somewhere here that actually uses klogV for VERBOSE and klog.V(logutil.DEFAULT) is done explicitly. There is no rule really, just saving some typing.

Actually, doing klogDEBUG := klog.V(logutil.DEBUG) would be more readable, as klogV does not always point to the same verbosity level, but I don't have a strong feeling about this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that I will reconsider this when implementing contextual logging; I don't feel like postponing this.

klogV.InfoS("Reconciling InferenceModel", "name", req.NamespacedName)

infModel := &v1alpha1.InferenceModel{}
if err := c.Get(ctx, req.NamespacedName, infModel); err != nil {
if errors.IsNotFound(err) {
klog.V(1).Infof("InferenceModel %v not found. Removing from datastore since object must be deleted", req.NamespacedName)
klogV.InfoS("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName)
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
return ctrl.Result{}, nil
}
klog.Error(err, "Unable to get InferenceModel")
klogV.ErrorS(err, "Unable to get InferenceModel", "name", req.NamespacedName)
return ctrl.Result{}, err
} else if !infModel.DeletionTimestamp.IsZero() {
klog.V(1).Infof("InferenceModel %v is marked for deletion. Removing from datastore", req.NamespacedName)
klogV.InfoS("InferenceModel is marked for deletion. Removing from datastore", "name", req.NamespacedName)
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
return ctrl.Result{}, nil
}
Expand All @@ -48,13 +50,15 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}

func (c *InferenceModelReconciler) updateDatastore(infModel *v1alpha1.InferenceModel) {
klogV := klog.V(logutil.DEFAULT)

if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name {
klog.V(1).Infof("Incoming pool ref %v, server pool name: %v", infModel.Spec.PoolRef, c.PoolNamespacedName.Name)
klog.V(1).Infof("Adding/Updating InferenceModel: %v", infModel.Spec.ModelName)
klogV.InfoS("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName)
klogV.InfoS("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName)
c.Datastore.InferenceModels.Store(infModel.Spec.ModelName, infModel)
return
}
klog.V(logutil.DEFAULT).Infof("Removing/Not adding InferenceModel: %v", infModel.Spec.ModelName)
klogV.InfoS("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName)
// If we get here. The model is not relevant to this pool, remove.
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/ext-proc/backend/inferencepool_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

// InferencePoolReconciler utilizes the controller runtime to reconcile Instance Gateway resources
Expand All @@ -28,11 +29,12 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if req.NamespacedName.Name != c.PoolNamespacedName.Name || req.NamespacedName.Namespace != c.PoolNamespacedName.Namespace {
return ctrl.Result{}, nil
}
klog.V(1).Info("reconciling InferencePool", req.NamespacedName)
klogV := klog.V(logutil.DEFAULT)
klogV.InfoS("Reconciling InferencePool", "name", req.NamespacedName)

serverPool := &v1alpha1.InferencePool{}
if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
klog.Error(err, ": unable to get InferencePool")
klogV.ErrorS(err, "Unable to get InferencePool", "name", req.NamespacedName)
return ctrl.Result{}, err
}
if c.Datastore.inferencePool == nil || !reflect.DeepEqual(serverPool.Spec.Selector, c.Datastore.inferencePool.Spec.Selector) {
Expand All @@ -49,7 +51,7 @@ func (c *InferencePoolReconciler) updateDatastore(serverPool *v1alpha1.Inference
pool, _ := c.Datastore.getInferencePool()
if pool == nil ||
serverPool.ObjectMeta.ResourceVersion != pool.ObjectMeta.ResourceVersion {
klog.Infof("Updating inference pool to %v/%v", serverPool.ObjectMeta.Namespace, serverPool.ObjectMeta.Name)
klog.V(logutil.DEFAULT).InfoS("Updating inference pool", "target", klog.KMetadata(&serverPool.ObjectMeta))
c.Datastore.setInferencePool(serverPool)
}
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/ext-proc/backend/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
p.refreshPodsOnce()

if err := p.refreshMetricsOnce(); err != nil {
klog.Errorf("Failed to init metrics: %v", err)
klog.ErrorS(err, "Failed to init metrics")
}

klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
klog.InfoS("Initialized pods and metrics", "metrics", p.AllPodMetrics())

// periodically refresh pods
go func() {
Expand All @@ -81,7 +81,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
for {
time.Sleep(refreshMetricsInterval)
if err := p.refreshMetricsOnce(); err != nil {
klog.V(logutil.TRACE).Infof("Failed to refresh metrics: %v", err)
klog.V(logutil.TRACE).ErrorS(err, "Failed to refresh metrics")
}
}
}()
Expand All @@ -95,11 +95,11 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
}()

// Periodically print out the pods and metrics for DEBUGGING.
if klog.V(logutil.DEBUG).Enabled() {
if klogV := klog.V(logutil.DEBUG); klogV.Enabled() {
go func() {
for {
time.Sleep(5 * time.Second)
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
klogV.InfoS("Current Pods and metrics gathered", "metrics", p.AllPodMetrics())
}
}()
}
Expand Down Expand Up @@ -138,18 +138,19 @@ func (p *Provider) refreshPodsOnce() {
}

func (p *Provider) refreshMetricsOnce() error {
klogV := klog.V(logutil.TRACE)
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
defer cancel()
start := time.Now()
defer func() {
d := time.Since(start)
// TODO: add a metric instead of logging
klog.V(logutil.TRACE).Infof("Refreshed metrics in %v", d)
klogV.InfoS("Metrics refreshed", "duration", d)
}()
var wg sync.WaitGroup
errCh := make(chan error)
processOnePod := func(key, value any) bool {
klog.V(logutil.TRACE).Infof("Processing pod %v and metric %v", key, value)
klogV.InfoS("Pod and metric being processed", "pod", key, "metric", value)
pod := key.(Pod)
existing := value.(*PodMetrics)
wg.Add(1)
Expand All @@ -161,7 +162,7 @@ func (p *Provider) refreshMetricsOnce() error {
return
}
p.UpdatePodMetrics(pod, updated)
klog.V(logutil.TRACE).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
klogV.InfoS("Updated metrics for pod", "pod", pod, "metrics", updated.Metrics)
}()
return true
}
Expand All @@ -185,7 +186,7 @@ func (p *Provider) refreshMetricsOnce() error {
}

func (p *Provider) flushPrometheusMetricsOnce() {
klog.V(logutil.DEBUG).Infof("Flushing Prometheus Metrics")
klog.V(logutil.DEBUG).InfoS("Flushing Prometheus Metrics")

pool, _ := p.datastore.getInferencePool()
if pool == nil {
Expand Down
14 changes: 7 additions & 7 deletions pkg/ext-proc/backend/vllm/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ const (
KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity"
)

type PodMetricsClientImpl struct {
}
type PodMetricsClientImpl struct{}

// FetchMetrics fetches metrics from a given pod.
func (p *PodMetricsClientImpl) FetchMetrics(
Expand All @@ -46,19 +45,20 @@ func (p *PodMetricsClientImpl) FetchMetrics(
url := fmt.Sprintf("http://%s/metrics", pod.Address)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
klog.V(logutil.DEFAULT).ErrorS(err, "Failed create HTTP request", "method", http.MethodGet, "url", url)
return nil, fmt.Errorf("failed to create request: %v", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
klog.V(logutil.DEFAULT).ErrorS(err, "Failed to fetch metrics", "pod", pod)
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
klog.V(logutil.DEFAULT).ErrorS(nil, "Unexpected status code returned", "pod", pod, "statusCode", resp.StatusCode)
return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
}

Expand Down Expand Up @@ -138,7 +138,7 @@ func promToPodMetrics(
func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, time.Time, error) {
loraRequests, ok := metricFamilies[LoraRequestInfoMetricName]
if !ok {
klog.Warningf("metric family %q not found", LoraRequestInfoMetricName)
klog.V(logutil.DEFAULT).ErrorS(nil, "Metric family not found", "name", LoraRequestInfoMetricName)
return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName)
}
var latestTs float64
Expand All @@ -157,7 +157,7 @@ func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metr
func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, error) {
mf, ok := metricFamilies[metricName]
if !ok {
klog.Warningf("metric family %q not found", metricName)
klog.V(logutil.DEFAULT).ErrorS(nil, "Metric family not found", "name", metricName)
return nil, fmt.Errorf("metric family %q not found", metricName)
}
if len(mf.GetMetric()) == 0 {
Expand All @@ -171,6 +171,6 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str
latest = m
}
}
klog.V(logutil.TRACE).Infof("Got metric value %+v for metric %v", latest, metricName)
klog.V(logutil.TRACE).InfoS("Metric value selected", "value", latest, "metric", metricName)
return latest, nil
}
22 changes: 11 additions & 11 deletions pkg/ext-proc/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@ import (
// parameter.
// Envoy sends the request body to ext proc before sending the request to the backend server.
func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
klog.V(logutil.VERBOSE).Infof("Handling request body")
klogV := klog.V(logutil.VERBOSE)
klogV.InfoS("Handling request body")

// Unmarshal request body (must be JSON).
v := req.Request.(*extProcPb.ProcessingRequest_RequestBody)
var rb map[string]interface{}
if err := json.Unmarshal(v.RequestBody.Body, &rb); err != nil {
klog.Errorf("Error unmarshaling request body: %v", err)
klog.V(logutil.DEFAULT).ErrorS(err, "Error unmarshaling request body")
return nil, fmt.Errorf("error unmarshaling request body: %v", err)
}
klog.V(logutil.VERBOSE).Infof("Request body: %v", rb)
klogV.InfoS("Request body unmarshalled", "body", rb)

// Resolve target models.
model, ok := rb["model"].(string)
if !ok {
return nil, errors.New("model not found in request")
}
klog.V(logutil.VERBOSE).Infof("Model requested: %v", model)
klogV.InfoS("Model requested", "model", model)
modelName := model

// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
Expand All @@ -56,7 +57,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
ResolvedTargetModel: modelName,
Critical: backend.IsCritical(modelObj),
}
klog.V(logutil.VERBOSE).Infof("LLM Request: %+v", llmReq)
klogV.InfoS("LLM request assembled", "request", llmReq)

requestBody := v.RequestBody.Body
var err error
Expand All @@ -65,17 +66,17 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
rb["model"] = llmReq.ResolvedTargetModel
requestBody, err = json.Marshal(rb)
if err != nil {
klog.Errorf("Error marshaling request body: %v", err)
klog.V(logutil.DEFAULT).ErrorS(err, "Error marshaling request body")
return nil, fmt.Errorf("error marshaling request body: %v", err)
}
klog.V(logutil.VERBOSE).Infof("Updated body: %v", string(requestBody))
klogV.InfoS("Updated request body marshalled", "body", string(requestBody))
}

targetPod, err := s.scheduler.Schedule(llmReq)
if err != nil {
return nil, fmt.Errorf("failed to find target pod: %w", err)
}
klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
klogV.InfoS("Target model and pod selected", "model", llmReq.ResolvedTargetModel, "pod", targetPod)

reqCtx.Model = llmReq.Model
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
Expand All @@ -101,7 +102,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
}
// Print headers for debugging
for _, header := range headers {
klog.V(logutil.VERBOSE).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
klog.V(logutil.DEBUG).InfoS("Request body header", "key", header.Header.Key, "value", header.Header.RawValue)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like DEBUG level to me.

}

resp := &extProcPb.ProcessingResponse{
Expand Down Expand Up @@ -136,10 +137,9 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
}

func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
klog.V(logutil.VERBOSE).Info("Handling request headers ...")
r := req.Request
h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
klog.V(logutil.VERBOSE).Infof("Headers: %+v\n", h)
klog.V(logutil.VERBOSE).InfoS("Handling request headers", "headers", h)

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_RequestHeaders{
Expand Down
8 changes: 4 additions & 4 deletions pkg/ext-proc/handlers/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

// HandleResponseHeaders processes response headers from the backend model server.
func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
klog.V(logutil.VERBOSE).Info("Processing ResponseHeaders")
klog.V(logutil.VERBOSE).InfoS("Processing ResponseHeaders")
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
klog.V(logutil.VERBOSE).Infof("Headers before: %+v\n", h)
klog.V(logutil.VERBOSE).InfoS("Headers before", "headers", h)

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
Expand Down Expand Up @@ -66,7 +66,7 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
}
}*/
func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
klog.V(logutil.VERBOSE).Info("Processing HandleResponseBody")
klog.V(logutil.VERBOSE).InfoS("Processing HandleResponseBody")
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)

res := Response{}
Expand All @@ -81,7 +81,7 @@ func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.Proce
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
// will add the processing for streaming case.
reqCtx.ResponseComplete = true
klog.V(logutil.VERBOSE).Infof("Response: %+v", res)
klog.V(logutil.VERBOSE).InfoS("Response generated", "response", res)

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseBody{
Expand Down
Loading