Skip to content

Commit 9de3439

Browse files
committed
Use structured logging
All logging calls are rewritten to use structured logging.
1 parent 5bc8fcd commit 9de3439

19 files changed

+223
-80
lines changed

pkg/ext-proc/backend/datastore.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string {
9898
for _, model := range model.Spec.TargetModels {
9999
weights += *model.Weight
100100
}
101-
klog.V(logutil.VERBOSE).Infof("Weights for Model(%v) total to: %v", model.Name, weights)
101+
klog.V(logutil.VERBOSE).InfoS("Weights for model computed", "model", model.Name, "weights", weights)
102102
randomVal := r.Int31n(weights)
103103
for _, model := range model.Spec.TargetModels {
104104
if randomVal < *model.Weight {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package backend
2+
3+
import (
	"context"
	"net"
	"strconv"
	"time"

	discoveryv1 "k8s.io/api/discovery/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/record"
	klog "k8s.io/klog/v2"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)
19+
20+
// serviceOwnerLabel is the EndpointSlice label key whose value is compared
// against the reconciler's ServiceName to decide whether a slice is watched.
// It is a constant because the key is fixed and must never be reassigned.
const serviceOwnerLabel = "kubernetes.io/service-name"
21+
22+
type EndpointSliceReconciler struct {
23+
client.Client
24+
Scheme *runtime.Scheme
25+
Record record.EventRecorder
26+
ServiceName string
27+
Zone string
28+
Datastore *K8sDatastore
29+
}
30+
31+
func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
32+
klogV := klog.V(logutil.DEFAULT)
33+
34+
inferencePool, err := c.Datastore.getInferencePool()
35+
if err != nil {
36+
klogV.InfoS("Skipping reconciling EndpointSlice because the InferencePool is not available yet", "error", err)
37+
return ctrl.Result{Requeue: true, RequeueAfter: time.Second}, nil
38+
}
39+
40+
klogV.InfoS("Reconciling EndpointSlice", "name", req.NamespacedName)
41+
42+
endpointSlice := &discoveryv1.EndpointSlice{}
43+
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
44+
klogV.ErrorS(err, "Unable to get EndpointSlice", "name", req.NamespacedName)
45+
return ctrl.Result{}, err
46+
}
47+
c.updateDatastore(endpointSlice, inferencePool)
48+
49+
return ctrl.Result{}, nil
50+
}
51+
52+
// TODO: Support multiple endpointslices for a single service
53+
func (c *EndpointSliceReconciler) updateDatastore(
54+
slice *discoveryv1.EndpointSlice,
55+
inferencePool *v1alpha1.InferencePool,
56+
) {
57+
klogV := klog.V(logutil.DEFAULT)
58+
podMap := make(map[Pod]bool)
59+
60+
for _, endpoint := range slice.Endpoints {
61+
klogV.InfoS("Processing endpoint", "zone", c.Zone, "endpoint", endpoint)
62+
if c.validPod(endpoint) {
63+
pod := Pod{
64+
Name: endpoint.TargetRef.Name,
65+
Address: endpoint.Addresses[0] + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)),
66+
}
67+
podMap[pod] = true
68+
klogV.InfoS("Storing pod", "pod", pod)
69+
c.Datastore.pods.Store(pod, true)
70+
}
71+
}
72+
73+
removeOldPods := func(k, v any) bool {
74+
pod, ok := k.(Pod)
75+
if !ok {
76+
klogV.ErrorS(nil, "Unable to cast key to Pod", "key", k)
77+
return false
78+
}
79+
if _, ok := podMap[pod]; !ok {
80+
klogV.InfoS("Removing pod", "pod", pod)
81+
c.Datastore.pods.Delete(pod)
82+
}
83+
return true
84+
}
85+
c.Datastore.pods.Range(removeOldPods)
86+
}
87+
88+
func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
89+
ownsEndPointSlice := func(object client.Object) bool {
90+
// Check if the object is an EndpointSlice
91+
endpointSlice, ok := object.(*discoveryv1.EndpointSlice)
92+
if !ok {
93+
return false
94+
}
95+
96+
gotLabel := endpointSlice.ObjectMeta.Labels[serviceOwnerLabel]
97+
wantLabel := c.ServiceName
98+
return gotLabel == wantLabel
99+
}
100+
101+
return ctrl.NewControllerManagedBy(mgr).
102+
For(&discoveryv1.EndpointSlice{},
103+
builder.WithPredicates(predicate.NewPredicateFuncs(ownsEndPointSlice))).
104+
Complete(c)
105+
}
106+
107+
func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
108+
validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone
109+
return validZone && *endpoint.Conditions.Ready
110+
}

pkg/ext-proc/backend/fake.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
klog "k8s.io/klog/v2"
77
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
8+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
89
)
910

1011
type FakePodMetricsClient struct {
@@ -16,7 +17,7 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
1617
if err, ok := f.Err[pod]; ok {
1718
return nil, err
1819
}
19-
klog.V(1).Infof("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
20+
klog.V(logutil.VERBOSE).InfoS("Fetching metrics for pod", "pod", pod, "existing", existing, "new", f.Res[pod])
2021
return f.Res[pod], nil
2122
}
2223

pkg/ext-proc/backend/inferencemodel_reconciler.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,21 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
2626
if req.Namespace != c.PoolNamespacedName.Namespace {
2727
return ctrl.Result{}, nil
2828
}
29-
klog.V(1).Infof("Reconciling InferenceModel %v", req.NamespacedName)
29+
30+
klogV := klog.V(logutil.DEFAULT)
31+
klogV.InfoS("Reconciling InferenceModel", "name", req.NamespacedName)
3032

3133
infModel := &v1alpha1.InferenceModel{}
3234
if err := c.Get(ctx, req.NamespacedName, infModel); err != nil {
3335
if errors.IsNotFound(err) {
34-
klog.V(1).Infof("InferenceModel %v not found. Removing from datastore since object must be deleted", req.NamespacedName)
36+
klogV.InfoS("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName)
3537
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
3638
return ctrl.Result{}, nil
3739
}
38-
klog.Error(err, "Unable to get InferenceModel")
40+
klogV.ErrorS(err, "Unable to get InferenceModel", "name", req.NamespacedName)
3941
return ctrl.Result{}, err
4042
} else if !infModel.DeletionTimestamp.IsZero() {
41-
klog.V(1).Infof("InferenceModel %v is marked for deletion. Removing from datastore", req.NamespacedName)
43+
klogV.InfoS("InferenceModel is marked for deletion. Removing from datastore", "name", req.NamespacedName)
4244
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
4345
return ctrl.Result{}, nil
4446
}
@@ -48,13 +50,15 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
4850
}
4951

5052
func (c *InferenceModelReconciler) updateDatastore(infModel *v1alpha1.InferenceModel) {
53+
klogV := klog.V(logutil.DEFAULT)
54+
5155
if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name {
52-
klog.V(1).Infof("Incoming pool ref %v, server pool name: %v", infModel.Spec.PoolRef, c.PoolNamespacedName.Name)
53-
klog.V(1).Infof("Adding/Updating InferenceModel: %v", infModel.Spec.ModelName)
56+
klogV.InfoS("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName)
57+
klogV.InfoS("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName)
5458
c.Datastore.InferenceModels.Store(infModel.Spec.ModelName, infModel)
5559
return
5660
}
57-
klog.V(logutil.DEFAULT).Infof("Removing/Not adding InferenceModel: %v", infModel.Spec.ModelName)
61+
klogV.InfoS("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName)
5862
// If we get here. The model is not relevant to this pool, remove.
5963
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
6064
}

pkg/ext-proc/backend/inferencepool_reconciler.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
ctrl "sigs.k8s.io/controller-runtime"
1212
"sigs.k8s.io/controller-runtime/pkg/client"
1313
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
14+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1415
)
1516

1617
// InferencePoolReconciler utilizes the controller runtime to reconcile Instance Gateway resources
@@ -28,11 +29,12 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
2829
if req.NamespacedName.Name != c.PoolNamespacedName.Name || req.NamespacedName.Namespace != c.PoolNamespacedName.Namespace {
2930
return ctrl.Result{}, nil
3031
}
31-
klog.V(1).Info("reconciling InferencePool", req.NamespacedName)
32+
klogV := klog.V(logutil.DEFAULT)
33+
klogV.InfoS("Reconciling InferencePool", "name", req.NamespacedName)
3234

3335
serverPool := &v1alpha1.InferencePool{}
3436
if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
35-
klog.Error(err, ": unable to get InferencePool")
37+
klogV.ErrorS(err, "Unable to get InferencePool", "name", req.NamespacedName)
3638
return ctrl.Result{}, err
3739
}
3840
if c.Datastore.inferencePool == nil || !reflect.DeepEqual(serverPool.Spec.Selector, c.Datastore.inferencePool.Spec.Selector) {
@@ -49,7 +51,7 @@ func (c *InferencePoolReconciler) updateDatastore(serverPool *v1alpha1.Inference
4951
pool, _ := c.Datastore.getInferencePool()
5052
if pool == nil ||
5153
serverPool.ObjectMeta.ResourceVersion != pool.ObjectMeta.ResourceVersion {
52-
klog.Infof("Updating inference pool to %v/%v", serverPool.ObjectMeta.Namespace, serverPool.ObjectMeta.Name)
54+
klog.V(logutil.DEFAULT).InfoS("Updating inference pool", "target", klog.KMetadata(&serverPool.ObjectMeta))
5355
c.Datastore.setInferencePool(serverPool)
5456
}
5557
}

pkg/ext-proc/backend/provider.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
6363
p.refreshPodsOnce()
6464

6565
if err := p.refreshMetricsOnce(); err != nil {
66-
klog.Errorf("Failed to init metrics: %v", err)
66+
klog.ErrorS(err, "Failed to init metrics")
6767
}
6868

69-
klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
69+
klog.InfoS("Initialized pods and metrics", "metrics", p.AllPodMetrics())
7070

7171
// periodically refresh pods
7272
go func() {
@@ -81,7 +81,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
8181
for {
8282
time.Sleep(refreshMetricsInterval)
8383
if err := p.refreshMetricsOnce(); err != nil {
84-
klog.V(logutil.TRACE).Infof("Failed to refresh metrics: %v", err)
84+
klog.V(logutil.TRACE).ErrorS(err, "Failed to refresh metrics")
8585
}
8686
}
8787
}()
@@ -95,11 +95,11 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
9595
}()
9696

9797
// Periodically print out the pods and metrics for DEBUGGING.
98-
if klog.V(logutil.DEBUG).Enabled() {
98+
if klogV := klog.V(logutil.DEBUG); klogV.Enabled() {
9999
go func() {
100100
for {
101101
time.Sleep(5 * time.Second)
102-
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
102+
klogV.InfoS("Current Pods and metrics gathered", "metrics", p.AllPodMetrics())
103103
}
104104
}()
105105
}
@@ -138,18 +138,19 @@ func (p *Provider) refreshPodsOnce() {
138138
}
139139

140140
func (p *Provider) refreshMetricsOnce() error {
141+
klogV := klog.V(logutil.TRACE)
141142
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
142143
defer cancel()
143144
start := time.Now()
144145
defer func() {
145146
d := time.Since(start)
146147
// TODO: add a metric instead of logging
147-
klog.V(logutil.TRACE).Infof("Refreshed metrics in %v", d)
148+
klogV.InfoS("Metrics refreshed", "duration", d)
148149
}()
149150
var wg sync.WaitGroup
150151
errCh := make(chan error)
151152
processOnePod := func(key, value any) bool {
152-
klog.V(logutil.TRACE).Infof("Processing pod %v and metric %v", key, value)
153+
klogV.InfoS("Pod and metric being processed", "pod", key, "metric", value)
153154
pod := key.(Pod)
154155
existing := value.(*PodMetrics)
155156
wg.Add(1)
@@ -161,7 +162,7 @@ func (p *Provider) refreshMetricsOnce() error {
161162
return
162163
}
163164
p.UpdatePodMetrics(pod, updated)
164-
klog.V(logutil.TRACE).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
165+
klogV.InfoS("Updated metrics for pod", "pod", pod, "metrics", updated.Metrics)
165166
}()
166167
return true
167168
}
@@ -185,7 +186,7 @@ func (p *Provider) refreshMetricsOnce() error {
185186
}
186187

187188
func (p *Provider) flushPrometheusMetricsOnce() {
188-
klog.V(logutil.DEBUG).Infof("Flushing Prometheus Metrics")
189+
klog.V(logutil.DEBUG).InfoS("Flushing Prometheus Metrics")
189190

190191
pool, _ := p.datastore.getInferencePool()
191192
if pool == nil {

pkg/ext-proc/backend/vllm/metrics.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ const (
3232
KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity"
3333
)
3434

35-
type PodMetricsClientImpl struct {
36-
}
35+
type PodMetricsClientImpl struct{}
3736

3837
// FetchMetrics fetches metrics from a given pod.
3938
func (p *PodMetricsClientImpl) FetchMetrics(
@@ -46,19 +45,20 @@ func (p *PodMetricsClientImpl) FetchMetrics(
4645
url := fmt.Sprintf("http://%s/metrics", pod.Address)
4746
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
4847
if err != nil {
48+
klog.V(logutil.DEFAULT).ErrorS(err, "Failed create HTTP request", "method", http.MethodGet, "url", url)
4949
return nil, fmt.Errorf("failed to create request: %v", err)
5050
}
5151
resp, err := http.DefaultClient.Do(req)
5252
if err != nil {
53-
klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
53+
klog.V(logutil.DEFAULT).ErrorS(err, "Failed to fetch metrics", "pod", pod)
5454
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
5555
}
5656
defer func() {
5757
_ = resp.Body.Close()
5858
}()
5959

6060
if resp.StatusCode != http.StatusOK {
61-
klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
61+
klog.V(logutil.DEFAULT).ErrorS(nil, "Unexpected status code returned", "pod", pod, "statusCode", resp.StatusCode)
6262
return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
6363
}
6464

@@ -138,7 +138,7 @@ func promToPodMetrics(
138138
func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, time.Time, error) {
139139
loraRequests, ok := metricFamilies[LoraRequestInfoMetricName]
140140
if !ok {
141-
klog.Warningf("metric family %q not found", LoraRequestInfoMetricName)
141+
klog.V(logutil.DEFAULT).ErrorS(nil, "Metric family not found", "name", LoraRequestInfoMetricName)
142142
return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName)
143143
}
144144
var latestTs float64
@@ -157,7 +157,7 @@ func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metr
157157
func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, error) {
158158
mf, ok := metricFamilies[metricName]
159159
if !ok {
160-
klog.Warningf("metric family %q not found", metricName)
160+
klog.V(logutil.DEFAULT).ErrorS(nil, "Metric family not found", "name", metricName)
161161
return nil, fmt.Errorf("metric family %q not found", metricName)
162162
}
163163
if len(mf.GetMetric()) == 0 {
@@ -171,6 +171,6 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str
171171
latest = m
172172
}
173173
}
174-
klog.V(logutil.TRACE).Infof("Got metric value %+v for metric %v", latest, metricName)
174+
klog.V(logutil.TRACE).InfoS("Metric value selected", "value", latest, "metric", metricName)
175175
return latest, nil
176176
}

0 commit comments

Comments
 (0)