Skip to content

Added controller and datastore package #363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/ext-proc/backend/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

type FakePodMetricsClient struct {
Err map[types.NamespacedName]error
Res map[types.NamespacedName]*PodMetrics
Res map[types.NamespacedName]*datastore.PodMetrics
}

func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *PodMetrics) (*PodMetrics, error) {
func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *datastore.PodMetrics) (*datastore.PodMetrics, error) {
if err, ok := f.Err[existing.NamespacedName]; ok {
return nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/ext-proc/backend/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
"go.uber.org/multierr"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)
Expand All @@ -17,7 +18,7 @@ const (
fetchMetricsTimeout = 5 * time.Second
)

func NewProvider(pmc PodMetricsClient, datastore Datastore) *Provider {
func NewProvider(pmc PodMetricsClient, datastore datastore.Datastore) *Provider {
p := &Provider{
pmc: pmc,
datastore: datastore,
Expand All @@ -28,11 +29,11 @@ func NewProvider(pmc PodMetricsClient, datastore Datastore) *Provider {
// Provider provides backend pods and information such as metrics.
type Provider struct {
pmc PodMetricsClient
datastore Datastore
datastore datastore.Datastore
}

type PodMetricsClient interface {
FetchMetrics(ctx context.Context, existing *PodMetrics) (*PodMetrics, error)
FetchMetrics(ctx context.Context, existing *datastore.PodMetrics) (*datastore.PodMetrics, error)
}

func (p *Provider) Init(ctx context.Context, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error {
Expand Down Expand Up @@ -100,7 +101,7 @@ func (p *Provider) refreshMetricsOnce(logger logr.Logger) error {
errCh := make(chan error)
processOnePod := func(key, value any) bool {
loggerTrace.Info("Pod and metric being processed", "pod", key, "metric", value)
existing := value.(*PodMetrics)
existing := value.(*datastore.PodMetrics)
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
52 changes: 24 additions & 28 deletions pkg/ext-proc/backend/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
)

var (
pod1 = &PodMetrics{
Pod: Pod{
pod1 = &datastore.PodMetrics{
Pod: datastore.Pod{
NamespacedName: types.NamespacedName{
Name: "pod1",
},
},
Metrics: Metrics{
Metrics: datastore.Metrics{
WaitingQueueSize: 0,
KVCacheUsagePercent: 0.2,
MaxActiveModels: 2,
Expand All @@ -30,13 +31,13 @@ var (
},
},
}
pod2 = &PodMetrics{
Pod: Pod{
pod2 = &datastore.PodMetrics{
Pod: datastore.Pod{
NamespacedName: types.NamespacedName{
Name: "pod2",
},
},
Metrics: Metrics{
Metrics: datastore.Metrics{
WaitingQueueSize: 1,
KVCacheUsagePercent: 0.2,
MaxActiveModels: 2,
Expand All @@ -52,37 +53,33 @@ func TestProvider(t *testing.T) {
tests := []struct {
name string
pmc PodMetricsClient
datastore Datastore
want []*PodMetrics
datastore datastore.Datastore
want []*datastore.PodMetrics
}{
{
name: "Probing metrics success",
pmc: &FakePodMetricsClient{
Res: map[types.NamespacedName]*PodMetrics{
Res: map[types.NamespacedName]*datastore.PodMetrics{
pod1.NamespacedName: pod1,
pod2.NamespacedName: pod2,
},
},
datastore: &datastore{
pods: populateMap(pod1, pod2),
},
want: []*PodMetrics{
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),
want: []*datastore.PodMetrics{
pod1,
pod2,
},
},
{
name: "Only pods in the datastore are probed",
pmc: &FakePodMetricsClient{
Res: map[types.NamespacedName]*PodMetrics{
Res: map[types.NamespacedName]*datastore.PodMetrics{
pod1.NamespacedName: pod1,
pod2.NamespacedName: pod2,
},
},
datastore: &datastore{
pods: populateMap(pod1),
},
want: []*PodMetrics{
datastore: datastore.NewFakeDatastore(populateMap(pod1), nil, nil),
want: []*datastore.PodMetrics{
pod1,
},
},
Expand All @@ -92,19 +89,18 @@ func TestProvider(t *testing.T) {
Err: map[types.NamespacedName]error{
pod2.NamespacedName: errors.New("injected error"),
},
Res: map[types.NamespacedName]*PodMetrics{
Res: map[types.NamespacedName]*datastore.PodMetrics{
pod1.NamespacedName: pod1,
},
},
datastore: &datastore{
pods: populateMap(pod1, pod2),
},
want: []*PodMetrics{
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),

want: []*datastore.PodMetrics{
pod1,
// Fetching pod2 metrics failed, so it keeps the default values.
{
Pod: Pod{NamespacedName: pod2.NamespacedName},
Metrics: Metrics{
Pod: datastore.Pod{NamespacedName: pod2.NamespacedName},
Metrics: datastore.Metrics{
WaitingQueueSize: 0,
KVCacheUsagePercent: 0,
MaxActiveModels: 0,
Expand All @@ -122,7 +118,7 @@ func TestProvider(t *testing.T) {
_ = p.Init(ctx, time.Millisecond, time.Millisecond)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
metrics := test.datastore.PodGetAll()
diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *PodMetrics) bool {
diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *datastore.PodMetrics) bool {
return a.String() < b.String()
}))
assert.Equal(t, "", diff, "Unexpected diff (+got/-want)")
Expand All @@ -131,10 +127,10 @@ func TestProvider(t *testing.T) {
}
}

func populateMap(pods ...*PodMetrics) *sync.Map {
func populateMap(pods ...*datastore.PodMetrics) *sync.Map {
newMap := &sync.Map{}
for _, pod := range pods {
newMap.Store(pod.NamespacedName, &PodMetrics{Pod: Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}})
newMap.Store(pod.NamespacedName, &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}})
}
return newMap
}
10 changes: 5 additions & 5 deletions pkg/ext-proc/backend/vllm/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/prometheus/common/expfmt"
"go.uber.org/multierr"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

Expand All @@ -38,8 +38,8 @@ type PodMetricsClientImpl struct{}
// FetchMetrics fetches metrics from a given pod.
func (p *PodMetricsClientImpl) FetchMetrics(
ctx context.Context,
existing *backend.PodMetrics,
) (*backend.PodMetrics, error) {
existing *datastore.PodMetrics,
) (*datastore.PodMetrics, error) {
logger := log.FromContext(ctx)
loggerDefault := logger.V(logutil.DEFAULT)

Expand Down Expand Up @@ -79,8 +79,8 @@ func (p *PodMetricsClientImpl) FetchMetrics(
func promToPodMetrics(
logger logr.Logger,
metricFamilies map[string]*dto.MetricFamily,
existing *backend.PodMetrics,
) (*backend.PodMetrics, error) {
existing *datastore.PodMetrics,
) (*datastore.PodMetrics, error) {
var errs error
updated := existing.Clone()
runningQueueSize, err := getLatestMetric(logger, metricFamilies, RunningQueueSizeMetricName)
Expand Down
14 changes: 7 additions & 7 deletions pkg/ext-proc/backend/vllm/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

Expand All @@ -17,9 +17,9 @@ func TestPromToPodMetrics(t *testing.T) {
testCases := []struct {
name string
metricFamilies map[string]*dto.MetricFamily
expectedMetrics *backend.Metrics
expectedMetrics *datastore.Metrics
expectedErr error
initialPodMetrics *backend.PodMetrics
initialPodMetrics *datastore.PodMetrics
}{
{
name: "all metrics available",
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestPromToPodMetrics(t *testing.T) {
},
},
},
expectedMetrics: &backend.Metrics{
expectedMetrics: &datastore.Metrics{
RunningQueueSize: 15,
WaitingQueueSize: 25,
KVCacheUsagePercent: 0.9,
Expand All @@ -117,7 +117,7 @@ func TestPromToPodMetrics(t *testing.T) {
},
MaxActiveModels: 2,
},
initialPodMetrics: &backend.PodMetrics{},
initialPodMetrics: &datastore.PodMetrics{},
expectedErr: nil,
},
{
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestPromToPodMetrics(t *testing.T) {
},
},
},
expectedMetrics: &backend.Metrics{
expectedMetrics: &datastore.Metrics{
RunningQueueSize: 15,
WaitingQueueSize: 25,
KVCacheUsagePercent: 0.9,
Expand All @@ -216,7 +216,7 @@ func TestPromToPodMetrics(t *testing.T) {
},
MaxActiveModels: 0,
},
initialPodMetrics: &backend.PodMetrics{},
initialPodMetrics: &datastore.PodMetrics{},
expectedErr: errors.New("strconv.Atoi: parsing '2a': invalid syntax"),
},
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package backend
package controller

import (
"context"
Expand All @@ -12,14 +12,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

type InferenceModelReconciler struct {
client.Client
Scheme *runtime.Scheme
Record record.EventRecorder
Datastore Datastore
Datastore datastore.Datastore
PoolNamespacedName types.NamespacedName
}

Expand Down
Loading