Skip to content

Commit 4bb3be3

Browse files
committed
removed ctx from datastore struct.
instead added ctx to one function that required it which is more aligned with best practices. Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent a13a123 commit 4bb3be3

File tree

8 files changed

+18
-21
lines changed

8 files changed

+18
-21
lines changed

cmd/epp/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func run() error {
167167

168168
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
169169
// Setup runner.
170-
datastore := datastore.NewDatastore(ctx, pmf)
170+
datastore := datastore.NewDatastore(pmf)
171171

172172
serverRunner := &runserver.ExtProcServerRunner{
173173
GrpcPort: *grpcPort,

pkg/epp/controller/inferencemodel_reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func TestInferenceModelReconciler(t *testing.T) {
192192
WithIndex(&v1alpha2.InferenceModel{}, datastore.ModelNameIndexKey, indexInferenceModelsByModelName).
193193
Build()
194194
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
195-
ds := datastore.NewDatastore(t.Context(), pmf)
195+
ds := datastore.NewDatastore(pmf)
196196
for _, m := range test.modelsInStore {
197197
ds.ModelSetIfOlder(m)
198198
}

pkg/epp/controller/inferencepool_reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestInferencePoolReconciler(t *testing.T) {
9595
ctx := context.Background()
9696

9797
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
98-
datastore := datastore.NewDatastore(ctx, pmf)
98+
datastore := datastore.NewDatastore(pmf)
9999
inferencePoolReconciler := &InferencePoolReconciler{PoolNamespacedName: namespacedName, Client: fakeClient, Datastore: datastore}
100100

101101
// Step 1: Inception, only ready pods matching pool1 are added to the store.

pkg/epp/controller/pod_reconciler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
5959
return ctrl.Result{}, err
6060
}
6161

62-
c.updateDatastore(logger, pod, pool)
62+
c.updateDatastore(ctx, logger, pod, pool)
6363
return ctrl.Result{}, nil
6464
}
6565

@@ -69,13 +69,13 @@ func (c *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
6969
Complete(c)
7070
}
7171

72-
func (c *PodReconciler) updateDatastore(logger logr.Logger, pod *corev1.Pod, pool *v1alpha2.InferencePool) {
72+
func (c *PodReconciler) updateDatastore(ctx context.Context, logger logr.Logger, pod *corev1.Pod, pool *v1alpha2.InferencePool) {
7373
namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
7474
if !pod.DeletionTimestamp.IsZero() || !c.Datastore.PoolLabelsMatch(pod.Labels) || !podIsReady(pod) {
7575
logger.V(logutil.DEBUG).Info("Pod removed or not added", "name", namespacedName)
7676
c.Datastore.PodDelete(namespacedName)
7777
} else {
78-
if c.Datastore.PodUpdateOrAddIfNotExist(pod, pool) {
78+
if c.Datastore.PodUpdateOrAddIfNotExist(ctx, pod, pool) {
7979
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
8080
} else {
8181
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)

pkg/epp/controller/pod_reconciler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,10 @@ func TestPodReconciler(t *testing.T) {
181181
Build()
182182

183183
// Configure the initial state of the datastore.
184-
store := datastore.NewDatastore(t.Context(), pmf)
184+
store := datastore.NewDatastore(pmf)
185185
store.PoolSet(test.pool)
186186
for _, pod := range test.existingPods {
187-
store.PodUpdateOrAddIfNotExist(pod, pool)
187+
store.PodUpdateOrAddIfNotExist(t.Context(), pod, pool)
188188
}
189189

190190
podReconciler := &PodReconciler{Client: fakeClient, Datastore: store}

pkg/epp/datastore/datastore.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,16 @@ type Datastore interface {
6060
PodGetAll() []backendmetrics.PodMetrics
6161
// PodList lists pods matching the given predicate.
6262
PodList(func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
63-
PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.InferencePool) bool
63+
PodUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod, pool *v1alpha2.InferencePool) bool
6464
PodDelete(namespacedName types.NamespacedName)
6565
PodResyncAll(ctx context.Context, ctrlClient client.Client, pool *v1alpha2.InferencePool)
6666

6767
// Clears the store state, happens when the pool gets deleted.
6868
Clear()
6969
}
7070

71-
func NewDatastore(parentCtx context.Context, pmf *backendmetrics.PodMetricsFactory) *datastore {
71+
func NewDatastore(pmf *backendmetrics.PodMetricsFactory) *datastore {
7272
store := &datastore{
73-
parentCtx: parentCtx,
7473
poolAndModelsMu: sync.RWMutex{},
7574
models: make(map[string]*v1alpha2.InferenceModel),
7675
pods: &sync.Map{},
@@ -80,8 +79,6 @@ func NewDatastore(parentCtx context.Context, pmf *backendmetrics.PodMetricsFacto
8079
}
8180

8281
type datastore struct {
83-
// parentCtx controls the lifecycle of the background metrics goroutines that spawn up by the datastore.
84-
parentCtx context.Context
8582
// poolAndModelsMu is used to synchronize access to pool and the models map.
8683
poolAndModelsMu sync.RWMutex
8784
pool *v1alpha2.InferencePool
@@ -228,15 +225,15 @@ func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []b
228225
return res
229226
}
230227

231-
func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.InferencePool) bool {
228+
func (ds *datastore) PodUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod, pool *v1alpha2.InferencePool) bool {
232229
namespacedName := types.NamespacedName{
233230
Name: pod.Name,
234231
Namespace: pod.Namespace,
235232
}
236233
var pm backendmetrics.PodMetrics
237234
existing, ok := ds.pods.Load(namespacedName)
238235
if !ok {
239-
pm = ds.pmf.NewPodMetrics(ds.parentCtx, pod, ds)
236+
pm = ds.pmf.NewPodMetrics(ctx, pod, ds)
240237
ds.pods.Store(namespacedName, pm)
241238
} else {
242239
pm = existing.(backendmetrics.PodMetrics)
@@ -262,7 +259,7 @@ func (ds *datastore) PodResyncAll(ctx context.Context, ctrlClient client.Client,
262259
if podIsReady(&pod) {
263260
namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
264261
activePods[pod.Name] = true
265-
if ds.PodUpdateOrAddIfNotExist(&pod, pool) {
262+
if ds.PodUpdateOrAddIfNotExist(ctx, &pod, pool) {
266263
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
267264
} else {
268265
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)

pkg/epp/datastore/datastore_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func TestPool(t *testing.T) {
7272
for _, tt := range tests {
7373
t.Run(tt.name, func(t *testing.T) {
7474
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
75-
datastore := NewDatastore(context.Background(), pmf)
75+
datastore := NewDatastore(pmf)
7676
datastore.PoolSet(tt.inferencePool)
7777
gotPool, gotErr := datastore.PoolGet()
7878
if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
@@ -204,7 +204,7 @@ func TestModel(t *testing.T) {
204204
for _, test := range tests {
205205
t.Run(test.name, func(t *testing.T) {
206206
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
207-
ds := NewDatastore(t.Context(), pmf)
207+
ds := NewDatastore(pmf)
208208
for _, m := range test.existingModels {
209209
ds.ModelSetIfOlder(m)
210210
}
@@ -318,10 +318,10 @@ func TestMetrics(t *testing.T) {
318318
ctx, cancel := context.WithCancel(context.Background())
319319
defer cancel()
320320
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond)
321-
ds := NewDatastore(ctx, pmf)
321+
ds := NewDatastore(pmf)
322322
ds.PoolSet(inferencePool)
323323
for _, pod := range test.storePods {
324-
ds.PodUpdateOrAddIfNotExist(pod, inferencePool)
324+
ds.PodUpdateOrAddIfNotExist(ctx, pod, inferencePool)
325325
}
326326
assert.EventuallyWithT(t, func(t *assert.CollectT) {
327327
got := ds.PodGetAll()

test/integration/epp/hermetic_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1628,7 +1628,7 @@ func BeforeSuite() func() {
16281628
pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond)
16291629
// Adjust from defaults
16301630
serverRunner.PoolName = "vllm-llama3-8b-instruct-pool"
1631-
serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf)
1631+
serverRunner.Datastore = datastore.NewDatastore(pmf)
16321632
serverRunner.SecureServing = false
16331633

16341634
if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil {

0 commit comments

Comments
 (0)