Skip to content

Commit 08605e5

Browse files
committed
Use structured logging
All logging calls are rewritten to use structured logging.
1 parent 0662f1f commit 08605e5

18 files changed

+114
-92
lines changed

pkg/ext-proc/backend/datastore.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string {
9494
for _, model := range model.Spec.TargetModels {
9595
weights += *model.Weight
9696
}
97-
klog.V(logutil.VERBOSE).Infof("Weights for Model(%v) total to: %v", model.Name, weights)
97+
klog.V(logutil.VERBOSE).InfoS("Weights for model computed", "model", model.Name, "weights", weights)
9898
randomVal := r.Int31n(weights)
9999
for _, model := range model.Spec.TargetModels {
100100
if randomVal < *model.Weight {

pkg/ext-proc/backend/endpointslice_reconciler.go

+10-12
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ import (
1717
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1818
)
1919

20-
var (
21-
serviceOwnerLabel = "kubernetes.io/service-name"
22-
)
20+
var serviceOwnerLabel = "kubernetes.io/service-name"
2321

2422
type EndpointSliceReconciler struct {
2523
client.Client
@@ -33,15 +31,15 @@ type EndpointSliceReconciler struct {
3331
func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
3432
inferencePool, err := c.Datastore.getInferencePool()
3533
if err != nil {
36-
klog.V(logutil.DEFAULT).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err)
34+
klog.V(logutil.DEFAULT).InfoS("Skipping reconciling EndpointSlice because the InferencePool is not available yet", "error", err)
3735
return ctrl.Result{Requeue: true, RequeueAfter: time.Second}, nil
3836
}
3937

40-
klog.V(logutil.DEFAULT).Info("Reconciling EndpointSlice ", req.NamespacedName)
38+
klog.V(logutil.DEFAULT).InfoS("Reconciling EndpointSlice", "name", req.NamespacedName)
4139

4240
endpointSlice := &discoveryv1.EndpointSlice{}
4341
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
44-
klog.Errorf("Unable to get EndpointSlice: %v", err)
42+
klog.ErrorS(err, "Unable to get EndpointSlice", "name", req.NamespacedName)
4543
return ctrl.Result{}, err
4644
}
4745
c.updateDatastore(endpointSlice, inferencePool)
@@ -52,30 +50,31 @@ func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Reques
5250
// TODO: Support multiple endpointslices for a single service
5351
func (c *EndpointSliceReconciler) updateDatastore(
5452
slice *discoveryv1.EndpointSlice,
55-
inferencePool *v1alpha1.InferencePool) {
53+
inferencePool *v1alpha1.InferencePool,
54+
) {
5655
podMap := make(map[Pod]bool)
5756

5857
for _, endpoint := range slice.Endpoints {
59-
klog.V(logutil.DEFAULT).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
58+
klog.V(logutil.DEFAULT).InfoS("Processing endpoint", "zone", c.Zone, "endpoint", endpoint)
6059
if c.validPod(endpoint) {
6160
pod := Pod{
6261
Name: endpoint.TargetRef.Name,
6362
Address: endpoint.Addresses[0] + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)),
6463
}
6564
podMap[pod] = true
66-
klog.V(logutil.DEFAULT).Infof("Storing pod %v", pod)
65+
klog.V(logutil.DEFAULT).InfoS("Storing pod", "pod", pod)
6766
c.Datastore.pods.Store(pod, true)
6867
}
6968
}
7069

7170
removeOldPods := func(k, v any) bool {
7271
pod, ok := k.(Pod)
7372
if !ok {
74-
klog.Errorf("Unable to cast key to Pod: %v", k)
73+
klog.V(logutil.DEFAULT).ErrorS(nil, "Unable to cast key to Pod", "key", k)
7574
return false
7675
}
7776
if _, ok := podMap[pod]; !ok {
78-
klog.V(logutil.DEFAULT).Infof("Removing pod %v", pod)
77+
klog.V(logutil.DEFAULT).InfoS("Removing pod", "pod", pod)
7978
c.Datastore.pods.Delete(pod)
8079
}
8180
return true
@@ -105,5 +104,4 @@ func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
105104
func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
106105
validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone
107106
return validZone && *endpoint.Conditions.Ready
108-
109107
}

pkg/ext-proc/backend/fake.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
1616
if err, ok := f.Err[pod]; ok {
1717
return nil, err
1818
}
19-
klog.V(1).Infof("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
19+
klog.V(1).InfoS("Fetching metrics for pod", "pod", pod, "existing", existing, "new", f.Res[pod])
2020
return f.Res[pod], nil
2121
}
2222

pkg/ext-proc/backend/inferencemodel_reconciler.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,21 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
2626
if req.Namespace != c.PoolNamespacedName.Namespace {
2727
return ctrl.Result{}, nil
2828
}
29-
klog.V(1).Infof("Reconciling InferenceModel %v", req.NamespacedName)
29+
30+
klogV := klog.V(logutil.DEFAULT)
31+
klogV.InfoS("Reconciling InferenceModel", "name", req.NamespacedName)
3032

3133
infModel := &v1alpha1.InferenceModel{}
3234
if err := c.Get(ctx, req.NamespacedName, infModel); err != nil {
3335
if errors.IsNotFound(err) {
34-
klog.V(1).Infof("InferenceModel %v not found. Removing from datastore since object must be deleted", req.NamespacedName)
36+
klogV.InfoS("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName)
3537
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
3638
return ctrl.Result{}, nil
3739
}
38-
klog.Error(err, "Unable to get InferenceModel")
40+
klogV.ErrorS(err, "Unable to get InferenceModel", "name", req.NamespacedName)
3941
return ctrl.Result{}, err
4042
} else if !infModel.DeletionTimestamp.IsZero() {
41-
klog.V(1).Infof("InferenceModel %v is marked for deletion. Removing from datastore", req.NamespacedName)
43+
klogV.InfoS("InferenceModel is marked for deletion. Removing from datastore", "name", req.NamespacedName)
4244
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
4345
return ctrl.Result{}, nil
4446
}
@@ -48,13 +50,15 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
4850
}
4951

5052
func (c *InferenceModelReconciler) updateDatastore(infModel *v1alpha1.InferenceModel) {
53+
klogV := klog.V(logutil.DEFAULT)
54+
5155
if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name {
52-
klog.V(1).Infof("Incoming pool ref %v, server pool name: %v", infModel.Spec.PoolRef, c.PoolNamespacedName.Name)
53-
klog.V(1).Infof("Adding/Updating InferenceModel: %v", infModel.Spec.ModelName)
56+
klogV.InfoS("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName)
57+
klogV.InfoS("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName)
5458
c.Datastore.InferenceModels.Store(infModel.Spec.ModelName, infModel)
5559
return
5660
}
57-
klog.V(logutil.DEFAULT).Infof("Removing/Not adding InferenceModel: %v", infModel.Spec.ModelName)
61+
klogV.InfoS("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName)
5862
// If we get here. The model is not relevant to this pool, remove.
5963
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
6064
}

pkg/ext-proc/backend/inferencepool_reconciler.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
ctrl "sigs.k8s.io/controller-runtime"
1111
"sigs.k8s.io/controller-runtime/pkg/client"
1212
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
13+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1314
)
1415

1516
// InferencePoolReconciler utilizes the controller runtime to reconcile Instance Gateway resources
@@ -28,11 +29,12 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
2829
if req.NamespacedName.Name != c.PoolNamespacedName.Name || req.NamespacedName.Namespace != c.PoolNamespacedName.Namespace {
2930
return ctrl.Result{}, nil
3031
}
31-
klog.V(1).Info("reconciling InferencePool", req.NamespacedName)
32+
klogV := klog.V(logutil.DEFAULT)
33+
klogV.InfoS("Reconciling InferencePool", "name", req.NamespacedName)
3234

3335
serverPool := &v1alpha1.InferencePool{}
3436
if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
35-
klog.Error(err, "unable to get InferencePool")
37+
klogV.ErrorS(err, "Unable to get InferencePool", "name", req.NamespacedName)
3638
return ctrl.Result{}, err
3739
}
3840

@@ -45,7 +47,7 @@ func (c *InferencePoolReconciler) updateDatastore(serverPool *v1alpha1.Inference
4547
pool, _ := c.Datastore.getInferencePool()
4648
if pool == nil ||
4749
serverPool.ObjectMeta.ResourceVersion != pool.ObjectMeta.ResourceVersion {
48-
klog.Infof("Updating inference pool to %v/%v", serverPool.ObjectMeta.Namespace, serverPool.ObjectMeta.Name)
50+
klog.V(logutil.DEFAULT).InfoS("Updating inference pool", "target", klog.KMetadata(&serverPool.ObjectMeta))
4951
c.Datastore.setInferencePool(serverPool)
5052
}
5153
}

pkg/ext-proc/backend/provider.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
6363
p.refreshPodsOnce()
6464

6565
if err := p.refreshMetricsOnce(); err != nil {
66-
klog.Errorf("Failed to init metrics: %v", err)
66+
klog.ErrorS(err, "Failed to init metrics")
6767
}
6868

69-
klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
69+
klog.InfoS("Initialized pods and metrics", "metrics", p.AllPodMetrics())
7070

7171
// periodically refresh pods
7272
go func() {
@@ -81,7 +81,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
8181
for {
8282
time.Sleep(refreshMetricsInterval)
8383
if err := p.refreshMetricsOnce(); err != nil {
84-
klog.V(logutil.TRACE).Infof("Failed to refresh metrics: %v", err)
84+
klog.V(logutil.TRACE).ErrorS(err, "Failed to refresh metrics")
8585
}
8686
}
8787
}()
@@ -95,11 +95,11 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
9595
}()
9696

9797
// Periodically print out the pods and metrics for DEBUGGING.
98-
if klog.V(logutil.DEBUG).Enabled() {
98+
if klogV := klog.V(logutil.DEBUG); klogV.Enabled() {
9999
go func() {
100100
for {
101101
time.Sleep(5 * time.Second)
102-
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
102+
klogV.InfoS("Current Pods and metrics gathered", "metrics", p.AllPodMetrics())
103103
}
104104
}()
105105
}
@@ -138,18 +138,19 @@ func (p *Provider) refreshPodsOnce() {
138138
}
139139

140140
func (p *Provider) refreshMetricsOnce() error {
141+
klogV := klog.V(logutil.TRACE)
141142
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
142143
defer cancel()
143144
start := time.Now()
144145
defer func() {
145146
d := time.Since(start)
146147
// TODO: add a metric instead of logging
147-
klog.V(logutil.TRACE).Infof("Refreshed metrics in %v", d)
148+
klogV.InfoS("Metrics refreshed", "duration", d)
148149
}()
149150
var wg sync.WaitGroup
150151
errCh := make(chan error)
151152
processOnePod := func(key, value any) bool {
152-
klog.V(logutil.TRACE).Infof("Processing pod %v and metric %v", key, value)
153+
klogV.InfoS("Pod and metric being processed", "pod", key, "metric", value)
153154
pod := key.(Pod)
154155
existing := value.(*PodMetrics)
155156
wg.Add(1)
@@ -161,7 +162,7 @@ func (p *Provider) refreshMetricsOnce() error {
161162
return
162163
}
163164
p.UpdatePodMetrics(pod, updated)
164-
klog.V(logutil.TRACE).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
165+
klogV.InfoS("Updated metrics for pod", "pod", pod, "metrics", updated.Metrics)
165166
}()
166167
return true
167168
}
@@ -185,7 +186,7 @@ func (p *Provider) refreshMetricsOnce() error {
185186
}
186187

187188
func (p *Provider) flushPrometheusMetricsOnce() {
188-
klog.V(logutil.DEBUG).Infof("Flushing Prometheus Metrics")
189+
klog.V(logutil.DEBUG).InfoS("Flushing Prometheus Metrics")
189190

190191
pool, _ := p.datastore.getInferencePool()
191192
if pool == nil {

pkg/ext-proc/backend/vllm/metrics.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ const (
3232
KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity"
3333
)
3434

35-
type PodMetricsClientImpl struct {
36-
}
35+
type PodMetricsClientImpl struct{}
3736

3837
// FetchMetrics fetches metrics from a given pod.
3938
func (p *PodMetricsClientImpl) FetchMetrics(
@@ -46,19 +45,20 @@ func (p *PodMetricsClientImpl) FetchMetrics(
4645
url := fmt.Sprintf("http://%s/metrics", pod.Address)
4746
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
4847
if err != nil {
48+
klog.ErrorS(err, "Failed create HTTP request", "method", http.MethodGet, "url", url)
4949
return nil, fmt.Errorf("failed to create request: %v", err)
5050
}
5151
resp, err := http.DefaultClient.Do(req)
5252
if err != nil {
53-
klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
53+
klog.ErrorS(err, "Failed to fetch metrics", "pod", pod)
5454
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
5555
}
5656
defer func() {
5757
_ = resp.Body.Close()
5858
}()
5959

6060
if resp.StatusCode != http.StatusOK {
61-
klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
61+
klog.ErrorS(nil, "Unexpected status code returned", "pod", pod, "statusCode", resp.StatusCode)
6262
return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
6363
}
6464

@@ -138,7 +138,7 @@ func promToPodMetrics(
138138
func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, time.Time, error) {
139139
loraRequests, ok := metricFamilies[LoraRequestInfoMetricName]
140140
if !ok {
141-
klog.Warningf("metric family %q not found", LoraRequestInfoMetricName)
141+
klog.ErrorS(nil, "Metric family not found", "name", LoraRequestInfoMetricName)
142142
return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName)
143143
}
144144
var latestTs float64
@@ -157,7 +157,7 @@ func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metr
157157
func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, error) {
158158
mf, ok := metricFamilies[metricName]
159159
if !ok {
160-
klog.Warningf("metric family %q not found", metricName)
160+
klog.ErrorS(nil, "Metric family not found", "name", metricName)
161161
return nil, fmt.Errorf("metric family %q not found", metricName)
162162
}
163163
if len(mf.GetMetric()) == 0 {
@@ -171,6 +171,6 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str
171171
latest = m
172172
}
173173
}
174-
klog.V(logutil.TRACE).Infof("Got metric value %+v for metric %v", latest, metricName)
174+
klog.V(logutil.TRACE).InfoS("Metric value selected", "value", latest, "metric", metricName)
175175
return latest, nil
176176
}

pkg/ext-proc/handlers/request.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,24 @@ import (
1919
// parameter.
2020
// Envoy sends the request body to ext proc before sending the request to the backend server.
2121
func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
22-
klog.V(logutil.VERBOSE).Infof("Handling request body")
22+
klogV := klog.V(logutil.VERBOSE)
23+
klogV.InfoS("Handling request body")
2324

2425
// Unmarshal request body (must be JSON).
2526
v := req.Request.(*extProcPb.ProcessingRequest_RequestBody)
2627
var rb map[string]interface{}
2728
if err := json.Unmarshal(v.RequestBody.Body, &rb); err != nil {
28-
klog.Errorf("Error unmarshaling request body: %v", err)
29+
klog.ErrorS(err, "Error unmarshaling request body")
2930
return nil, fmt.Errorf("error unmarshaling request body: %v", err)
3031
}
31-
klog.V(logutil.VERBOSE).Infof("Request body: %v", rb)
32+
klogV.InfoS("Request body unmarshalled", "body", rb)
3233

3334
// Resolve target models.
3435
model, ok := rb["model"].(string)
3536
if !ok {
3637
return nil, errors.New("model not found in request")
3738
}
38-
klog.V(logutil.VERBOSE).Infof("Model requested: %v", model)
39+
klogV.InfoS("Model requested", "model", model)
3940
modelName := model
4041

4142
// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
@@ -56,7 +57,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
5657
ResolvedTargetModel: modelName,
5758
Critical: backend.IsCritical(modelObj),
5859
}
59-
klog.V(logutil.VERBOSE).Infof("LLM Request: %+v", llmReq)
60+
klogV.InfoS("LLM request assembled", "request", llmReq)
6061

6162
requestBody := v.RequestBody.Body
6263
var err error
@@ -65,17 +66,17 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
6566
rb["model"] = llmReq.ResolvedTargetModel
6667
requestBody, err = json.Marshal(rb)
6768
if err != nil {
68-
klog.Errorf("Error marshaling request body: %v", err)
69+
klog.V(logutil.DEFAULT).ErrorS(err, "Error marshaling request body")
6970
return nil, fmt.Errorf("error marshaling request body: %v", err)
7071
}
71-
klog.V(logutil.VERBOSE).Infof("Updated body: %v", string(requestBody))
72+
klogV.InfoS("Updated request body marshalled", "body", string(requestBody))
7273
}
7374

7475
targetPod, err := s.scheduler.Schedule(llmReq)
7576
if err != nil {
7677
return nil, fmt.Errorf("failed to find target pod: %w", err)
7778
}
78-
klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
79+
klogV.InfoS("Target model and pod selected", "model", llmReq.ResolvedTargetModel, "pod", targetPod)
7980

8081
reqCtx.Model = llmReq.Model
8182
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
@@ -101,7 +102,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
101102
}
102103
// Print headers for debugging
103104
for _, header := range headers {
104-
klog.V(logutil.VERBOSE).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
105+
klog.V(logutil.DEBUG).InfoS("Request body header", "key", header.Header.Key, "value", header.Header.RawValue)
105106
}
106107

107108
resp := &extProcPb.ProcessingResponse{
@@ -136,10 +137,9 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
136137
}
137138

138139
func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
139-
klog.V(logutil.VERBOSE).Info("Handling request headers ...")
140140
r := req.Request
141141
h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
142-
klog.V(logutil.VERBOSE).Infof("Headers: %+v\n", h)
142+
klog.V(logutil.VERBOSE).InfoS("Handling request headers", "headers", h)
143143

144144
resp := &extProcPb.ProcessingResponse{
145145
Response: &extProcPb.ProcessingResponse_RequestHeaders{

0 commit comments

Comments
 (0)