Skip to content

Commit 4448a4b

Browse files
authored
Adding logging const and updating usage (#236)
1 parent 906de56 commit 4448a4b

File tree

11 files changed

+56
-39
lines changed

11 files changed

+56
-39
lines changed

pkg/ext-proc/backend/datastore.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"sync"
77

88
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
9+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
910
corev1 "k8s.io/api/core/v1"
1011
"k8s.io/klog/v2"
1112
)
@@ -93,7 +94,7 @@ func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string {
9394
for _, model := range model.Spec.TargetModels {
9495
weights += *model.Weight
9596
}
96-
klog.V(3).Infof("Weights for Model(%v) total to: %v", model.Name, weights)
97+
klog.V(logutil.VERBOSE).Infof("Weights for Model(%v) total to: %v", model.Name, weights)
9798
randomVal := r.Int31n(weights)
9899
for _, model := range model.Spec.TargetModels {
99100
if randomVal < *model.Weight {

pkg/ext-proc/backend/endpointslice_reconciler.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"strconv"
66

77
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
8+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
89
discoveryv1 "k8s.io/api/discovery/v1"
910
"k8s.io/apimachinery/pkg/runtime"
1011
"k8s.io/client-go/tools/record"
@@ -29,7 +30,7 @@ type EndpointSliceReconciler struct {
2930
}
3031

3132
func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
32-
klog.V(2).Info("Reconciling EndpointSlice ", req.NamespacedName)
33+
klog.V(logutil.DEFAULT).Info("Reconciling EndpointSlice ", req.NamespacedName)
3334

3435
endpointSlice := &discoveryv1.EndpointSlice{}
3536
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
@@ -52,14 +53,14 @@ func (c *EndpointSliceReconciler) updateDatastore(
5253
podMap := make(map[Pod]bool)
5354

5455
for _, endpoint := range slice.Endpoints {
55-
klog.V(2).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
56+
klog.V(logutil.DEFAULT).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
5657
if c.validPod(endpoint) {
5758
pod := Pod{
5859
Name: endpoint.TargetRef.Name,
5960
Address: endpoint.Addresses[0] + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)),
6061
}
6162
podMap[pod] = true
62-
klog.V(2).Infof("Storing pod %v", pod)
63+
klog.V(logutil.DEFAULT).Infof("Storing pod %v", pod)
6364
c.Datastore.pods.Store(pod, true)
6465
}
6566
}
@@ -71,7 +72,7 @@ func (c *EndpointSliceReconciler) updateDatastore(
7172
return false
7273
}
7374
if _, ok := podMap[pod]; !ok {
74-
klog.V(2).Infof("Removing pod %v", pod)
75+
klog.V(logutil.DEFAULT).Infof("Removing pod %v", pod)
7576
c.Datastore.pods.Delete(pod)
7677
}
7778
return true
@@ -83,7 +84,7 @@ func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
8384
inferencePoolAvailable := func(object client.Object) bool {
8485
_, err := c.Datastore.getInferencePool()
8586
if err != nil {
86-
klog.V(2).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err)
87+
klog.V(logutil.DEFAULT).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err)
8788
}
8889
return err == nil
8990
}
@@ -99,7 +100,7 @@ func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
99100
wantLabel := c.ServiceName
100101
if gotLabel != wantLabel {
101102
namesapcedName := endpointSlice.ObjectMeta.Namespace + "/" + endpointSlice.ObjectMeta.Name
102-
klog.V(2).Infof("Skipping EndpointSlice %v because its service owner label %v doesn't match the pool service name %v", namesapcedName, gotLabel, wantLabel)
103+
klog.V(logutil.DEFAULT).Infof("Skipping EndpointSlice %v because its service owner label %v doesn't match the pool service name %v", namesapcedName, gotLabel, wantLabel)
103104
}
104105
return gotLabel == wantLabel
105106
}

pkg/ext-proc/backend/inferencemodel_reconciler.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
7+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
78
"k8s.io/apimachinery/pkg/runtime"
89
"k8s.io/apimachinery/pkg/types"
910
"k8s.io/client-go/tools/record"
@@ -49,7 +50,7 @@ func (c *InferenceModelReconciler) updateDatastore(infModel *v1alpha1.InferenceM
4950
c.Datastore.InferenceModels.Store(infModel.Spec.ModelName, infModel)
5051
return
5152
}
52-
klog.V(2).Infof("Removing/Not adding inference model: %v", infModel.Spec.ModelName)
53+
klog.V(logutil.DEFAULT).Infof("Removing/Not adding inference model: %v", infModel.Spec.ModelName)
5354
// If we get here. The model is not relevant to this pool, remove.
5455
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
5556
}

pkg/ext-proc/backend/provider.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"go.uber.org/multierr"
10+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1011
klog "k8s.io/klog/v2"
1112
)
1213

@@ -79,13 +80,13 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
7980
for {
8081
time.Sleep(refreshMetricsInterval)
8182
if err := p.refreshMetricsOnce(); err != nil {
82-
klog.V(4).Infof("Failed to refresh metrics: %v", err)
83+
klog.V(logutil.DEBUG).Infof("Failed to refresh metrics: %v", err)
8384
}
8485
}
8586
}()
8687

8788
// Periodically print out the pods and metrics for DEBUGGING.
88-
if klog.V(4).Enabled() {
89+
if klog.V(logutil.DEBUG).Enabled() {
8990
go func() {
9091
for {
9192
time.Sleep(5 * time.Second)
@@ -134,12 +135,12 @@ func (p *Provider) refreshMetricsOnce() error {
134135
defer func() {
135136
d := time.Since(start)
136137
// TODO: add a metric instead of logging
137-
klog.V(4).Infof("Refreshed metrics in %v", d)
138+
klog.V(logutil.DEBUG).Infof("Refreshed metrics in %v", d)
138139
}()
139140
var wg sync.WaitGroup
140141
errCh := make(chan error)
141142
processOnePod := func(key, value any) bool {
142-
klog.V(4).Infof("Processing pod %v and metric %v", key, value)
143+
klog.V(logutil.DEBUG).Infof("Processing pod %v and metric %v", key, value)
143144
pod := key.(Pod)
144145
existing := value.(*PodMetrics)
145146
wg.Add(1)
@@ -151,7 +152,7 @@ func (p *Provider) refreshMetricsOnce() error {
151152
return
152153
}
153154
p.UpdatePodMetrics(pod, updated)
154-
klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
155+
klog.V(logutil.DEBUG).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
155156
}()
156157
return true
157158
}

pkg/ext-proc/backend/vllm/metrics.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prometheus/common/expfmt"
1414
"go.uber.org/multierr"
1515
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
16+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1617
klog "k8s.io/klog/v2"
1718
)
1819

@@ -170,6 +171,6 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str
170171
latest = m
171172
}
172173
}
173-
klog.V(4).Infof("Got metric value %+v for metric %v", latest, metricName)
174+
klog.V(logutil.DEBUG).Infof("Got metric value %+v for metric %v", latest, metricName)
174175
return latest, nil
175176
}

pkg/ext-proc/handlers/request.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ import (
1010
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1111
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1212
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
13+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1314
klog "k8s.io/klog/v2"
1415
)
1516

1617
// HandleRequestBody handles body of the request to the backend server, such as parsing the "model"
1718
// parameter.
1819
// Envoy sends the request body to ext proc before sending the request to the backend server.
1920
func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
20-
klog.V(3).Infof("Handling request body")
21+
klog.V(logutil.VERBOSE).Infof("Handling request body")
2122

2223
// Unmarshal request body (must be JSON).
2324
v := req.Request.(*extProcPb.ProcessingRequest_RequestBody)
@@ -26,14 +27,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
2627
klog.Errorf("Error unmarshaling request body: %v", err)
2728
return nil, fmt.Errorf("error unmarshaling request body: %v", err)
2829
}
29-
klog.V(3).Infof("Request body: %v", rb)
30+
klog.V(logutil.VERBOSE).Infof("Request body: %v", rb)
3031

3132
// Resolve target models.
3233
model, ok := rb["model"].(string)
3334
if !ok {
3435
return nil, errors.New("model not found in request")
3536
}
36-
klog.V(3).Infof("Model requested: %v", model)
37+
klog.V(logutil.VERBOSE).Infof("Model requested: %v", model)
3738
modelName := model
3839

3940
// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
@@ -54,7 +55,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
5455
ResolvedTargetModel: modelName,
5556
Critical: backend.IsCritical(modelObj),
5657
}
57-
klog.V(3).Infof("LLM Request: %+v", llmReq)
58+
klog.V(logutil.VERBOSE).Infof("LLM Request: %+v", llmReq)
5859

5960
requestBody := v.RequestBody.Body
6061
var err error
@@ -66,14 +67,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
6667
klog.Errorf("Error marshaling request body: %v", err)
6768
return nil, fmt.Errorf("error marshaling request body: %v", err)
6869
}
69-
klog.V(3).Infof("Updated body: %v", string(requestBody))
70+
klog.V(logutil.VERBOSE).Infof("Updated body: %v", string(requestBody))
7071
}
7172

7273
targetPod, err := s.scheduler.Schedule(llmReq)
7374
if err != nil {
7475
return nil, fmt.Errorf("failed to find target pod: %w", err)
7576
}
76-
klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
77+
klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
7778

7879
reqCtx.Model = llmReq.Model
7980
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
@@ -99,7 +100,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
99100
}
100101
// Print headers for debugging
101102
for _, header := range headers {
102-
klog.V(3).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
103+
klog.V(logutil.VERBOSE).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
103104
}
104105

105106
resp := &extProcPb.ProcessingResponse{
@@ -122,10 +123,10 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
122123
}
123124

124125
func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
125-
klog.V(3).Info("Handling request headers ...")
126+
klog.V(logutil.VERBOSE).Info("Handling request headers ...")
126127
r := req.Request
127128
h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
128-
klog.V(3).Infof("Headers: %+v\n", h)
129+
klog.V(logutil.VERBOSE).Infof("Headers: %+v\n", h)
129130

130131
resp := &extProcPb.ProcessingResponse{
131132
Response: &extProcPb.ProcessingResponse_RequestHeaders{

pkg/ext-proc/handlers/response.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import (
66

77
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
88
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
9+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
910
klog "k8s.io/klog/v2"
1011
)
1112

1213
// HandleResponseHeaders processes response headers from the backend model server.
1314
func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
14-
klog.V(3).Info("Processing ResponseHeaders")
15+
klog.V(logutil.VERBOSE).Info("Processing ResponseHeaders")
1516
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
16-
klog.V(3).Infof("Headers before: %+v\n", h)
17+
klog.V(logutil.VERBOSE).Infof("Headers before: %+v\n", h)
1718

1819
resp := &extProcPb.ProcessingResponse{
1920
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
@@ -65,7 +66,7 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
6566
}
6667
}*/
6768
func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
68-
klog.V(3).Info("Processing HandleResponseBody")
69+
klog.V(logutil.VERBOSE).Info("Processing HandleResponseBody")
6970
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
7071

7172
res := Response{}
@@ -80,7 +81,7 @@ func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.Proce
8081
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
8182
// will add the processing for streaming case.
8283
reqCtx.ResponseComplete = true
83-
klog.V(3).Infof("Response: %+v", res)
84+
klog.V(logutil.VERBOSE).Infof("Response: %+v", res)
8485

8586
resp := &extProcPb.ProcessingResponse{
8687
Response: &extProcPb.ProcessingResponse_ResponseBody{

pkg/ext-proc/handlers/server.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1313
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
1414
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
15+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1516
klog "k8s.io/klog/v2"
1617
)
1718

@@ -50,7 +51,7 @@ type ModelDataStore interface {
5051
}
5152

5253
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
53-
klog.V(3).Info("Processing")
54+
klog.V(logutil.VERBOSE).Info("Processing")
5455
ctx := srv.Context()
5556
// Create request context to share states during life time of an HTTP request.
5657
// See https://github.com/envoyproxy/envoy/issues/17540.
@@ -70,7 +71,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
7071
if err != nil {
7172
// This error occurs very frequently, though it doesn't seem to have any impact.
7273
// TODO Figure out if we can remove this noise.
73-
klog.V(3).Infof("cannot receive stream request: %v", err)
74+
klog.V(logutil.VERBOSE).Infof("cannot receive stream request: %v", err)
7475
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
7576
}
7677

@@ -79,17 +80,17 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
7980
case *extProcPb.ProcessingRequest_RequestHeaders:
8081
reqCtx.RequestReceivedTimestamp = time.Now()
8182
resp = HandleRequestHeaders(reqCtx, req)
82-
klog.V(3).Infof("Request context after HandleRequestHeaders: %+v", reqCtx)
83+
klog.V(logutil.VERBOSE).Infof("Request context after HandleRequestHeaders: %+v", reqCtx)
8384
case *extProcPb.ProcessingRequest_RequestBody:
8485
resp, err = s.HandleRequestBody(reqCtx, req)
8586
if err == nil {
8687
metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel)
8788
metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize)
8889
}
89-
klog.V(3).Infof("Request context after HandleRequestBody: %+v", reqCtx)
90+
klog.V(logutil.VERBOSE).Infof("Request context after HandleRequestBody: %+v", reqCtx)
9091
case *extProcPb.ProcessingRequest_ResponseHeaders:
9192
resp, err = s.HandleResponseHeaders(reqCtx, req)
92-
klog.V(3).Infof("Request context after HandleResponseHeaders: %+v", reqCtx)
93+
klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseHeaders: %+v", reqCtx)
9394
case *extProcPb.ProcessingRequest_ResponseBody:
9495
resp, err = s.HandleResponseBody(reqCtx, req)
9596
if err == nil && reqCtx.ResponseComplete {
@@ -99,7 +100,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
99100
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
100101
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
101102
}
102-
klog.V(3).Infof("Request context after HandleResponseBody: %+v", reqCtx)
103+
klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseBody: %+v", reqCtx)
103104
default:
104105
klog.Errorf("Unknown Request type %+v", v)
105106
return status.Error(codes.Unknown, "unknown request type")
@@ -124,7 +125,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
124125
}
125126
}
126127

127-
klog.V(3).Infof("response: %v", resp)
128+
klog.V(logutil.VERBOSE).Infof("response: %v", resp)
128129
if err := srv.Send(resp); err != nil {
129130
klog.Errorf("send error %v", err)
130131
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)

pkg/ext-proc/scheduling/filter.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"math"
66

77
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
8+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
89
klog "k8s.io/klog/v2"
910
)
1011

@@ -41,7 +42,7 @@ func (f *filter) Name() string {
4142
}
4243

4344
func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
44-
klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods))
45+
klog.V(logutil.VERBOSE).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods))
4546

4647
filtered, err := f.filter(req, pods)
4748

@@ -54,7 +55,7 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend
5455
if f.nextOnSuccess != nil {
5556
next = f.nextOnSuccess
5657
}
57-
klog.V(3).Infof("onSuccess %q -> %q, filtered: %v", f.name, next.Name(), len(filtered))
58+
klog.V(logutil.VERBOSE).Infof("onSuccess %q -> %q, filtered: %v", f.name, next.Name(), len(filtered))
5859
// On success, pass the filtered result to the next filter.
5960
return next.Filter(req, filtered)
6061
} else {
@@ -65,7 +66,7 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend
6566
if f.nextOnFailure != nil {
6667
next = f.nextOnFailure
6768
}
68-
klog.V(3).Infof("onFailure %q -> %q", f.name, next.Name())
69+
klog.V(logutil.VERBOSE).Infof("onFailure %q -> %q", f.name, next.Name())
6970
// On failure, pass the initial set of pods to the next filter.
7071
return next.Filter(req, pods)
7172
}

0 commit comments

Comments
 (0)