Skip to content

Commit 3377ed6

Browse files
tchapkfswain
authored andcommitted
Use structured logging (kubernetes-sigs#330)
* Use structured logging All logging calls are rewritten to use structured logging. * test/integration: Use logutil.Fatal
1 parent c5b7563 commit 3377ed6

File tree

18 files changed

+128
-96
lines changed

18 files changed

+128
-96
lines changed

pkg/ext-proc/backend/datastore.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string {
9898
for _, model := range model.Spec.TargetModels {
9999
weights += *model.Weight
100100
}
101-
klog.V(logutil.VERBOSE).Infof("Weights for Model(%v) total to: %v", model.Name, weights)
101+
klog.V(logutil.VERBOSE).InfoS("Weights for model computed", "model", model.Name, "weights", weights)
102102
randomVal := r.Int31n(weights)
103103
for _, model := range model.Spec.TargetModels {
104104
if randomVal < *model.Weight {

pkg/ext-proc/backend/fake.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
klog "k8s.io/klog/v2"
77
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
8+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
89
)
910

1011
type FakePodMetricsClient struct {
@@ -16,7 +17,7 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
1617
if err, ok := f.Err[pod]; ok {
1718
return nil, err
1819
}
19-
klog.V(1).Infof("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
20+
klog.V(logutil.VERBOSE).InfoS("Fetching metrics for pod", "pod", pod, "existing", existing, "new", f.Res[pod])
2021
return f.Res[pod], nil
2122
}
2223

pkg/ext-proc/backend/inferencemodel_reconciler.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,21 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
2626
if req.Namespace != c.PoolNamespacedName.Namespace {
2727
return ctrl.Result{}, nil
2828
}
29-
klog.V(1).Infof("Reconciling InferenceModel %v", req.NamespacedName)
29+
30+
klogV := klog.V(logutil.DEFAULT)
31+
klogV.InfoS("Reconciling InferenceModel", "name", req.NamespacedName)
3032

3133
infModel := &v1alpha1.InferenceModel{}
3234
if err := c.Get(ctx, req.NamespacedName, infModel); err != nil {
3335
if errors.IsNotFound(err) {
34-
klog.V(1).Infof("InferenceModel %v not found. Removing from datastore since object must be deleted", req.NamespacedName)
36+
klogV.InfoS("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName)
3537
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
3638
return ctrl.Result{}, nil
3739
}
38-
klog.Error(err, "Unable to get InferenceModel")
40+
klogV.ErrorS(err, "Unable to get InferenceModel", "name", req.NamespacedName)
3941
return ctrl.Result{}, err
4042
} else if !infModel.DeletionTimestamp.IsZero() {
41-
klog.V(1).Infof("InferenceModel %v is marked for deletion. Removing from datastore", req.NamespacedName)
43+
klogV.InfoS("InferenceModel is marked for deletion. Removing from datastore", "name", req.NamespacedName)
4244
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
4345
return ctrl.Result{}, nil
4446
}
@@ -48,13 +50,15 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
4850
}
4951

5052
func (c *InferenceModelReconciler) updateDatastore(infModel *v1alpha1.InferenceModel) {
53+
klogV := klog.V(logutil.DEFAULT)
54+
5155
if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name {
52-
klog.V(1).Infof("Incoming pool ref %v, server pool name: %v", infModel.Spec.PoolRef, c.PoolNamespacedName.Name)
53-
klog.V(1).Infof("Adding/Updating InferenceModel: %v", infModel.Spec.ModelName)
56+
klogV.InfoS("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName)
57+
klogV.InfoS("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName)
5458
c.Datastore.InferenceModels.Store(infModel.Spec.ModelName, infModel)
5559
return
5660
}
57-
klog.V(logutil.DEFAULT).Infof("Removing/Not adding InferenceModel: %v", infModel.Spec.ModelName)
61+
klogV.InfoS("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName)
5862
// If we get here. The model is not relevant to this pool, remove.
5963
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
6064
}

pkg/ext-proc/backend/inferencepool_reconciler.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
ctrl "sigs.k8s.io/controller-runtime"
1212
"sigs.k8s.io/controller-runtime/pkg/client"
1313
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
14+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1415
)
1516

1617
// InferencePoolReconciler utilizes the controller runtime to reconcile Instance Gateway resources
@@ -28,11 +29,12 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
2829
if req.NamespacedName.Name != c.PoolNamespacedName.Name || req.NamespacedName.Namespace != c.PoolNamespacedName.Namespace {
2930
return ctrl.Result{}, nil
3031
}
31-
klog.V(1).Info("reconciling InferencePool", req.NamespacedName)
32+
klogV := klog.V(logutil.DEFAULT)
33+
klogV.InfoS("Reconciling InferencePool", "name", req.NamespacedName)
3234

3335
serverPool := &v1alpha1.InferencePool{}
3436
if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
35-
klog.Error(err, ": unable to get InferencePool")
37+
klogV.ErrorS(err, "Unable to get InferencePool", "name", req.NamespacedName)
3638
return ctrl.Result{}, err
3739
}
3840
if c.Datastore.inferencePool == nil || !reflect.DeepEqual(serverPool.Spec.Selector, c.Datastore.inferencePool.Spec.Selector) {
@@ -49,7 +51,7 @@ func (c *InferencePoolReconciler) updateDatastore(serverPool *v1alpha1.Inference
4951
pool, _ := c.Datastore.getInferencePool()
5052
if pool == nil ||
5153
serverPool.ObjectMeta.ResourceVersion != pool.ObjectMeta.ResourceVersion {
52-
klog.Infof("Updating inference pool to %v/%v", serverPool.ObjectMeta.Namespace, serverPool.ObjectMeta.Name)
54+
klog.V(logutil.DEFAULT).InfoS("Updating inference pool", "target", klog.KMetadata(&serverPool.ObjectMeta))
5355
c.Datastore.setInferencePool(serverPool)
5456
}
5557
}

pkg/ext-proc/backend/provider.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
6363
p.refreshPodsOnce()
6464

6565
if err := p.refreshMetricsOnce(); err != nil {
66-
klog.Errorf("Failed to init metrics: %v", err)
66+
klog.ErrorS(err, "Failed to init metrics")
6767
}
6868

69-
klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
69+
klog.InfoS("Initialized pods and metrics", "metrics", p.AllPodMetrics())
7070

7171
// periodically refresh pods
7272
go func() {
@@ -81,7 +81,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
8181
for {
8282
time.Sleep(refreshMetricsInterval)
8383
if err := p.refreshMetricsOnce(); err != nil {
84-
klog.V(logutil.TRACE).Infof("Failed to refresh metrics: %v", err)
84+
klog.V(logutil.TRACE).ErrorS(err, "Failed to refresh metrics")
8585
}
8686
}
8787
}()
@@ -95,11 +95,11 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
9595
}()
9696

9797
// Periodically print out the pods and metrics for DEBUGGING.
98-
if klog.V(logutil.DEBUG).Enabled() {
98+
if klogV := klog.V(logutil.DEBUG); klogV.Enabled() {
9999
go func() {
100100
for {
101101
time.Sleep(5 * time.Second)
102-
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
102+
klogV.InfoS("Current Pods and metrics gathered", "metrics", p.AllPodMetrics())
103103
}
104104
}()
105105
}
@@ -138,18 +138,19 @@ func (p *Provider) refreshPodsOnce() {
138138
}
139139

140140
func (p *Provider) refreshMetricsOnce() error {
141+
klogV := klog.V(logutil.TRACE)
141142
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
142143
defer cancel()
143144
start := time.Now()
144145
defer func() {
145146
d := time.Since(start)
146147
// TODO: add a metric instead of logging
147-
klog.V(logutil.TRACE).Infof("Refreshed metrics in %v", d)
148+
klogV.InfoS("Metrics refreshed", "duration", d)
148149
}()
149150
var wg sync.WaitGroup
150151
errCh := make(chan error)
151152
processOnePod := func(key, value any) bool {
152-
klog.V(logutil.TRACE).Infof("Processing pod %v and metric %v", key, value)
153+
klogV.InfoS("Pod and metric being processed", "pod", key, "metric", value)
153154
pod := key.(Pod)
154155
existing := value.(*PodMetrics)
155156
wg.Add(1)
@@ -161,7 +162,7 @@ func (p *Provider) refreshMetricsOnce() error {
161162
return
162163
}
163164
p.UpdatePodMetrics(pod, updated)
164-
klog.V(logutil.TRACE).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
165+
klogV.InfoS("Updated metrics for pod", "pod", pod, "metrics", updated.Metrics)
165166
}()
166167
return true
167168
}
@@ -185,7 +186,7 @@ func (p *Provider) refreshMetricsOnce() error {
185186
}
186187

187188
func (p *Provider) flushPrometheusMetricsOnce() {
188-
klog.V(logutil.DEBUG).Infof("Flushing Prometheus Metrics")
189+
klog.V(logutil.DEBUG).InfoS("Flushing Prometheus Metrics")
189190

190191
pool, _ := p.datastore.getInferencePool()
191192
if pool == nil {

pkg/ext-proc/backend/vllm/metrics.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ const (
3232
KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity"
3333
)
3434

35-
type PodMetricsClientImpl struct {
36-
}
35+
type PodMetricsClientImpl struct{}
3736

3837
// FetchMetrics fetches metrics from a given pod.
3938
func (p *PodMetricsClientImpl) FetchMetrics(
@@ -46,19 +45,20 @@ func (p *PodMetricsClientImpl) FetchMetrics(
4645
url := fmt.Sprintf("http://%s/metrics", pod.Address)
4746
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
4847
if err != nil {
48+
klog.V(logutil.DEFAULT).ErrorS(err, "Failed create HTTP request", "method", http.MethodGet, "url", url)
4949
return nil, fmt.Errorf("failed to create request: %v", err)
5050
}
5151
resp, err := http.DefaultClient.Do(req)
5252
if err != nil {
53-
klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
53+
klog.V(logutil.DEFAULT).ErrorS(err, "Failed to fetch metrics", "pod", pod)
5454
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
5555
}
5656
defer func() {
5757
_ = resp.Body.Close()
5858
}()
5959

6060
if resp.StatusCode != http.StatusOK {
61-
klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
61+
klog.V(logutil.DEFAULT).ErrorS(nil, "Unexpected status code returned", "pod", pod, "statusCode", resp.StatusCode)
6262
return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
6363
}
6464

@@ -138,7 +138,7 @@ func promToPodMetrics(
138138
func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, time.Time, error) {
139139
loraRequests, ok := metricFamilies[LoraRequestInfoMetricName]
140140
if !ok {
141-
klog.Warningf("metric family %q not found", LoraRequestInfoMetricName)
141+
klog.V(logutil.DEFAULT).ErrorS(nil, "Metric family not found", "name", LoraRequestInfoMetricName)
142142
return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName)
143143
}
144144
var latestTs float64
@@ -157,7 +157,7 @@ func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metr
157157
func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, error) {
158158
mf, ok := metricFamilies[metricName]
159159
if !ok {
160-
klog.Warningf("metric family %q not found", metricName)
160+
klog.V(logutil.DEFAULT).ErrorS(nil, "Metric family not found", "name", metricName)
161161
return nil, fmt.Errorf("metric family %q not found", metricName)
162162
}
163163
if len(mf.GetMetric()) == 0 {
@@ -171,6 +171,6 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str
171171
latest = m
172172
}
173173
}
174-
klog.V(logutil.TRACE).Infof("Got metric value %+v for metric %v", latest, metricName)
174+
klog.V(logutil.TRACE).InfoS("Metric value selected", "value", latest, "metric", metricName)
175175
return latest, nil
176176
}

pkg/ext-proc/handlers/request.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,24 @@ import (
1919
// parameter.
2020
// Envoy sends the request body to ext proc before sending the request to the backend server.
2121
func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
22-
klog.V(logutil.VERBOSE).Infof("Handling request body")
22+
klogV := klog.V(logutil.VERBOSE)
23+
klogV.InfoS("Handling request body")
2324

2425
// Unmarshal request body (must be JSON).
2526
v := req.Request.(*extProcPb.ProcessingRequest_RequestBody)
2627
var rb map[string]interface{}
2728
if err := json.Unmarshal(v.RequestBody.Body, &rb); err != nil {
28-
klog.Errorf("Error unmarshaling request body: %v", err)
29+
klog.V(logutil.DEFAULT).ErrorS(err, "Error unmarshaling request body")
2930
return nil, fmt.Errorf("error unmarshaling request body: %v", err)
3031
}
31-
klog.V(logutil.VERBOSE).Infof("Request body: %v", rb)
32+
klogV.InfoS("Request body unmarshalled", "body", rb)
3233

3334
// Resolve target models.
3435
model, ok := rb["model"].(string)
3536
if !ok {
3637
return nil, errors.New("model not found in request")
3738
}
38-
klog.V(logutil.VERBOSE).Infof("Model requested: %v", model)
39+
klogV.InfoS("Model requested", "model", model)
3940
modelName := model
4041

4142
// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
@@ -56,7 +57,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
5657
ResolvedTargetModel: modelName,
5758
Critical: backend.IsCritical(modelObj),
5859
}
59-
klog.V(logutil.VERBOSE).Infof("LLM Request: %+v", llmReq)
60+
klogV.InfoS("LLM request assembled", "request", llmReq)
6061

6162
requestBody := v.RequestBody.Body
6263
var err error
@@ -65,17 +66,17 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
6566
rb["model"] = llmReq.ResolvedTargetModel
6667
requestBody, err = json.Marshal(rb)
6768
if err != nil {
68-
klog.Errorf("Error marshaling request body: %v", err)
69+
klog.V(logutil.DEFAULT).ErrorS(err, "Error marshaling request body")
6970
return nil, fmt.Errorf("error marshaling request body: %v", err)
7071
}
71-
klog.V(logutil.VERBOSE).Infof("Updated body: %v", string(requestBody))
72+
klogV.InfoS("Updated request body marshalled", "body", string(requestBody))
7273
}
7374

7475
targetPod, err := s.scheduler.Schedule(llmReq)
7576
if err != nil {
7677
return nil, fmt.Errorf("failed to find target pod: %w", err)
7778
}
78-
klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
79+
klogV.InfoS("Target model and pod selected", "model", llmReq.ResolvedTargetModel, "pod", targetPod)
7980

8081
reqCtx.Model = llmReq.Model
8182
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
@@ -101,7 +102,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
101102
}
102103
// Print headers for debugging
103104
for _, header := range headers {
104-
klog.V(logutil.VERBOSE).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
105+
klog.V(logutil.DEBUG).InfoS("Request body header", "key", header.Header.Key, "value", header.Header.RawValue)
105106
}
106107

107108
resp := &extProcPb.ProcessingResponse{
@@ -136,10 +137,9 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
136137
}
137138

138139
func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
139-
klog.V(logutil.VERBOSE).Info("Handling request headers ...")
140140
r := req.Request
141141
h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
142-
klog.V(logutil.VERBOSE).Infof("Headers: %+v\n", h)
142+
klog.V(logutil.VERBOSE).InfoS("Handling request headers", "headers", h)
143143

144144
resp := &extProcPb.ProcessingResponse{
145145
Response: &extProcPb.ProcessingResponse_RequestHeaders{

pkg/ext-proc/handlers/response.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ import (
1212

1313
// HandleResponseHeaders processes response headers from the backend model server.
1414
func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
15-
klog.V(logutil.VERBOSE).Info("Processing ResponseHeaders")
15+
klog.V(logutil.VERBOSE).InfoS("Processing ResponseHeaders")
1616
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
17-
klog.V(logutil.VERBOSE).Infof("Headers before: %+v\n", h)
17+
klog.V(logutil.VERBOSE).InfoS("Headers before", "headers", h)
1818

1919
resp := &extProcPb.ProcessingResponse{
2020
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
@@ -66,7 +66,7 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
6666
}
6767
}*/
6868
func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
69-
klog.V(logutil.VERBOSE).Info("Processing HandleResponseBody")
69+
klog.V(logutil.VERBOSE).InfoS("Processing HandleResponseBody")
7070
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
7171

7272
res := Response{}
@@ -81,7 +81,7 @@ func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.Proce
8181
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
8282
// will add the processing for streaming case.
8383
reqCtx.ResponseComplete = true
84-
klog.V(logutil.VERBOSE).Infof("Response: %+v", res)
84+
klog.V(logutil.VERBOSE).InfoS("Response generated", "response", res)
8585

8686
resp := &extProcPb.ProcessingResponse{
8787
Response: &extProcPb.ProcessingResponse_ResponseBody{

0 commit comments

Comments
 (0)