diff --git a/docs/dev.md b/docs/dev.md index efd2023a0..2af396688 100644 --- a/docs/dev.md +++ b/docs/dev.md @@ -1,27 +1,33 @@ - ## Logging +We use the `logr.Logger` interface for logging everywhere. +The logger instance is loaded from `context.Context` or passed around as an argument directly. +This is aligned with contextual logging as explained in [k8s instrumentation logging guidelines](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md). + +In other words, we explicitly don't use `klog` global logging calls. +Using `klog` log value helpers like `klog.KObj` is just fine. + ### Change log verbosity -We use the `k8s.io/klog/v2` package to manage logging. We generally follow the [k8s instrumentation logging guidelines](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md), which states "the practical default level is V(2). Developers and QE environments may wish to run at V(3) or V(4)". -To configure logging verbosity, specify the `v` flag such as `--v=2`. +To configure logging verbosity, specify the `v` flag such as `--v=2`. ### Add logs The [k8s instrumentation logging guidelines](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md) has the following definitions: -* `klog.V(0).InfoS` = `klog.InfoS` - Generally useful for this to **always** be visible to a cluster operator -* `klog.V(1).InfoS` - A reasonable default log level if you don't want verbosity. -* `klog.V(2).InfoS` - Useful steady state information about the service and important log messages that may correlate to significant changes in the system. This is the recommended default log level for most systems. -* `klog.V(3).InfoS` - Extended information about changes -* `klog.V(4).InfoS` - Debug level verbosity -* `klog.V(5).InfoS` - Trace level verbosity +- `logger.V(0).Info` = `logger.Info` - Generally useful for this to **always** be visible to a cluster operator +- `logger.V(1).Info` - A reasonable default log level if you don't want verbosity. +- `logger.V(2).Info` - Useful steady state information about the service and important log messages that may correlate to significant changes in the system. This is the recommended default log level for most systems. +- `logger.V(3).Info` - Extended information about changes +- `logger.V(4).Info` - Debug level verbosity +- `logger.V(5).Info` - Trace level verbosity We choose to simplify to the following 3 common levels. + ``` const( DEFAULT=2 @@ -33,34 +39,46 @@ const( The guidelines are written in the context of a k8s controller. Our [ext-proc](../pkg/ext-proc/) does more things such as handling requests and scraping metrics, therefore we adapt the guidelines as follows: -1. The server startup process and configuration. - * `klog.InfoS` Logging at the `V(0)` verbosity level is generally welcome here as this is only logged once at startup, and provides useful info for debugging. +1. The server startup process and configuration. + + - `logger.Info` Logging at the `V(0)` verbosity level is generally welcome here as this is only logged once at startup, and provides useful info for debugging. 2. Reconciler loops. The reconciler loops watch for CR changes such as the `InferenceModel` CR. And given changes in these CRs significantly affect the behavior of the extension, we recommend using v=1 verbosity level as default, and sparsely use higher verbosity levels. 
- - * `klog.V(DEFAULT).InfoS` - * Default log level in the reconcilers. - * Information about config (listening on X, watching Y) - * Errors that repeat frequently that relate to conditions that can be corrected (e.g., inference model not initialized yet) - * System state changing (adding/removing objects in the data store) - * `V(VERBOSE)` and above: Use your best judgement. + + - `logger.V(DEFAULT)` + - Default log level in the reconcilers. + - Information about config (listening on X, watching Y) + - Errors that repeat frequently that relate to conditions that can be corrected (e.g., inference model not initialized yet) + - System state changing (adding/removing objects in the data store) + - `logger.V(VERBOSE)` and above: Use your best judgement. 3. Inference request handling. These requests are expected to be much higher volume than the control flow in the reconcilers and therefore we should be mindful of log spamming. We recommend using v=2 to log important info about a request, such as the HTTP response code, and higher verbosity levels for less important info. - * `klog.V(DEFAULT).InfoS` - * Logging the status code of an HTTP request - * Important decision making such as picking the target model, target pod - * `klog.V(VERBOSE).InfoS` - * Detailed request scheduling algorithm operations, such as running the filtering logic - * `V(DEBUG)` and above: Use your best judgement. + - `logger.V(DEFAULT)` + - Logging the status code of an HTTP request + - Important decision making such as picking the target model, target pod + - `logger.V(VERBOSE)` + - Detailed request scheduling algorithm operations, such as running the filtering logic + - `logger.V(DEBUG)` and above: Use your best judgement. 4. Metric scraping loops. These loops run at a very high frequency, and logs can be very spammy if not handled properly. - * `klog.V(TRACE).InfoS` - * Transient errors/warnings, such as failure to get response from a pod. - * Important state changes, such as updating a metric. -5. Misc + - `logger.V(TRACE)` + - Transient errors/warnings, such as failure to get response from a pod. + - Important state changes, such as updating a metric. + +5. Misc 1. Periodic (every 5s) debug loop which prints the current pods and metrics. - * `klog.WarningS` If the metrics are not fresh enough, which indicates an error occurred during the metric scraping loop. - * `klog.V(DEBUG).InfoS` - * This is very important to debug the request scheduling algorithm, and yet not spammy compared to the metric scraping loop logs. \ No newline at end of file + - `logger.V(DEFAULT).Error` If the metrics are not fresh enough, which indicates an error occurred during the metric scraping loop. + - `logger.V(DEBUG)` + - This is very important to debug the request scheduling algorithm, and yet not spammy compared to the metric scraping loop logs. + +### Passing Logger Around + +You can pass around a `context.Context` that contains a logger or a `logr.Logger` instance directly. +It is up to you to decide which one to use. Passing a `context.Context` is more standard; +on the other hand, you then need to call `log.FromContext` everywhere. + +As `logger.V` calls are cumulative, i.e. `logger.V(2).V(3)` results in `logger.V(5)`, +a logger should be passed around with no verbosity level set so that `logger.V(DEFAULT)` +actually uses the `DEFAULT` verbosity level. 
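To make the two conventions above concrete, here is a minimal sketch (illustrative only, not part of this diff) of both styles: pulling the logger out of a `context.Context` via `log.FromContext`, and passing a `logr.Logger` down directly with no verbosity level applied. It assumes the `logutil` verbosity constants (`DEFAULT`, `VERBOSE`) described above; the function names `reconcileSomething` and `handleUpdate` are hypothetical.

```go
package example

import (
	"context"

	"github.com/go-logr/logr"
	"sigs.k8s.io/controller-runtime/pkg/log"

	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

// reconcileSomething (hypothetical) obtains the logger from the context once,
// then passes it down explicitly without pre-applying a verbosity level.
func reconcileSomething(ctx context.Context) {
	logger := log.FromContext(ctx)

	// DEFAULT (v=2) for infrequent, important control-flow messages.
	logger.V(logutil.DEFAULT).Info("Reconciling object", "name", "example")

	handleUpdate(logger, "example")
}

// handleUpdate (hypothetical) receives an unleveled logger, so logger.V(VERBOSE)
// really means VERBOSE; V calls are cumulative, e.g. logger.V(2).V(3) == logger.V(5).
func handleUpdate(logger logr.Logger, name string) {
	logger.V(logutil.VERBOSE).Info("Handling update", "name", name)
}
```

Because the logger handed to `handleUpdate` carries no verbosity of its own, the `DEFAULT`/`VERBOSE` constants keep their intended meaning wherever the logger ends up.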
diff --git a/go.mod b/go.mod index d8b143ec8..a59a28cc5 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/bojand/ghz v0.120.0 github.com/elastic/crd-ref-docs v0.1.0 github.com/envoyproxy/go-control-plane/envoy v1.32.4 + github.com/go-logr/logr v1.4.2 github.com/google/go-cmp v0.6.0 github.com/jhump/protoreflect v1.17.0 github.com/onsi/ginkgo/v2 v2.22.2 @@ -61,7 +62,6 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect - github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go index a54833bc1..a75e7e433 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/backend/datastore.go @@ -7,10 +7,11 @@ import ( "strconv" "sync" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -87,7 +88,7 @@ func (ds *K8sDatastore) HasSynced() bool { return ds.inferencePool != nil } -func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string { +func RandomWeightedDraw(logger logr.Logger, model *v1alpha1.InferenceModel, seed int64) string { var weights int32 source := rand.NewSource(rand.Int63()) @@ -98,7 +99,7 @@ func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string { for _, model := range model.Spec.TargetModels { weights += *model.Weight } - klog.V(logutil.VERBOSE).InfoS("Weights for model computed", "model", model.Name, "weights", weights) + logger.V(logutil.TRACE).Info("Weights for model computed", "model", model.Name, "weights", weights) randomVal := r.Int31n(weights) for _, model := range model.Spec.TargetModels { if randomVal < *model.Weight { @@ -128,7 +129,7 @@ func (ds *K8sDatastore) flushPodsAndRefetch(ctx context.Context, ctrlClient clie LabelSelector: selectorFromInferencePoolSelector(newServerPool.Spec.Selector), Namespace: newServerPool.Namespace, }); err != nil { - klog.Error(err, "error listing clients") + log.FromContext(ctx).V(logutil.DEFAULT).Error(err, "Failed to list clients") } ds.pods.Clear() @@ -139,7 +140,6 @@ func (ds *K8sDatastore) flushPodsAndRefetch(ctx context.Context, ctrlClient clie } ds.pods.Store(pod, true) } - } func selectorFromInferencePoolSelector(selector map[v1alpha1.LabelKey]v1alpha1.LabelValue) labels.Selector { diff --git a/pkg/ext-proc/backend/datastore_test.go b/pkg/ext-proc/backend/datastore_test.go index 0fc5da1a6..9f74226a8 100644 --- a/pkg/ext-proc/backend/datastore_test.go +++ b/pkg/ext-proc/backend/datastore_test.go @@ -5,6 +5,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) func TestHasSynced(t *testing.T) { @@ -46,6 +47,7 @@ func TestHasSynced(t *testing.T) { } func TestRandomWeightedDraw(t *testing.T) { + logger := logutil.NewTestLogger() tests := []struct { name string model *v1alpha1.InferenceModel @@ 
-118,7 +120,7 @@ func TestRandomWeightedDraw(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { for range 10000 { - model := RandomWeightedDraw(test.model, seedVal) + model := RandomWeightedDraw(logger, test.model, seedVal) if model != test.want { t.Errorf("Model returned!: %v", model) break diff --git a/pkg/ext-proc/backend/fake.go b/pkg/ext-proc/backend/fake.go index 7ab8a4640..2c0757dbe 100644 --- a/pkg/ext-proc/backend/fake.go +++ b/pkg/ext-proc/backend/fake.go @@ -3,7 +3,7 @@ package backend import ( "context" - klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -17,7 +17,7 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi if err, ok := f.Err[pod]; ok { return nil, err } - klog.V(logutil.VERBOSE).InfoS("Fetching metrics for pod", "pod", pod, "existing", existing, "new", f.Res[pod]) + log.FromContext(ctx).V(logutil.VERBOSE).Info("Fetching metrics for pod", "pod", pod, "existing", existing, "new", f.Res[pod]) return f.Res[pod], nil } diff --git a/pkg/ext-proc/backend/inferencemodel_reconciler.go b/pkg/ext-proc/backend/inferencemodel_reconciler.go index 72ea063e9..4959845cc 100644 --- a/pkg/ext-proc/backend/inferencemodel_reconciler.go +++ b/pkg/ext-proc/backend/inferencemodel_reconciler.go @@ -3,13 +3,14 @@ package backend import ( "context" + "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" - "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -27,38 +28,39 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, nil } - klogV := klog.V(logutil.DEFAULT) - klogV.InfoS("Reconciling InferenceModel", "name", req.NamespacedName) + logger := log.FromContext(ctx) + loggerDefault := logger.V(logutil.DEFAULT) + loggerDefault.Info("Reconciling InferenceModel", "name", req.NamespacedName) infModel := &v1alpha1.InferenceModel{} if err := c.Get(ctx, req.NamespacedName, infModel); err != nil { if errors.IsNotFound(err) { - klogV.InfoS("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName) + loggerDefault.Info("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName) c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName) return ctrl.Result{}, nil } - klogV.ErrorS(err, "Unable to get InferenceModel", "name", req.NamespacedName) + loggerDefault.Error(err, "Unable to get InferenceModel", "name", req.NamespacedName) return ctrl.Result{}, err } else if !infModel.DeletionTimestamp.IsZero() { - klogV.InfoS("InferenceModel is marked for deletion. Removing from datastore", "name", req.NamespacedName) + loggerDefault.Info("InferenceModel is marked for deletion. 
Removing from datastore", "name", req.NamespacedName) c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName) return ctrl.Result{}, nil } - c.updateDatastore(infModel) + c.updateDatastore(logger, infModel) return ctrl.Result{}, nil } -func (c *InferenceModelReconciler) updateDatastore(infModel *v1alpha1.InferenceModel) { - klogV := klog.V(logutil.DEFAULT) +func (c *InferenceModelReconciler) updateDatastore(logger logr.Logger, infModel *v1alpha1.InferenceModel) { + loggerDefault := logger.V(logutil.DEFAULT) if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name { - klogV.InfoS("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName) - klogV.InfoS("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName) + loggerDefault.Info("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName) + loggerDefault.Info("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName) c.Datastore.InferenceModels.Store(infModel.Spec.ModelName, infModel) return } - klogV.InfoS("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName) + loggerDefault.Info("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName) // If we get here. The model is not relevant to this pool, remove. c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName) } diff --git a/pkg/ext-proc/backend/inferencemodel_reconciler_test.go b/pkg/ext-proc/backend/inferencemodel_reconciler_test.go index c5ef8d147..4e1958181 100644 --- a/pkg/ext-proc/backend/inferencemodel_reconciler_test.go +++ b/pkg/ext-proc/backend/inferencemodel_reconciler_test.go @@ -13,6 +13,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) var ( @@ -46,6 +47,8 @@ var ( ) func TestUpdateDatastore_InferenceModelReconciler(t *testing.T) { + logger := logutil.NewTestLogger() + tests := []struct { name string datastore *K8sDatastore @@ -135,7 +138,7 @@ func TestUpdateDatastore_InferenceModelReconciler(t *testing.T) { Datastore: test.datastore, PoolNamespacedName: types.NamespacedName{Name: test.datastore.inferencePool.Name}, } - reconciler.updateDatastore(test.incomingService) + reconciler.updateDatastore(logger, test.incomingService) if ok := mapsEqual(reconciler.Datastore.InferenceModels, test.wantInferenceModels); !ok { t.Error("Maps are not equal") diff --git a/pkg/ext-proc/backend/inferencepool_reconciler.go b/pkg/ext-proc/backend/inferencepool_reconciler.go index 9504b4e0f..e44a278ae 100644 --- a/pkg/ext-proc/backend/inferencepool_reconciler.go +++ b/pkg/ext-proc/backend/inferencepool_reconciler.go @@ -4,12 +4,14 @@ import ( "context" "reflect" + "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" klog "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -29,29 +31,31 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques if req.NamespacedName.Name != c.PoolNamespacedName.Name || req.NamespacedName.Namespace != c.PoolNamespacedName.Namespace { return ctrl.Result{}, nil } - klogV := klog.V(logutil.DEFAULT) - 
klogV.InfoS("Reconciling InferencePool", "name", req.NamespacedName) + + logger := log.FromContext(ctx) + loggerDefault := logger.V(logutil.DEFAULT) + loggerDefault.Info("Reconciling InferencePool", "name", req.NamespacedName) serverPool := &v1alpha1.InferencePool{} if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil { - klogV.ErrorS(err, "Unable to get InferencePool", "name", req.NamespacedName) + loggerDefault.Error(err, "Unable to get InferencePool", "name", req.NamespacedName) return ctrl.Result{}, err } if c.Datastore.inferencePool == nil || !reflect.DeepEqual(serverPool.Spec.Selector, c.Datastore.inferencePool.Spec.Selector) { - c.updateDatastore(serverPool) + c.updateDatastore(logger, serverPool) c.Datastore.flushPodsAndRefetch(ctx, c.Client, serverPool) } else { - c.updateDatastore(serverPool) + c.updateDatastore(logger, serverPool) } return ctrl.Result{}, nil } -func (c *InferencePoolReconciler) updateDatastore(serverPool *v1alpha1.InferencePool) { +func (c *InferencePoolReconciler) updateDatastore(logger logr.Logger, serverPool *v1alpha1.InferencePool) { pool, _ := c.Datastore.getInferencePool() if pool == nil || serverPool.ObjectMeta.ResourceVersion != pool.ObjectMeta.ResourceVersion { - klog.V(logutil.DEFAULT).InfoS("Updating inference pool", "target", klog.KMetadata(&serverPool.ObjectMeta)) + logger.V(logutil.DEFAULT).Info("Updating inference pool", "target", klog.KMetadata(&serverPool.ObjectMeta)) c.Datastore.setInferencePool(serverPool) } } diff --git a/pkg/ext-proc/backend/inferencepool_reconciler_test.go b/pkg/ext-proc/backend/inferencepool_reconciler_test.go index f16524a54..1da7d61b0 100644 --- a/pkg/ext-proc/backend/inferencepool_reconciler_test.go +++ b/pkg/ext-proc/backend/inferencepool_reconciler_test.go @@ -6,6 +6,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) var ( @@ -41,6 +42,8 @@ var ( ) func TestUpdateDatastore_InferencePoolReconciler(t *testing.T) { + logger := logutil.NewTestLogger() + tests := []struct { name string datastore *K8sDatastore @@ -74,7 +77,7 @@ func TestUpdateDatastore_InferencePoolReconciler(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { inferencePoolReconciler := &InferencePoolReconciler{Datastore: test.datastore} - inferencePoolReconciler.updateDatastore(test.incomingPool) + inferencePoolReconciler.updateDatastore(logger, test.incomingPool) gotPool := inferencePoolReconciler.Datastore.inferencePool if !reflect.DeepEqual(gotPool, test.wantPool) { diff --git a/pkg/ext-proc/backend/pod_reconciler.go b/pkg/ext-proc/backend/pod_reconciler.go index 60d014ce6..b914ea8d2 100644 --- a/pkg/ext-proc/backend/pod_reconciler.go +++ b/pkg/ext-proc/backend/pod_reconciler.go @@ -8,9 +8,9 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" - "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -23,24 +23,25 @@ type PodReconciler struct { } func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) inferencePool, err := c.Datastore.getInferencePool() if err != nil { - 
klog.V(logutil.DEFAULT).Infof("Skipping reconciling Pod because the InferencePool is not available yet: %v", err) + logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the InferencePool is not available yet", "error", err) // When the inferencePool is initialized it lists the appropriate pods and populates the datastore, so no need to requeue. return ctrl.Result{}, nil } else if inferencePool.Namespace != req.Namespace { return ctrl.Result{}, nil } - klog.V(logutil.VERBOSE).Info("reconciling Pod", req.NamespacedName) + logger.V(logutil.VERBOSE).Info("Pod being reconciled", "name", req.NamespacedName) pod := &corev1.Pod{} if err := c.Get(ctx, req.NamespacedName, pod); err != nil { - klog.Error(err, ": unable to get pod") if apierrors.IsNotFound(err) { c.Datastore.pods.Delete(pod) return ctrl.Result{}, nil } + logger.V(logutil.DEFAULT).Error(err, "Unable to get pod", "name", req.NamespacedName) return ctrl.Result{}, err } diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go index d64b80b30..ce7389864 100644 --- a/pkg/ext-proc/backend/provider.go +++ b/pkg/ext-proc/backend/provider.go @@ -6,8 +6,8 @@ import ( "sync" "time" + "github.com/go-logr/logr" "go.uber.org/multierr" - klog "k8s.io/klog/v2" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -59,14 +59,14 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) { return nil, false } -func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error { +func (p *Provider) Init(logger logr.Logger, refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error { p.refreshPodsOnce() - if err := p.refreshMetricsOnce(); err != nil { - klog.ErrorS(err, "Failed to init metrics") + if err := p.refreshMetricsOnce(logger); err != nil { + logger.Error(err, "Failed to init metrics") } - klog.InfoS("Initialized pods and metrics", "metrics", p.AllPodMetrics()) + logger.Info("Initialized pods and metrics", "metrics", p.AllPodMetrics()) // periodically refresh pods go func() { @@ -80,8 +80,8 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm go func() { for { time.Sleep(refreshMetricsInterval) - if err := p.refreshMetricsOnce(); err != nil { - klog.V(logutil.TRACE).ErrorS(err, "Failed to refresh metrics") + if err := p.refreshMetricsOnce(logger); err != nil { + logger.V(logutil.DEFAULT).Error(err, "Failed to refresh metrics") } } }() @@ -90,16 +90,16 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm go func() { for { time.Sleep(refreshPrometheusMetricsInterval) - p.flushPrometheusMetricsOnce() + p.flushPrometheusMetricsOnce(logger) } }() // Periodically print out the pods and metrics for DEBUGGING. 
- if klogV := klog.V(logutil.DEBUG); klogV.Enabled() { + if logger := logger.V(logutil.DEBUG); logger.Enabled() { go func() { for { time.Sleep(5 * time.Second) - klogV.InfoS("Current Pods and metrics gathered", "metrics", p.AllPodMetrics()) + logger.Info("Current Pods and metrics gathered", "metrics", p.AllPodMetrics()) } }() } @@ -137,20 +137,20 @@ func (p *Provider) refreshPodsOnce() { p.datastore.pods.Range(addNewPods) } -func (p *Provider) refreshMetricsOnce() error { - klogV := klog.V(logutil.TRACE) +func (p *Provider) refreshMetricsOnce(logger logr.Logger) error { + loggerTrace := logger.V(logutil.TRACE) ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout) defer cancel() start := time.Now() defer func() { d := time.Since(start) // TODO: add a metric instead of logging - klogV.InfoS("Metrics refreshed", "duration", d) + loggerTrace.Info("Metrics refreshed", "duration", d) }() var wg sync.WaitGroup errCh := make(chan error) processOnePod := func(key, value any) bool { - klogV.InfoS("Pod and metric being processed", "pod", key, "metric", value) + loggerTrace.Info("Pod and metric being processed", "pod", key, "metric", value) pod := key.(Pod) existing := value.(*PodMetrics) wg.Add(1) @@ -162,7 +162,7 @@ func (p *Provider) refreshMetricsOnce() error { return } p.UpdatePodMetrics(pod, updated) - klogV.InfoS("Updated metrics for pod", "pod", pod, "metrics", updated.Metrics) + loggerTrace.Info("Updated metrics for pod", "pod", pod, "metrics", updated.Metrics) }() return true } @@ -185,8 +185,8 @@ func (p *Provider) refreshMetricsOnce() error { return errs } -func (p *Provider) flushPrometheusMetricsOnce() { - klog.V(logutil.DEBUG).InfoS("Flushing Prometheus Metrics") +func (p *Provider) flushPrometheusMetricsOnce(logger logr.Logger) { + logger.V(logutil.DEBUG).Info("Flushing Prometheus Metrics") pool, _ := p.datastore.getInferencePool() if pool == nil { diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go index ddd7f0d66..955750463 100644 --- a/pkg/ext-proc/backend/provider_test.go +++ b/pkg/ext-proc/backend/provider_test.go @@ -8,6 +8,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) var ( @@ -38,6 +39,8 @@ var ( ) func TestProvider(t *testing.T) { + logger := logutil.NewTestLogger() + tests := []struct { name string pmc PodMetricsClient @@ -90,7 +93,7 @@ func TestProvider(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { p := NewProvider(test.pmc, test.datastore) - err := p.Init(time.Millisecond, time.Millisecond, time.Millisecond) + err := p.Init(logger, time.Millisecond, time.Millisecond, time.Millisecond) if test.initErr != (err != nil) { t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr) } diff --git a/pkg/ext-proc/backend/vllm/metrics.go b/pkg/ext-proc/backend/vllm/metrics.go index 4c3804ce5..4558a6642 100644 --- a/pkg/ext-proc/backend/vllm/metrics.go +++ b/pkg/ext-proc/backend/vllm/metrics.go @@ -9,10 +9,11 @@ import ( "strings" "time" + "github.com/go-logr/logr" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "go.uber.org/multierr" - klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -40,17 +41,20 @@ func (p 
*PodMetricsClientImpl) FetchMetrics( pod backend.Pod, existing *backend.PodMetrics, ) (*backend.PodMetrics, error) { + logger := log.FromContext(ctx) + loggerDefault := logger.V(logutil.DEFAULT) + // Currently the metrics endpoint is hard-coded, which works with vLLM. // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config. url := fmt.Sprintf("http://%s/metrics", pod.Address) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { - klog.V(logutil.DEFAULT).ErrorS(err, "Failed create HTTP request", "method", http.MethodGet, "url", url) + loggerDefault.Error(err, "Failed create HTTP request", "method", http.MethodGet, "url", url) return nil, fmt.Errorf("failed to create request: %v", err) } resp, err := http.DefaultClient.Do(req) if err != nil { - klog.V(logutil.DEFAULT).ErrorS(err, "Failed to fetch metrics", "pod", pod) + loggerDefault.Error(err, "Failed to fetch metrics", "pod", pod) return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err) } defer func() { @@ -58,7 +62,7 @@ func (p *PodMetricsClientImpl) FetchMetrics( }() if resp.StatusCode != http.StatusOK { - klog.V(logutil.DEFAULT).ErrorS(nil, "Unexpected status code returned", "pod", pod, "statusCode", resp.StatusCode) + loggerDefault.Error(nil, "Unexpected status code returned", "pod", pod, "statusCode", resp.StatusCode) return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode) } @@ -67,35 +71,36 @@ func (p *PodMetricsClientImpl) FetchMetrics( if err != nil { return nil, err } - return promToPodMetrics(metricFamilies, existing) + return promToPodMetrics(logger, metricFamilies, existing) } // promToPodMetrics updates internal pod metrics with scraped prometheus metrics. // A combined error is returned if errors occur in one or more metric processing. // it returns a new PodMetrics pointer which can be used to atomically update the pod metrics map. func promToPodMetrics( + logger logr.Logger, metricFamilies map[string]*dto.MetricFamily, existing *backend.PodMetrics, ) (*backend.PodMetrics, error) { var errs error updated := existing.Clone() - runningQueueSize, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName) + runningQueueSize, err := getLatestMetric(logger, metricFamilies, RunningQueueSizeMetricName) errs = multierr.Append(errs, err) if err == nil { updated.RunningQueueSize = int(runningQueueSize.GetGauge().GetValue()) } - waitingQueueSize, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName) + waitingQueueSize, err := getLatestMetric(logger, metricFamilies, WaitingQueueSizeMetricName) errs = multierr.Append(errs, err) if err == nil { updated.WaitingQueueSize = int(waitingQueueSize.GetGauge().GetValue()) } - cachePercent, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName) + cachePercent, err := getLatestMetric(logger, metricFamilies, KVCacheUsagePercentMetricName) errs = multierr.Append(errs, err) if err == nil { updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue() } - loraMetrics, _, err := getLatestLoraMetric(metricFamilies) + loraMetrics, _, err := getLatestLoraMetric(logger, metricFamilies) errs = multierr.Append(errs, err) /* TODO: uncomment once this is available in vllm. 
kvCap, _, err := getGaugeLatestValue(metricFamilies, KvCacheMaxTokenCapacityMetricName) @@ -135,10 +140,10 @@ func promToPodMetrics( // reason its specially fetched is because each label key value pair permutation generates new series // and only most recent is useful. The value of each series is the creation timestamp so we can // retrieve the latest by sorting the value. -func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, time.Time, error) { +func getLatestLoraMetric(logger logr.Logger, metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, time.Time, error) { loraRequests, ok := metricFamilies[LoraRequestInfoMetricName] if !ok { - klog.V(logutil.DEFAULT).ErrorS(nil, "Metric family not found", "name", LoraRequestInfoMetricName) + logger.V(logutil.DEFAULT).Error(nil, "Metric family not found", "name", LoraRequestInfoMetricName) return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName) } var latestTs float64 @@ -154,10 +159,10 @@ func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metr // getLatestMetric gets the latest metric of a family. This should be used to get the latest Gauge metric. // Since vllm doesn't set the timestamp in metric, this metric essentially gets the first metric. -func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, error) { +func getLatestMetric(logger logr.Logger, metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, error) { mf, ok := metricFamilies[metricName] if !ok { - klog.V(logutil.DEFAULT).ErrorS(nil, "Metric family not found", "name", metricName) + logger.V(logutil.DEFAULT).Error(nil, "Metric family not found", "name", metricName) return nil, fmt.Errorf("metric family %q not found", metricName) } if len(mf.GetMetric()) == 0 { @@ -171,6 +176,6 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str latest = m } } - klog.V(logutil.TRACE).InfoS("Metric value selected", "value", latest, "metric", metricName) + logger.V(logutil.TRACE).Info("Metric value selected", "value", latest, "metric", metricName) return latest, nil } diff --git a/pkg/ext-proc/backend/vllm/metrics_test.go b/pkg/ext-proc/backend/vllm/metrics_test.go index 3d4225e83..0a718cd79 100644 --- a/pkg/ext-proc/backend/vllm/metrics_test.go +++ b/pkg/ext-proc/backend/vllm/metrics_test.go @@ -8,9 +8,12 @@ import ( "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) func TestPromToPodMetrics(t *testing.T) { + logger := logutil.NewTestLogger() + testCases := []struct { name string metricFamilies map[string]*dto.MetricFamily @@ -219,7 +222,7 @@ func TestPromToPodMetrics(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - updated, err := promToPodMetrics(tc.metricFamilies, tc.initialPodMetrics) + updated, err := promToPodMetrics(logger, tc.metricFamilies, tc.initialPodMetrics) if tc.expectedErr != nil { assert.Error(t, err) } else { diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go index a36f7ae3b..8ce2956f8 100644 --- a/pkg/ext-proc/handlers/request.go +++ b/pkg/ext-proc/handlers/request.go @@ -1,6 +1,7 @@ package handlers import ( + "context" "encoding/json" "errors" "fmt" @@ -9,7 +10,7 @@ import ( configPb 
"github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "google.golang.org/protobuf/types/known/structpb" - klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" @@ -18,25 +19,30 @@ import ( // HandleRequestBody handles body of the request to the backend server, such as parsing the "model" // parameter. // Envoy sends the request body to ext proc before sending the request to the backend server. -func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { - klogV := klog.V(logutil.VERBOSE) - klogV.InfoS("Handling request body") +func (s *Server) HandleRequestBody( + ctx context.Context, + reqCtx *RequestContext, + req *extProcPb.ProcessingRequest, +) (*extProcPb.ProcessingResponse, error) { + logger := log.FromContext(ctx) + loggerVerbose := logger.V(logutil.VERBOSE) + loggerVerbose.Info("Handling request body") // Unmarshal request body (must be JSON). v := req.Request.(*extProcPb.ProcessingRequest_RequestBody) var rb map[string]interface{} if err := json.Unmarshal(v.RequestBody.Body, &rb); err != nil { - klog.V(logutil.DEFAULT).ErrorS(err, "Error unmarshaling request body") + logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body") return nil, fmt.Errorf("error unmarshaling request body: %v", err) } - klogV.InfoS("Request body unmarshalled", "body", rb) + loggerVerbose.Info("Request body unmarshalled", "body", rb) // Resolve target models. model, ok := rb["model"].(string) if !ok { return nil, errors.New("model not found in request") } - klogV.InfoS("Model requested", "model", model) + loggerVerbose.Info("Model requested", "model", model) modelName := model // NOTE: The nil checking for the modelObject means that we DO allow passthrough currently. 
@@ -47,7 +53,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces return nil, fmt.Errorf("error finding a model object in InferenceModel for input %v", model) } if len(modelObj.Spec.TargetModels) > 0 { - modelName = backend.RandomWeightedDraw(modelObj, 0) + modelName = backend.RandomWeightedDraw(logger, modelObj, 0) if modelName == "" { return nil, fmt.Errorf("error getting target model name for model %v", modelObj.Name) } @@ -57,7 +63,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces ResolvedTargetModel: modelName, Critical: backend.IsCritical(modelObj), } - klogV.InfoS("LLM request assembled", "request", llmReq) + loggerVerbose.Info("LLM request assembled", "request", llmReq) requestBody := v.RequestBody.Body var err error @@ -66,17 +72,18 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces rb["model"] = llmReq.ResolvedTargetModel requestBody, err = json.Marshal(rb) if err != nil { - klog.V(logutil.DEFAULT).ErrorS(err, "Error marshaling request body") + logger.V(logutil.DEFAULT).Error(err, "Error marshaling request body") return nil, fmt.Errorf("error marshaling request body: %v", err) } - klogV.InfoS("Updated request body marshalled", "body", string(requestBody)) + loggerVerbose.Info("Updated request body marshalled", "body", string(requestBody)) } - targetPod, err := s.scheduler.Schedule(llmReq) + targetPod, err := s.scheduler.Schedule(ctx, llmReq) if err != nil { return nil, fmt.Errorf("failed to find target pod: %w", err) } - klogV.InfoS("Target model and pod selected", "model", llmReq.ResolvedTargetModel, "pod", targetPod) + logger.V(logutil.DEFAULT).Info("Request handled", + "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod) reqCtx.Model = llmReq.Model reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel @@ -102,7 +109,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces } // Print headers for debugging for _, header := range headers { - klog.V(logutil.DEBUG).InfoS("Request body header", "key", header.Header.Key, "value", header.Header.RawValue) + logger.V(logutil.DEBUG).Info("Request body header", "key", header.Header.Key, "value", header.Header.RawValue) } resp := &extProcPb.ProcessingResponse{ @@ -136,10 +143,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces return resp, nil } -func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse { +func HandleRequestHeaders( + ctx context.Context, + reqCtx *RequestContext, + req *extProcPb.ProcessingRequest, +) *extProcPb.ProcessingResponse { r := req.Request h := r.(*extProcPb.ProcessingRequest_RequestHeaders) - klog.V(logutil.VERBOSE).InfoS("Handling request headers", "headers", h) + log.FromContext(ctx).V(logutil.VERBOSE).Info("Handling request headers", "headers", h) resp := &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_RequestHeaders{ diff --git a/pkg/ext-proc/handlers/response.go b/pkg/ext-proc/handlers/response.go index 012b0b8d6..06da8106e 100644 --- a/pkg/ext-proc/handlers/response.go +++ b/pkg/ext-proc/handlers/response.go @@ -1,20 +1,26 @@ package handlers import ( + "context" "encoding/json" "fmt" configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" - klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/log" logutil 
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) // HandleResponseHeaders processes response headers from the backend model server. -func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { - klog.V(logutil.VERBOSE).InfoS("Processing ResponseHeaders") +func (s *Server) HandleResponseHeaders( + ctx context.Context, + reqCtx *RequestContext, + req *extProcPb.ProcessingRequest, +) (*extProcPb.ProcessingResponse, error) { + loggerVerbose := log.FromContext(ctx).V(logutil.VERBOSE) + loggerVerbose.Info("Processing ResponseHeaders") h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders) - klog.V(logutil.VERBOSE).InfoS("Headers before", "headers", h) + loggerVerbose.Info("Headers before", "headers", h) resp := &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_ResponseHeaders{ @@ -65,8 +71,14 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr "completion_tokens": 100 } }*/ -func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { - klog.V(logutil.VERBOSE).InfoS("Processing HandleResponseBody") +func (s *Server) HandleResponseBody( + ctx context.Context, + reqCtx *RequestContext, + req *extProcPb.ProcessingRequest, +) (*extProcPb.ProcessingResponse, error) { + logger := log.FromContext(ctx) + loggerVerbose := logger.V(logutil.VERBOSE) + loggerVerbose.Info("Processing HandleResponseBody") body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody) res := Response{} @@ -81,7 +93,7 @@ func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.Proce // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) // will add the processing for streaming case. 
reqCtx.ResponseComplete = true - klog.V(logutil.VERBOSE).InfoS("Response generated", "response", res) + loggerVerbose.Info("Response generated", "response", res) resp := &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_ResponseBody{ diff --git a/pkg/ext-proc/handlers/response_test.go b/pkg/ext-proc/handlers/response_test.go index df3380662..67875e051 100644 --- a/pkg/ext-proc/handlers/response_test.go +++ b/pkg/ext-proc/handlers/response_test.go @@ -1,10 +1,12 @@ package handlers import ( + "context" "testing" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/google/go-cmp/cmp" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) const ( @@ -34,6 +36,8 @@ const ( ) func TestHandleResponseBody(t *testing.T) { + ctx := logutil.NewTestLoggerIntoContext(context.Background()) + tests := []struct { name string req *extProcPb.ProcessingRequest_ResponseBody @@ -70,8 +74,7 @@ func TestHandleResponseBody(t *testing.T) { t.Run(test.name, func(t *testing.T) { server := &Server{} reqCtx := &RequestContext{} - _, err := server.HandleResponseBody(reqCtx, &extProcPb.ProcessingRequest{Request: test.req}) - + _, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req}) if err != nil { if !test.wantErr { t.Fatalf("HandleResponseBody returned unexpected error: %v, want %v", err, test.wantErr) diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go index a3cfcada5..6be747dac 100644 --- a/pkg/ext-proc/handlers/server.go +++ b/pkg/ext-proc/handlers/server.go @@ -1,6 +1,8 @@ package handlers import ( + "context" + "errors" "io" "time" @@ -8,7 +10,7 @@ import ( envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics" @@ -37,7 +39,7 @@ type Server struct { } type Scheduler interface { - Schedule(b *scheduling.LLMRequest) (targetPod backend.Pod, err error) + Schedule(ctx context.Context, b *scheduling.LLMRequest) (targetPod backend.Pod, err error) } // PodProvider is an interface to provide set of pods in the backend and information such as metrics. @@ -51,8 +53,11 @@ type ModelDataStore interface { } func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { - klog.V(logutil.VERBOSE).InfoS("Processing") ctx := srv.Context() + logger := log.FromContext(ctx) + loggerVerbose := logger.V(logutil.VERBOSE) + loggerVerbose.Info("Processing") + // Create request context to share states during life time of an HTTP request. // See https://github.com/envoyproxy/envoy/issues/17540. reqCtx := &RequestContext{} @@ -65,13 +70,13 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { } req, err := srv.Recv() - if err == io.EOF { + if err == io.EOF || errors.Is(err, context.Canceled) { return nil } if err != nil { // This error occurs very frequently, though it doesn't seem to have any impact. // TODO Figure out if we can remove this noise. 
- klog.V(logutil.VERBOSE).ErrorS(err, "Cannot receive stream request") + loggerVerbose.Error(err, "Cannot receive stream request") return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err) } @@ -79,34 +84,34 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { switch v := req.Request.(type) { case *extProcPb.ProcessingRequest_RequestHeaders: reqCtx.RequestReceivedTimestamp = time.Now() - resp = HandleRequestHeaders(reqCtx, req) - klog.V(logutil.VERBOSE).InfoS("Request context after HandleRequestHeaders", "context", reqCtx) + resp = HandleRequestHeaders(ctx, reqCtx, req) + loggerVerbose.Info("Request context after HandleRequestHeaders", "context", reqCtx) case *extProcPb.ProcessingRequest_RequestBody: - resp, err = s.HandleRequestBody(reqCtx, req) + resp, err = s.HandleRequestBody(ctx, reqCtx, req) if err == nil { metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel) metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize) } - klog.V(logutil.VERBOSE).InfoS("Request context after HandleRequestBody", "context", reqCtx) + loggerVerbose.Info("Request context after HandleRequestBody", "context", reqCtx) case *extProcPb.ProcessingRequest_ResponseHeaders: - resp, err = s.HandleResponseHeaders(reqCtx, req) - klog.V(logutil.VERBOSE).InfoS("Request context after HandleResponseHeaders", "context", reqCtx) + resp, err = s.HandleResponseHeaders(ctx, reqCtx, req) + loggerVerbose.Info("Request context after HandleResponseHeaders", "context", reqCtx) case *extProcPb.ProcessingRequest_ResponseBody: - resp, err = s.HandleResponseBody(reqCtx, req) + resp, err = s.HandleResponseBody(ctx, reqCtx, req) if err == nil && reqCtx.ResponseComplete { reqCtx.ResponseCompleteTimestamp = time.Now() - metrics.RecordRequestLatencies(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp) + metrics.RecordRequestLatencies(ctx, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp) metrics.RecordResponseSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseSize) metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens) metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens) } - klog.V(logutil.VERBOSE).InfoS("Request context after HandleResponseBody", "context", reqCtx) + loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx) default: - klog.V(logutil.DEFAULT).ErrorS(nil, "Unknown Request type", "request", v) + logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v) return status.Error(codes.Unknown, "unknown request type") } if err != nil { - klog.V(logutil.DEFAULT).ErrorS(err, "Failed to process request", "request", req) + logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req) switch status.Code(err) { // This code can be returned by scheduler when there is no capacity for sheddable // requests. 
@@ -125,9 +130,9 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { } } - klog.V(logutil.VERBOSE).InfoS("Response generated", "response", resp) + loggerVerbose.Info("Response generated", "response", resp) if err := srv.Send(resp); err != nil { - klog.V(logutil.DEFAULT).ErrorS(err, "Send failed") + logger.V(logutil.DEFAULT).Error(err, "Send failed") return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) } } diff --git a/pkg/ext-proc/health.go b/pkg/ext-proc/health.go index aabb150d9..8b684d39f 100644 --- a/pkg/ext-proc/health.go +++ b/pkg/ext-proc/health.go @@ -3,24 +3,25 @@ package main import ( "context" + "github.com/go-logr/logr" "google.golang.org/grpc/codes" healthPb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" - klog "k8s.io/klog/v2" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) type healthServer struct { + logger logr.Logger datastore *backend.K8sDatastore } func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { if !s.datastore.HasSynced() { - klog.V(logutil.VERBOSE).InfoS("gRPC health check not serving", "service", in.Service) + s.logger.V(logutil.VERBOSE).Info("gRPC health check not serving", "service", in.Service) return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil } - klog.V(logutil.VERBOSE).InfoS("gRPC health check serving", "service", in.Service) + s.logger.V(logutil.VERBOSE).Info("gRPC health check serving", "service", in.Service) return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil } diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index 8f4cd8e72..ba593d7da 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -8,6 +8,7 @@ import ( "os" "strconv" + "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus/promhttp" uberzap "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -18,7 +19,6 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/component-base/metrics/legacyregistry" - klog "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -79,7 +79,8 @@ var ( "are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+ "then a self-signed certificate is used.") - scheme = runtime.NewScheme() + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") ) func init() { @@ -103,7 +104,7 @@ func run() error { // Validate flags if err := validateFlags(); err != nil { - klog.ErrorS(err, "Failed to validate flags") + setupLog.Error(err, "Failed to validate flags") return err } @@ -112,20 +113,20 @@ func run() error { flag.VisitAll(func(f *flag.Flag) { flags[f.Name] = f.Value }) - klog.InfoS("Flags processed", "flags", flags) + setupLog.Info("Flags processed", "flags", flags) datastore := backend.NewK8sDataStore() // Init runtime. 
cfg, err := ctrl.GetConfig() if err != nil { - klog.ErrorS(err, "Failed to get rest config") + setupLog.Error(err, "Failed to get rest config") return err } mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme}) if err != nil { - klog.ErrorS(err, "Failed to create controller manager", "config", cfg) + setupLog.Error(err, "Failed to create controller manager", "config", cfg) return err } @@ -143,18 +144,20 @@ func run() error { CertPath: *certPath, } if err := serverRunner.SetupWithManager(mgr); err != nil { - klog.ErrorS(err, "Failed to setup ext-proc server") + setupLog.Error(err, "Failed to setup ext-proc server") return err } // Register health server. - if err := registerHealthServer(mgr, datastore, *grpcHealthPort); err != nil { + if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil { return err } // Register ext-proc server. - if err := mgr.Add(serverRunner.AsRunnable(datastore, &vllm.PodMetricsClientImpl{})); err != nil { - klog.ErrorS(err, "Failed to register ext-proc server") + if err := mgr.Add(serverRunner.AsRunnable( + ctrl.Log.WithName("ext-proc"), datastore, &vllm.PodMetricsClientImpl{}, + )); err != nil { + setupLog.Error(err, "Failed to register ext-proc server") return err } @@ -164,12 +167,12 @@ func run() error { } // Start the manager. This blocks until a signal is received. - klog.InfoS("Controller manager starting") + setupLog.Info("Controller manager starting") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - klog.ErrorS(err, "Error starting controller manager") + setupLog.Error(err, "Error starting controller manager") return err } - klog.InfoS("Controller manager terminated") + setupLog.Info("Controller manager terminated") return nil } @@ -189,16 +192,18 @@ func initLogging(opts *zap.Options) { logger := zap.New(zap.UseFlagOptions(opts), zap.RawZapOpts(uberzap.AddCaller())) ctrl.SetLogger(logger) - klog.SetLogger(logger) } // registerHealthServer adds the Health gRPC server as a Runnable to the given manager. 
-func registerHealthServer(mgr manager.Manager, ds *backend.K8sDatastore, port int) error { +func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds *backend.K8sDatastore, port int) error { srv := grpc.NewServer() - healthPb.RegisterHealthServer(srv, &healthServer{datastore: ds}) + healthPb.RegisterHealthServer(srv, &healthServer{ + logger: logger, + datastore: ds, + }) if err := mgr.Add( runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil { - klog.ErrorS(err, "Failed to register health server") + setupLog.Error(err, "Failed to register health server") return err } return nil @@ -226,7 +231,7 @@ func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config) err Name: "metrics", Server: srv, }); err != nil { - klog.ErrorS(err, "Failed to register metrics HTTP handler") + setupLog.Error(err, "Failed to register metrics HTTP handler") return err } return nil @@ -239,19 +244,19 @@ func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Ha ) httpClient, err := rest.HTTPClientFor(cfg) if err != nil { - klog.ErrorS(err, "Failed to create http client for metrics auth") + setupLog.Error(err, "Failed to create http client for metrics auth") return nil, err } filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient) if err != nil { - klog.ErrorS(err, "Failed to create metrics filter for auth") + setupLog.Error(err, "Failed to create metrics filter for auth") return nil, err } - metricsLogger := klog.LoggerWithValues(klog.NewKlogr(), "path", defaultMetricsEndpoint) + metricsLogger := ctrl.Log.WithName("metrics").WithValues("path", defaultMetricsEndpoint) metricsAuthHandler, err := filter(metricsLogger, h) if err != nil { - klog.ErrorS(err, "Failed to create metrics auth handler") + setupLog.Error(err, "Failed to create metrics auth handler") return nil, err } return metricsAuthHandler, nil diff --git a/pkg/ext-proc/metrics/metrics.go b/pkg/ext-proc/metrics/metrics.go index 1412af6e7..e3226f471 100644 --- a/pkg/ext-proc/metrics/metrics.go +++ b/pkg/ext-proc/metrics/metrics.go @@ -1,12 +1,13 @@ package metrics import ( + "context" "sync" "time" compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" - klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/log" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -144,9 +145,9 @@ func RecordRequestSizes(modelName, targetModelName string, reqSize int) { } // RecordRequestLatencies records duration of request. 
-func RecordRequestLatencies(modelName, targetModelName string, received time.Time, complete time.Time) bool { +func RecordRequestLatencies(ctx context.Context, modelName, targetModelName string, received time.Time, complete time.Time) bool { if !complete.After(received) { - klog.V(logutil.DEFAULT).ErrorS(nil, "Request latency values are invalid", + log.FromContext(ctx).V(logutil.DEFAULT).Error(nil, "Request latency values are invalid", "modelName", modelName, "targetModelName", targetModelName, "completeTime", complete, "receivedTime", received) return false } diff --git a/pkg/ext-proc/metrics/metrics_test.go b/pkg/ext-proc/metrics/metrics_test.go index 348f707e9..d24afdb11 100644 --- a/pkg/ext-proc/metrics/metrics_test.go +++ b/pkg/ext-proc/metrics/metrics_test.go @@ -1,22 +1,26 @@ package metrics import ( + "context" "os" "testing" "time" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) -const RequestTotalMetric = InferenceModelComponent + "_request_total" -const RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds" -const RequestSizesMetric = InferenceModelComponent + "_request_sizes" -const ResponseSizesMetric = InferenceModelComponent + "_response_sizes" -const InputTokensMetric = InferenceModelComponent + "_input_tokens" -const OutputTokensMetric = InferenceModelComponent + "_output_tokens" -const KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization" -const QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size" +const ( + RequestTotalMetric = InferenceModelComponent + "_request_total" + RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds" + RequestSizesMetric = InferenceModelComponent + "_request_sizes" + ResponseSizesMetric = InferenceModelComponent + "_response_sizes" + InputTokensMetric = InferenceModelComponent + "_input_tokens" + OutputTokensMetric = InferenceModelComponent + "_output_tokens" + KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization" + QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size" +) func TestRecordRequestCounterandSizes(t *testing.T) { type requests struct { @@ -83,12 +87,12 @@ func TestRecordRequestCounterandSizes(t *testing.T) { if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestSizes, RequestSizesMetric); err != nil { t.Error(err) } - }) } } func TestRecordRequestLatencies(t *testing.T) { + ctx := logutil.NewTestLoggerIntoContext(context.Background()) timeBaseline := time.Now() type requests struct { modelName string @@ -100,35 +104,36 @@ func TestRecordRequestLatencies(t *testing.T) { name string reqs []requests invalid bool - }{{ - name: "multiple requests", - reqs: []requests{ - { - modelName: "m10", - targetModelName: "t10", - receivedTime: timeBaseline, - completeTime: timeBaseline.Add(time.Millisecond * 10), - }, - { - modelName: "m10", - targetModelName: "t10", - receivedTime: timeBaseline, - completeTime: timeBaseline.Add(time.Millisecond * 1600), - }, - { - modelName: "m10", - targetModelName: "t11", - receivedTime: timeBaseline, - completeTime: timeBaseline.Add(time.Millisecond * 60), - }, - { - modelName: "m20", - targetModelName: "t20", - receivedTime: timeBaseline, - completeTime: timeBaseline.Add(time.Millisecond * 120), + }{ + { + name: "multiple requests", + reqs: []requests{ + { + modelName: "m10", + targetModelName: "t10", + receivedTime: timeBaseline, + 
completeTime: timeBaseline.Add(time.Millisecond * 10), + }, + { + modelName: "m10", + targetModelName: "t10", + receivedTime: timeBaseline, + completeTime: timeBaseline.Add(time.Millisecond * 1600), + }, + { + modelName: "m10", + targetModelName: "t11", + receivedTime: timeBaseline, + completeTime: timeBaseline.Add(time.Millisecond * 60), + }, + { + modelName: "m20", + targetModelName: "t20", + receivedTime: timeBaseline, + completeTime: timeBaseline.Add(time.Millisecond * 120), + }, }, }, - }, { name: "invalid elapsed time", reqs: []requests{ @@ -137,14 +142,16 @@ func TestRecordRequestLatencies(t *testing.T) { targetModelName: "t10", receivedTime: timeBaseline.Add(time.Millisecond * 10), completeTime: timeBaseline, - }}, + }, + }, invalid: true, - }} + }, + } Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { for _, req := range scenario.reqs { - success := RecordRequestLatencies(req.modelName, req.targetModelName, req.receivedTime, req.completeTime) + success := RecordRequestLatencies(ctx, req.modelName, req.targetModelName, req.receivedTime, req.completeTime) if success == scenario.invalid { t.Errorf("got record success(%v), but the request expects invalid(%v)", success, scenario.invalid) } @@ -277,7 +284,6 @@ func TestInferencePoolMetrics(t *testing.T) { Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { - RecordInferencePoolAvgKVCache(scenario.poolName, scenario.kvCacheAvg) RecordInferencePoolAvgQueueSize(scenario.poolName, scenario.queueSizeAvg) diff --git a/pkg/ext-proc/scheduling/filter.go b/pkg/ext-proc/scheduling/filter.go index ac7a287ce..e028c59a4 100644 --- a/pkg/ext-proc/scheduling/filter.go +++ b/pkg/ext-proc/scheduling/filter.go @@ -4,14 +4,14 @@ import ( "errors" "math" - klog "k8s.io/klog/v2" + "github.com/go-logr/logr" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) type Filter interface { Name() string - Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) + Filter(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) } // filter applies current filterFunc, and then recursively applies next filters depending success or @@ -41,10 +41,11 @@ func (f *filter) Name() string { return f.name } -func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { - klog.V(logutil.VERBOSE).InfoS("Running a filter", "name", f.Name(), "request", req, "podCount", len(pods)) +func (f *filter) Filter(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { + loggerTrace := logger.V(logutil.TRACE) + loggerTrace.Info("Running a filter", "name", f.Name(), "podCount", len(pods)) - filtered, err := f.filter(req, pods) + filtered, err := f.filter(logger, req, pods) next := f.nextOnSuccessOrFailure if err == nil && len(filtered) > 0 { @@ -55,9 +56,9 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend if f.nextOnSuccess != nil { next = f.nextOnSuccess } - klog.V(logutil.VERBOSE).InfoS("Filter succeeded", "filter", f.Name(), "next", next.Name(), "filteredPodCount", len(filtered)) + loggerTrace.Info("Filter succeeded", "filter", f.Name(), "next", next.Name(), "filteredPodCount", len(filtered)) // On success, pass the filtered result to the next filter. 
- return next.Filter(req, filtered) + return next.Filter(logger, req, filtered) } else { if f.nextOnFailure == nil && f.nextOnSuccessOrFailure == nil { // No succeeding filters to run, return. @@ -66,18 +67,18 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend if f.nextOnFailure != nil { next = f.nextOnFailure } - klog.V(logutil.VERBOSE).InfoS("Filter failed", "filter", f.Name(), "next", next.Name()) + loggerTrace.Info("Filter failed", "filter", f.Name(), "next", next.Name()) // On failure, pass the initial set of pods to the next filter. - return next.Filter(req, pods) + return next.Filter(logger, req, pods) } } // filterFunc filters a set of input pods to a subset. -type filterFunc func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) +type filterFunc func(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) // toFilterFunc is a helper function to convert a per pod filter func to the FilterFunc. func toFilterFunc(pp podPredicate) filterFunc { - return func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { + return func(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { filtered := []*backend.PodMetrics{} for _, pod := range pods { pass := pp(req, pod) @@ -99,7 +100,7 @@ func toFilterFunc(pp podPredicate) filterFunc { // the least one as it gives more choices for the next filter, which on aggregate gave better // results. // TODO: Compare this strategy with other strategies such as top K. -func leastQueuingFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { +func leastQueuingFilterFunc(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { min := math.MaxInt max := 0 filtered := []*backend.PodMetrics{} @@ -131,7 +132,7 @@ func lowQueueingPodPredicate(_ *LLMRequest, pod *backend.PodMetrics) bool { // should consider them all instead of the absolute minimum one. This worked better than picking the // least one as it gives more choices for the next filter, which on aggregate gave better results. // TODO: Compare this strategy with other strategies such as top K. 
-func leastKVCacheFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { +func leastKVCacheFilterFunc(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { min := math.MaxFloat64 var max float64 = 0 filtered := []*backend.PodMetrics{} diff --git a/pkg/ext-proc/scheduling/filter_test.go b/pkg/ext-proc/scheduling/filter_test.go index 224dc83f6..ee1a8c331 100644 --- a/pkg/ext-proc/scheduling/filter_test.go +++ b/pkg/ext-proc/scheduling/filter_test.go @@ -4,11 +4,15 @@ import ( "errors" "testing" + "github.com/go-logr/logr" "github.com/google/go-cmp/cmp" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) func TestFilter(t *testing.T) { + logger := logutil.NewTestLogger() + tests := []struct { name string req *LLMRequest @@ -19,7 +23,7 @@ func TestFilter(t *testing.T) { }{ { name: "simple filter without successor, failure", - filter: &filter{filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { + filter: &filter{filter: func(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { return nil, errors.New("filter error") }}, err: true, @@ -201,7 +205,7 @@ func TestFilter(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := test.filter.Filter(test.req, test.input) + got, err := test.filter.Filter(logger, test.req, test.input) if test.err != (err != nil) { t.Errorf("Unexpected error, got %v, want %v", err, test.err) } @@ -214,6 +218,8 @@ func TestFilter(t *testing.T) { } func TestFilterFunc(t *testing.T) { + logger := logutil.NewTestLogger() + tests := []struct { name string f filterFunc @@ -395,7 +401,7 @@ func TestFilterFunc(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := test.f(test.req, test.input) + got, err := test.f(logger, test.req, test.input) if test.err != (err != nil) { t.Errorf("Unexpected error, got %v, want %v", err, test.err) } diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index 505648989..16cf90b87 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -2,12 +2,14 @@ package scheduling import ( + "context" "fmt" "math/rand" + "github.com/go-logr/logr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -82,8 +84,8 @@ var ( // request to make room for critical requests. nextOnFailure: &filter{ name: "drop request", - filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { - klog.V(logutil.DEFAULT).InfoS("Request dropped", "request", req) + filter: func(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { + logger.V(logutil.DEFAULT).Info("Request dropped", "request", req) return []*backend.PodMetrics{}, status.Errorf( codes.ResourceExhausted, "dropping request due to limited backend resources") }, @@ -110,14 +112,15 @@ type PodMetricsProvider interface { } // Schedule finds the target pod based on metrics and the requested lora adapter. 
-func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) { - klog.V(logutil.VERBOSE).InfoS("Scheduling a request", "request", req, "metrics", s.podMetricsProvider.AllPodMetrics()) - pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics()) +func (s *Scheduler) Schedule(ctx context.Context, req *LLMRequest) (targetPod backend.Pod, err error) { + logger := log.FromContext(ctx).WithValues("request", req) + logger.V(logutil.VERBOSE).Info("Scheduling a request", "metrics", s.podMetricsProvider.AllPodMetrics()) + pods, err := s.filter.Filter(logger, req, s.podMetricsProvider.AllPodMetrics()) if err != nil || len(pods) == 0 { return backend.Pod{}, fmt.Errorf( "failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err) } - klog.V(logutil.VERBOSE).InfoS("Selecting a random pod from the candidates", "candidatePods", pods) + logger.V(logutil.VERBOSE).Info("Selecting a random pod from the candidates", "candidatePods", pods) i := rand.Intn(len(pods)) return pods[i].Pod, nil } diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go index ed260b046..fb9741d24 100644 --- a/pkg/ext-proc/server/runserver.go +++ b/pkg/ext-proc/server/runserver.go @@ -13,10 +13,10 @@ import ( "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/go-logr/logr" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "k8s.io/apimachinery/pkg/types" - klog "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" @@ -108,14 +108,15 @@ func (r *ExtProcServerRunner) SetupWithManager(mgr ctrl.Manager) error { // AsRunnable returns a Runnable that can be used to start the ext-proc gRPC server. // The runnable implements LeaderElectionRunnable with leader election disabled. func (r *ExtProcServerRunner) AsRunnable( + logger logr.Logger, podDatastore *backend.K8sDatastore, podMetricsClient backend.PodMetricsClient, ) manager.Runnable { return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error { // Initialize backend provider pp := backend.NewProvider(podMetricsClient, podDatastore) - if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval, r.RefreshPrometheusMetricsInterval); err != nil { - klog.ErrorS(err, "Failed to initialize backend provider") + if err := pp.Init(logger.WithName("provider"), r.RefreshPodsInterval, r.RefreshMetricsInterval, r.RefreshPrometheusMetricsInterval); err != nil { + logger.Error(err, "Failed to initialize backend provider") return err } @@ -127,10 +128,10 @@ func (r *ExtProcServerRunner) AsRunnable( cert, err = tls.LoadX509KeyPair(r.CertPath+"/tls.crt", r.CertPath+"/tls.key") } else { // Create tls based credential. 
- cert, err = createSelfSignedTLSCertificate() + cert, err = createSelfSignedTLSCertificate(logger) } if err != nil { - klog.ErrorS(err, "Failed to create self signed certificate") + logger.Error(err, "Failed to create self signed certificate") return err } @@ -152,11 +153,11 @@ func (r *ExtProcServerRunner) AsRunnable( })) } -func createSelfSignedTLSCertificate() (tls.Certificate, error) { +func createSelfSignedTLSCertificate(logger logr.Logger) (tls.Certificate, error) { serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) if err != nil { - klog.ErrorS(err, "Failed to create serial number for self-signed cert") + logger.Error(err, "Failed to create serial number for self-signed cert") return tls.Certificate{}, err } now := time.Now() @@ -175,13 +176,13 @@ func createSelfSignedTLSCertificate() (tls.Certificate, error) { priv, err := rsa.GenerateKey(rand.Reader, 4096) if err != nil { - klog.ErrorS(err, "Failed to generate key for self-signed cert") + logger.Error(err, "Failed to generate key for self-signed cert") return tls.Certificate{}, err } derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv) if err != nil { - klog.ErrorS(err, "Failed to create self-signed certificate") + logger.Error(err, "Failed to create self-signed certificate") return tls.Certificate{}, err } @@ -189,7 +190,7 @@ func createSelfSignedTLSCertificate() (tls.Certificate, error) { privBytes, err := x509.MarshalPKCS8PrivateKey(priv) if err != nil { - klog.ErrorS(err, "Failed to marshal private key for self-signed certificate") + logger.Error(err, "Failed to marshal private key for self-signed certificate") return tls.Certificate{}, err } keyBytes := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}) diff --git a/pkg/ext-proc/server/runserver_test.go b/pkg/ext-proc/server/runserver_test.go index df2081aa5..1badb8fd9 100644 --- a/pkg/ext-proc/server/runserver_test.go +++ b/pkg/ext-proc/server/runserver_test.go @@ -6,11 +6,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) func TestRunnable(t *testing.T) { // Make sure AsRunnable() does not use leader election. 
- runner := server.NewDefaultExtProcServerRunner().AsRunnable(nil, nil) + runner := server.NewDefaultExtProcServerRunner().AsRunnable(logutil.NewTestLogger(), nil, nil) r, ok := runner.(manager.LeaderElectionRunnable) if !ok { t.Fatal("runner is not LeaderElectionRunnable") diff --git a/pkg/ext-proc/test/benchmark/benchmark.go b/pkg/ext-proc/test/benchmark/benchmark.go index c83dbcb91..9eca2edc6 100644 --- a/pkg/ext-proc/test/benchmark/benchmark.go +++ b/pkg/ext-proc/test/benchmark/benchmark.go @@ -8,9 +8,11 @@ import ( "github.com/bojand/ghz/printer" "github.com/bojand/ghz/runner" + "github.com/go-logr/logr" "github.com/jhump/protoreflect/desc" + uberzap "go.uber.org/zap" "google.golang.org/protobuf/proto" - klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server" @@ -41,24 +43,29 @@ func main() { } func run() error { - klog.InitFlags(nil) + opts := zap.Options{ + Development: true, + } + opts.BindFlags(flag.CommandLine) flag.Parse() + logger := zap.New(zap.UseFlagOptions(&opts), zap.RawZapOpts(uberzap.AddCaller())) + if *localServer { - test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, *refreshPrometheusMetricsInterval, fakePods(), fakeModels()) + test.StartExtProc(logger, port, *refreshPodsInterval, *refreshMetricsInterval, *refreshPrometheusMetricsInterval, fakePods(), fakeModels()) time.Sleep(time.Second) // wait until server is up - klog.InfoS("Server started") + logger.Info("Server started") } report, err := runner.Run( "envoy.service.ext_proc.v3.ExternalProcessor.Process", *svrAddr, runner.WithInsecure(true), - runner.WithBinaryDataFunc(generateRequest), + runner.WithBinaryDataFunc(generateRequestFunc(logger)), runner.WithTotalRequests(uint(*totalRequests)), ) if err != nil { - klog.ErrorS(err, "Runner failed") + logger.Error(err, "Runner failed") return err } @@ -71,14 +78,16 @@ func run() error { return nil } -func generateRequest(mtd *desc.MethodDescriptor, callData *runner.CallData) []byte { - numModels := *numFakePods * (*numModelsPerPod) - req := test.GenerateRequest(modelName(int(callData.RequestNumber) % numModels)) - data, err := proto.Marshal(req) - if err != nil { - logutil.Fatal(err, "Failed to marshal request", "request", req) +func generateRequestFunc(logger logr.Logger) func(mtd *desc.MethodDescriptor, callData *runner.CallData) []byte { + return func(mtd *desc.MethodDescriptor, callData *runner.CallData) []byte { + numModels := *numFakePods * (*numModelsPerPod) + req := test.GenerateRequest(logger, modelName(int(callData.RequestNumber)%numModels)) + data, err := proto.Marshal(req) + if err != nil { + logutil.Fatal(logger, err, "Failed to marshal request", "request", req) + } + return data } - return data } func fakeModels() map[string]*v1alpha1.InferenceModel { diff --git a/pkg/ext-proc/test/utils.go b/pkg/ext-proc/test/utils.go index 4c0007220..cb99a36be 100644 --- a/pkg/ext-proc/test/utils.go +++ b/pkg/ext-proc/test/utils.go @@ -7,9 +7,9 @@ import ( "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/go-logr/logr" "google.golang.org/grpc" "google.golang.org/grpc/reflection" - klog "k8s.io/klog/v2" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" 
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers" @@ -17,7 +17,13 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) -func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server { +func StartExtProc( + logger logr.Logger, + port int, + refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration, + pods []*backend.PodMetrics, + models map[string]*v1alpha1.InferenceModel, +) *grpc.Server { ps := make(backend.PodSet) pms := make(map[backend.Pod]*backend.PodMetrics) for _, pod := range pods { @@ -26,35 +32,35 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refresh } pmc := &backend.FakePodMetricsClient{Res: pms} pp := backend.NewProvider(pmc, backend.NewK8sDataStore(backend.WithPods(pods))) - if err := pp.Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval); err != nil { - logutil.Fatal(err, "Failed to initialize") + if err := pp.Init(logger, refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval); err != nil { + logutil.Fatal(logger, err, "Failed to initialize") } - return startExtProc(port, pp, models) + return startExtProc(logger, port, pp, models) } // startExtProc starts an extProc server with fake pods. -func startExtProc(port int, pp *backend.Provider, models map[string]*v1alpha1.InferenceModel) *grpc.Server { +func startExtProc(logger logr.Logger, port int, pp *backend.Provider, models map[string]*v1alpha1.InferenceModel) *grpc.Server { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { - logutil.Fatal(err, "Failed to listen", "port", port) + logutil.Fatal(logger, err, "Failed to listen", "port", port) } s := grpc.NewServer() extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(pp, scheduling.NewScheduler(pp), "target-pod", &backend.FakeDataStore{Res: models})) - klog.InfoS("gRPC server starting", "port", port) + logger.Info("gRPC server starting", "port", port) reflection.Register(s) go func() { err := s.Serve(lis) if err != nil { - logutil.Fatal(err, "Ext-proc failed with the err") + logutil.Fatal(logger, err, "Ext-proc failed with the err") } }() return s } -func GenerateRequest(model string) *extProcPb.ProcessingRequest { +func GenerateRequest(logger logr.Logger, model string) *extProcPb.ProcessingRequest { j := map[string]interface{}{ "model": model, "prompt": "hello", @@ -64,7 +70,7 @@ func GenerateRequest(model string) *extProcPb.ProcessingRequest { llmReq, err := json.Marshal(j) if err != nil { - logutil.Fatal(err, "Failed to unmarshal LLM request") + logutil.Fatal(logger, err, "Failed to unmarshal LLM request") } req := &extProcPb.ProcessingRequest{ Request: &extProcPb.ProcessingRequest_RequestBody{ diff --git a/pkg/ext-proc/util/logging/fatal.go b/pkg/ext-proc/util/logging/fatal.go index 65926824d..1f85b4500 100644 --- a/pkg/ext-proc/util/logging/fatal.go +++ b/pkg/ext-proc/util/logging/fatal.go @@ -1,11 +1,15 @@ package logging -import "k8s.io/klog/v2" +import ( + "os" -// Fatal calls klog.ErrorS followed by klog.FlushAndExit(1). + "github.com/go-logr/logr" +) + +// Fatal calls logger.Error followed by os.Exit(1). // // This is a utility function and should not be used in production code! -func Fatal(err error, msg string, keysAndValues ...interface{}) { - klog.ErrorS(err, msg, keysAndValues...) 
- klog.FlushAndExit(klog.ExitFlushTimeout, 1) +func Fatal(logger logr.Logger, err error, msg string, keysAndValues ...interface{}) { + logger.Error(err, msg, keysAndValues...) + os.Exit(1) } diff --git a/pkg/ext-proc/util/logging/logger.go b/pkg/ext-proc/util/logging/logger.go new file mode 100644 index 000000000..086a012fc --- /dev/null +++ b/pkg/ext-proc/util/logging/logger.go @@ -0,0 +1,20 @@ +package logging + +import ( + "context" + + "github.com/go-logr/logr" + uberzap "go.uber.org/zap" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +// NewTestLogger creates a new Zap logger using the dev mode. +func NewTestLogger() logr.Logger { + return zap.New(zap.UseDevMode(true), zap.RawZapOpts(uberzap.AddCaller())) +} + +// NewTestLoggerIntoContext creates a new Zap logger using the dev mode and inserts it into the given context. +func NewTestLoggerIntoContext(ctx context.Context) context.Context { + return log.IntoContext(ctx, zap.New(zap.UseDevMode(true), zap.RawZapOpts(uberzap.AddCaller()))) +} diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index 6424663bb..a99b6bd7d 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -6,7 +6,6 @@ import ( "bytes" "context" "errors" - "flag" "fmt" "io" "os" @@ -26,7 +25,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" k8syaml "k8s.io/apimachinery/pkg/util/yaml" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - klog "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" @@ -47,6 +45,7 @@ var ( k8sClient k8sclient.Client testEnv *envtest.Environment scheme = runtime.NewScheme() + logger = logutil.NewTestLogger().V(logutil.VERBOSE) ) func TestKubeInferenceModelRequest(t *testing.T) { @@ -62,7 +61,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { }{ { name: "select lower queue and kv cache, no active lora", - req: extprocutils.GenerateRequest("my-model"), + req: extprocutils.GenerateRequest(logger, "my-model"), // pod-1 will be picked because it has relatively low queue size and low KV cache. pods: []*backend.PodMetrics{ { @@ -115,7 +114,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, { name: "select active lora, low queue", - req: extprocutils.GenerateRequest("sql-lora"), + req: extprocutils.GenerateRequest(logger, "sql-lora"), // pod-1 will be picked because it has relatively low queue size, with the requested // model being active, and has low KV cache. pods: []*backend.PodMetrics{ @@ -180,7 +179,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, { name: "select no lora despite active model, avoid excessive queue size", - req: extprocutils.GenerateRequest("sql-lora"), + req: extprocutils.GenerateRequest(logger, "sql-lora"), // pod-2 will be picked despite it NOT having the requested model being active // as it's above the affinity for queue size. Also is critical, so we should // still honor request despite all queues > 5 @@ -246,7 +245,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, { name: "noncritical and all models past threshold, shed request", - req: extprocutils.GenerateRequest("sql-lora-sheddable"), + req: extprocutils.GenerateRequest(logger, "sql-lora-sheddable"), // no pods will be picked as all models are either above kv threshold, // queue threshold, or both. 
pods: []*backend.PodMetrics{ @@ -297,7 +296,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, { name: "noncritical, but one server has capacity, do not shed", - req: extprocutils.GenerateRequest("sql-lora-sheddable"), + req: extprocutils.GenerateRequest(logger, "sql-lora-sheddable"), // pod 0 will be picked as all other models are above threshold pods: []*backend.PodMetrics{ { @@ -418,9 +417,9 @@ func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalP serverCtx, stopServer := context.WithCancel(context.Background()) go func() { if err := serverRunner.AsRunnable( - backend.NewK8sDataStore(backend.WithPods(pods)), pmc, + logger.WithName("ext-proc"), backend.NewK8sDataStore(backend.WithPods(pods)), pmc, ).Start(serverCtx); err != nil { - logutil.Fatal(err, "Failed to start ext-proc server") + logutil.Fatal(logger, err, "Failed to start ext-proc server") } }() @@ -431,13 +430,13 @@ func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalP // Create a grpc connection conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - logutil.Fatal(err, "Failed to connect", "address", address) + logutil.Fatal(logger, err, "Failed to connect", "address", address) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx) if err != nil { - logutil.Fatal(err, "Failed to create client") + logutil.Fatal(logger, err, "Failed to create client") } return client, func() { cancel() @@ -455,7 +454,7 @@ func BeforeSuit() { } cfg, err := testEnv.Start() if err != nil { - logutil.Fatal(err, "Failed to start test environment", "config", cfg) + logutil.Fatal(logger, err, "Failed to start test environment", "config", cfg) } utilruntime.Must(clientgoscheme.AddToScheme(scheme)) @@ -463,15 +462,16 @@ func BeforeSuit() { k8sClient, err = k8sclient.New(cfg, k8sclient.Options{Scheme: scheme}) if err != nil { - logutil.Fatal(err, "Failed to start k8s Client") + logutil.Fatal(logger, err, "Failed to start k8s Client") } else if k8sClient == nil { - logutil.Fatal(nil, "No error, but returned kubernetes client is nil", "config", cfg) + logutil.Fatal(logger, nil, "No error, but returned kubernetes client is nil", "config", cfg) } // Init runtime. + ctrl.SetLogger(logger) mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme}) if err != nil { - logutil.Fatal(err, "Failed to create controller manager") + logutil.Fatal(logger, err, "Failed to create controller manager") } serverRunner = runserver.NewDefaultExtProcServerRunner() @@ -481,50 +481,46 @@ func BeforeSuit() { serverRunner.SecureServing = false if err := serverRunner.SetupWithManager(mgr); err != nil { - logutil.Fatal(err, "Failed to setup server runner") + logutil.Fatal(logger, err, "Failed to setup server runner") } // Start the controller manager in go routine, not blocking go func() { if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - logutil.Fatal(err, "Failed to start manager") + logutil.Fatal(logger, err, "Failed to start manager") } }() - klog.InfoS("Setting up hermetic ExtProc server") - klog.InitFlags(nil) - flag.Parse() - // Configure klog verbosity levels to print ext proc logs. 
- _ = flag.Lookup("v").Value.Set("3") + logger.Info("Setting up hermetic ExtProc server") // Unmarshal CRDs from file into structs manifestsPath := filepath.Join("..", "testdata", "inferencepool-with-model-hermetic.yaml") docs, err := readDocuments(manifestsPath) if err != nil { - logutil.Fatal(err, "Can't read object manifests", "path", manifestsPath) + logutil.Fatal(logger, err, "Can't read object manifests", "path", manifestsPath) } for _, doc := range docs { inferenceModel := &v1alpha1.InferenceModel{} if err = yaml.Unmarshal(doc, inferenceModel); err != nil { - logutil.Fatal(err, "Can't unmarshal object", "document", doc) + logutil.Fatal(logger, err, "Can't unmarshal object", "document", doc) } if inferenceModel.Kind == "InferenceModel" { - klog.InfoS("Creating inference model", "model", inferenceModel) + logger.Info("Creating inference model", "model", inferenceModel) if err := k8sClient.Create(context.Background(), inferenceModel); err != nil { - logutil.Fatal(err, "Unable to create inferenceModel", "modelName", inferenceModel.Name) + logutil.Fatal(logger, err, "Unable to create inferenceModel", "modelName", inferenceModel.Name) } } } for _, doc := range docs { inferencePool := &v1alpha1.InferencePool{} if err = yaml.Unmarshal(doc, inferencePool); err != nil { - logutil.Fatal(err, "Can't unmarshal object", "document", doc) + logutil.Fatal(logger, err, "Can't unmarshal object", "document", doc) } if inferencePool.Kind == "InferencePool" { - klog.InfoS("Creating inference pool", "pool", inferencePool) + logger.Info("Creating inference pool", "pool", inferencePool) if err := k8sClient.Create(context.Background(), inferencePool); err != nil { - logutil.Fatal(err, "Unable to create inferencePool", "poolName", inferencePool.Name) + logutil.Fatal(logger, err, "Unable to create inferencePool", "poolName", inferencePool.Name) } } }
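
For reviewers, a minimal sketch of the calling convention the changes above converge on: the logger is resolved from the `context.Context` once at the entry point (as in `Scheduler.Schedule`) and then handed down as an explicit `logr.Logger` argument (as in `Filter` and the filter funcs). The import paths and verbosity constants (`logutil.DEFAULT`, `logutil.TRACE`) are the ones already used in this diff; the function names (`handleRequest`, `runFilter`) and key/value pairs are hypothetical and only illustrate the pattern, not code from this PR.

```go
package example

import (
	"context"

	"github.com/go-logr/logr"
	"sigs.k8s.io/controller-runtime/pkg/log"

	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

// handleRequest is a hypothetical entry point: it resolves the logger from the
// context once, attaches request-scoped values, and passes the logger on.
func handleRequest(ctx context.Context, model string) error {
	logger := log.FromContext(ctx).WithValues("model", model)
	logger.V(logutil.DEFAULT).Info("Handling request")

	// Hand the un-leveled logger down; the callee picks its own verbosity.
	return runFilter(logger, model)
}

// runFilter is a hypothetical callee that takes the logger as an argument,
// mirroring how Filter and the filterFuncs now receive logr.Logger.
func runFilter(logger logr.Logger, model string) error {
	logger.V(logutil.TRACE).Info("Running a filter", "name", "example")
	return nil
}
```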