
Commit 91fc723

Merge branch 'kubernetes-sigs:main' into main
2 parents 10b05ee + ddc3d69 commit 91fc723

28 files changed: +553 additions, -315 deletions

.github/ISSUE_TEMPLATE/blank_issue.md (+8)

@@ -0,0 +1,8 @@
+---
+name: Blank Issue
+about: Create a new issue from scratch
+title: ''
+labels: needs-triage
+assignees: ''
+
+---

.github/ISSUE_TEMPLATE/bug_request.md (+3, -1)

@@ -1,7 +1,9 @@
 ---
 name: Bug Report
 about: Report a bug you encountered
-labels: kind/bug
+title: ''
+labels: kind/bug, needs-triage
+assignees: ''

 ---

.github/ISSUE_TEMPLATE/config.yml (+1)

@@ -0,0 +1 @@
+blank_issues_enabled: false

.github/ISSUE_TEMPLATE/feature_request.md (+1, -2)

@@ -2,7 +2,7 @@
 name: Feature request
 about: Suggest an idea for this project
 title: ''
-labels: ''
+labels: needs-triage
 assignees: ''

 ---
@@ -12,4 +12,3 @@ assignees: ''
 **What would you like to be added**:

 **Why is this needed**:
-

.github/ISSUE_TEMPLATE/new-release.md (+1)

@@ -4,6 +4,7 @@ about: Propose a new release
 title: Release v0.x.0
 labels: ''
 assignees: ''
+
 ---

 - [Introduction](#introduction)

Makefile (+5, -1)

@@ -123,8 +123,12 @@ vet: ## Run go vet against code.
 # test: manifests generate fmt vet envtest image-build ## Run tests.
 #     KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -race -coverprofile cover.out

+.PHONY: test-unit
+test-unit: ## Run unit tests.
+    KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./pkg/... -race -coverprofile cover.out
+
 .PHONY: test-integration
-test-integration: ## Run tests.
+test-integration: ## Run integration tests.
     KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./test/integration/epp/... -race -coverprofile cover.out

 .PHONY: test-e2e

README.md (+50, -1)

@@ -2,7 +2,56 @@
 [![Go Reference](https://pkg.go.dev/badge/sigs.k8s.io/gateway-api-inference-extension.svg)](https://pkg.go.dev/sigs.k8s.io/gateway-api-inference-extension)
 [![License](https://img.shields.io/github/license/kubernetes-sigs/gateway-api-inference-extension)](/LICENSE)

-# Gateway API Inference Extension
+# Gateway API Inference Extension (GIE)
+
+This project offers tools for AI Inference, enabling developers to build [Inference Gateways].
+
+[Inference Gateways]:#concepts-and-definitions
+
+## Concepts and Definitions
+
+The following are some key industry terms that are important to understand for
+this project:
+
+- **Model**: A generative AI model that has learned patterns from data and is
+  used for inference. Models vary in size and architecture, from smaller
+  domain-specific models to massive multi-billion parameter neural networks that
+  are optimized for diverse language tasks.
+- **Inference**: The process of running a generative AI model, such as a large
+  language model, diffusion model etc, to generate text, embeddings, or other
+  outputs from input data.
+- **Model server**: A service (in our case, containerized) responsible for
+  receiving inference requests and returning predictions from a model.
+- **Accelerator**: specialized hardware, such as Graphics Processing Units
+  (GPUs) that can be attached to Kubernetes nodes to speed up computations,
+  particularly for training and inference tasks.
+
+And the following are more specific terms to this project:
+
+- **Scheduler**: Makes decisions about which endpoint is optimal (best cost /
+  best performance) for an inference request based on `Metrics and Capabilities`
+  from [Model Serving](/docs/proposals/003-model-server-protocol/README.md).
+- **Metrics and Capabilities**: Data provided by model serving platforms about
+  performance, availability and capabilities to optimize routing. Includes
+  things like [Prefix Cache] status or [LoRA Adapters] availability.
+- **Endpoint Selector**: A `Scheduler` combined with `Metrics and Capabilities`
+  systems is often referred to together as an [Endpoint Selection Extension]
+  (this is also sometimes referred to as an "endpoint picker", or "EPP").
+- **Inference Gateway**: A proxy/load-balancer which has been coupled with a
+  `Endpoint Selector`. It provides optimized routing and load balancing for
+  serving Kubernetes self-hosted generative Artificial Intelligence (AI)
+  workloads. It simplifies the deployment, management, and observability of AI
+  inference workloads.
+
+For deeper insights and more advanced concepts, refer to our [proposals](/docs/proposals).
+
+[Inference]:https://www.digitalocean.com/community/tutorials/llm-inference-optimization
+[Gateway API]:https://github.com/kubernetes-sigs/gateway-api
+[Prefix Cache]:https://docs.vllm.ai/en/stable/design/v1/prefix_caching.html
+[LoRA Adapters]:https://docs.vllm.ai/en/stable/features/lora.html
+[Endpoint Selection Extension]:https://gateway-api-inference-extension.sigs.k8s.io/#endpoint-selection-extension
+
+## Technical Overview

 This extension upgrades an [ext-proc](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/ext_proc_filter)-capable proxy or gateway - such as Envoy Gateway, kGateway, or the GKE Gateway - to become an **inference gateway** - supporting inference platform teams self-hosting large language models on Kubernetes. This integration makes it easy to expose and control access to your local [OpenAI-compatible chat completion endpoints](https://platform.openai.com/docs/api-reference/chat) to other workloads on or off cluster, or to integrate your self-hosted models alongside model-as-a-service providers in a higher level **AI Gateway** like LiteLLM, Solo AI Gateway, or Apigee.

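To make the `Scheduler` and `Endpoint Selector` terms introduced in the new Concepts section concrete, here is a minimal, hypothetical Go sketch of metrics-based endpoint selection. The type, fields, addresses, and the lowest-queue-depth heuristic are illustrative assumptions only, not the project's actual EPP scheduling algorithm.

```go
package main

import "fmt"

// endpointMetrics is a hypothetical, simplified stand-in for the "Metrics and
// Capabilities" data an Endpoint Selector consumes (the real EPP uses richer
// signals, e.g. prefix-cache status and LoRA adapter availability).
type endpointMetrics struct {
	address      string
	queueDepth   int
	kvCacheUsage float64
}

// pickEndpoint chooses the endpoint with the shortest request queue, breaking
// ties on lower KV-cache utilization. Illustrative heuristic only.
func pickEndpoint(eps []endpointMetrics) (string, error) {
	if len(eps) == 0 {
		return "", fmt.Errorf("no endpoints available")
	}
	best := eps[0]
	for _, ep := range eps[1:] {
		if ep.queueDepth < best.queueDepth ||
			(ep.queueDepth == best.queueDepth && ep.kvCacheUsage < best.kvCacheUsage) {
			best = ep
		}
	}
	return best.address, nil
}

func main() {
	eps := []endpointMetrics{
		{address: "10.0.0.1:8000", queueDepth: 4, kvCacheUsage: 0.9},
		{address: "10.0.0.2:8000", queueDepth: 1, kvCacheUsage: 0.5},
	}
	addr, err := pickEndpoint(eps)
	if err != nil {
		panic(err)
	}
	fmt.Println("routing to", addr) // routing to 10.0.0.2:8000
}
```
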
docs/proposals/README.md (+5)

@@ -0,0 +1,5 @@
+# Proposals Best Practices
+
+
+## Naming
+The directory of the proposal should lead with a 4-digit PR number (will move to 5,6,... should our PR count get that high), followed by kebab-cased title. The PR number is not known until the PR is cut, so development can use a placeholder, ex. XXXX-my-proposal. PR number is used b/c it is unique & chronological, allowing the default ordering of proposals to follow the timeline of development.

pkg/epp/backend/metrics/pod_metrics.go (+5, -6)

@@ -41,9 +41,8 @@ type podMetrics struct {
     ds       Datastore
     interval time.Duration

-    parentCtx context.Context
-    once      sync.Once // ensure the StartRefreshLoop is only called once.
-    done      chan struct{}
+    once sync.Once // ensure the StartRefreshLoop is only called once.
+    done chan struct{}

     logger logr.Logger
 }
@@ -79,8 +78,8 @@ func toInternalPod(in *corev1.Pod) *Pod {
 }

 // start starts a goroutine exactly once to periodically update metrics. The goroutine will be
-// stopped either when stop() is called, or the parentCtx is cancelled.
-func (pm *podMetrics) startRefreshLoop() {
+// stopped either when stop() is called, or the given ctx is cancelled.
+func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
     pm.once.Do(func() {
         go func() {
             pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod())
@@ -90,7 +89,7 @@
             select {
             case <-pm.done:
                 return
-            case <-pm.parentCtx.Done():
+            case <-ctx.Done():
                 return
             case <-ticker.C: // refresh metrics periodically
                 if err := pm.refreshMetrics(); err != nil {

pkg/epp/backend/metrics/types.go (+7, -8)

@@ -43,18 +43,17 @@ type PodMetricsFactory struct {
 func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
     pod := toInternalPod(in)
     pm := &podMetrics{
-        pmc:       f.pmc,
-        ds:        ds,
-        interval:  f.refreshMetricsInterval,
-        parentCtx: parentCtx,
-        once:      sync.Once{},
-        done:      make(chan struct{}),
-        logger:    log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
+        pmc:      f.pmc,
+        ds:       ds,
+        interval: f.refreshMetricsInterval,
+        once:     sync.Once{},
+        done:     make(chan struct{}),
+        logger:   log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
     }
     pm.pod.Store(pod)
     pm.metrics.Store(newMetrics())

-    pm.startRefreshLoop()
+    pm.startRefreshLoop(parentCtx)
     return pm
 }

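The pod_metrics.go and types.go changes above drop the stored parentCtx field and instead pass the context into startRefreshLoop. A self-contained sketch of that pattern, with hypothetical names rather than the project's types, looks like this:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// refresher is a stripped-down stand-in for podMetrics: it keeps the stop
// channel and the sync.Once, but no longer stores a context in the struct.
type refresher struct {
	once sync.Once
	done chan struct{}
}

// start launches the refresh loop exactly once; the loop exits when either
// stop() closes done or the caller's ctx is cancelled.
func (r *refresher) start(ctx context.Context, interval time.Duration) {
	r.once.Do(func() {
		go func() {
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for {
				select {
				case <-r.done:
					return
				case <-ctx.Done():
					return
				case <-ticker.C:
					fmt.Println("refreshing metrics")
				}
			}
		}()
	})
}

func (r *refresher) stop() { close(r.done) }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	r := &refresher{done: make(chan struct{})}
	r.start(ctx, 100*time.Millisecond)
	time.Sleep(350 * time.Millisecond)
	r.stop() // or cancel(): either signal terminates the loop
}
```
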
pkg/epp/controller/pod_reconciler.go (+22)

@@ -26,7 +26,9 @@ import (
     "k8s.io/client-go/tools/record"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/log"
+    "sigs.k8s.io/controller-runtime/pkg/predicate"
     "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
     logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
     podutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pod"
@@ -63,8 +65,28 @@ func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 }

 func (c *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
+    filter := predicate.Funcs{
+        CreateFunc: func(ce event.CreateEvent) bool {
+            pod := ce.Object.(*corev1.Pod)
+            return c.Datastore.PoolLabelsMatch(pod.GetLabels())
+        },
+        UpdateFunc: func(ue event.UpdateEvent) bool {
+            oldPod := ue.ObjectOld.(*corev1.Pod)
+            newPod := ue.ObjectNew.(*corev1.Pod)
+            return c.Datastore.PoolLabelsMatch(oldPod.GetLabels()) || c.Datastore.PoolLabelsMatch(newPod.GetLabels())
+        },
+        DeleteFunc: func(de event.DeleteEvent) bool {
+            pod := de.Object.(*corev1.Pod)
+            return c.Datastore.PoolLabelsMatch(pod.GetLabels())
+        },
+        GenericFunc: func(ge event.GenericEvent) bool {
+            pod := ge.Object.(*corev1.Pod)
+            return c.Datastore.PoolLabelsMatch(pod.GetLabels())
+        },
+    }
     return ctrl.NewControllerManagedBy(mgr).
         For(&corev1.Pod{}).
+        WithEventFilter(filter).
         Complete(c)
 }

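The reconciler change above installs an event filter so that only Pod events whose labels match the InferencePool selector reach Reconcile. A minimal standalone sketch of how such a predicate behaves, assuming an illustrative app=vllm selector in place of Datastore.PoolLabelsMatch:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

func main() {
	// Stand-in for Datastore.PoolLabelsMatch: assume the pool selects pods
	// labeled app=vllm (an illustrative label, not a project default).
	poolLabelsMatch := func(labels map[string]string) bool {
		return labels["app"] == "vllm"
	}

	filter := predicate.Funcs{
		CreateFunc: func(ce event.CreateEvent) bool {
			return poolLabelsMatch(ce.Object.GetLabels())
		},
	}

	matching := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name: "vllm-0", Labels: map[string]string{"app": "vllm"},
	}}
	unrelated := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name: "web-0", Labels: map[string]string{"app": "web"},
	}}

	fmt.Println(filter.Create(event.CreateEvent{Object: matching}))  // true: event reaches Reconcile
	fmt.Println(filter.Create(event.CreateEvent{Object: unrelated})) // false: event is filtered out
}
```
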
pkg/epp/datastore/datastore.go (+3)

@@ -150,6 +150,9 @@ func (ds *datastore) PoolHasSynced() bool {
 func (ds *datastore) PoolLabelsMatch(podLabels map[string]string) bool {
     ds.poolAndModelsMu.RLock()
     defer ds.poolAndModelsMu.RUnlock()
+    if ds.pool == nil {
+        return false
+    }
     poolSelector := selectorFromInferencePoolSelector(ds.pool.Spec.Selector)
     podSet := labels.Set(podLabels)
     return poolSelector.Matches(podSet)

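The datastore change above makes PoolLabelsMatch return false while no InferencePool has been observed, instead of dereferencing a nil ds.pool. A small sketch of that guarded label-matching shape, using an assumed app=vllm selector rather than a real InferencePool spec:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

// poolLabelsMatch mirrors the guarded shape of datastore.PoolLabelsMatch: with
// no pool selector available yet, nothing matches; otherwise the selector decides.
func poolLabelsMatch(poolSelector labels.Selector, podLabels map[string]string) bool {
	if poolSelector == nil { // stands in for "no InferencePool synced yet"
		return false
	}
	return poolSelector.Matches(labels.Set(podLabels))
}

func main() {
	sel := labels.SelectorFromSet(labels.Set{"app": "vllm"}) // assumed selector
	fmt.Println(poolLabelsMatch(nil, map[string]string{"app": "vllm"}))                   // false: no pool yet
	fmt.Println(poolLabelsMatch(sel, map[string]string{"app": "vllm", "hash": "abc123"})) // true
	fmt.Println(poolLabelsMatch(sel, map[string]string{"app": "web"}))                    // false
}
```
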
pkg/epp/datastore/datastore_test.go (+91)

@@ -355,3 +355,94 @@ func TestMetrics(t *testing.T) {
         })
     }
 }
+
+func TestPods(t *testing.T) {
+    updatedPod := &corev1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "pod1",
+        },
+        Spec: corev1.PodSpec{
+            NodeName: "node-1",
+        },
+    }
+    tests := []struct {
+        name         string
+        op           func(ctx context.Context, ds Datastore)
+        existingPods []*corev1.Pod
+        wantPods     []*corev1.Pod
+    }{
+        {
+            name:         "Add new pod, no existing pods, should add",
+            existingPods: []*corev1.Pod{},
+            wantPods:     []*corev1.Pod{pod1},
+            op: func(ctx context.Context, ds Datastore) {
+                ds.PodUpdateOrAddIfNotExist(pod1)
+            },
+        },
+        {
+            name:         "Add new pod, with existing pods, should add",
+            existingPods: []*corev1.Pod{pod1},
+            wantPods:     []*corev1.Pod{pod1, pod2},
+            op: func(ctx context.Context, ds Datastore) {
+                ds.PodUpdateOrAddIfNotExist(pod2)
+            },
+        },
+        {
+            name:         "Update existing pod, new field, should update",
+            existingPods: []*corev1.Pod{pod1},
+            wantPods:     []*corev1.Pod{updatedPod},
+            op: func(ctx context.Context, ds Datastore) {
+                ds.PodUpdateOrAddIfNotExist(updatedPod)
+            },
+        },
+        {
+            name:         "Update existing pod, no new fields, should not update",
+            existingPods: []*corev1.Pod{pod1},
+            wantPods:     []*corev1.Pod{pod1},
+            op: func(ctx context.Context, ds Datastore) {
+                incoming := &corev1.Pod{
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "pod1",
+                        Namespace: "default",
+                    },
+                }
+                ds.PodUpdateOrAddIfNotExist(incoming)
+            },
+        },
+        {
+            name:     "Delete the pod",
+            wantPods: []*corev1.Pod{pod1},
+            op: func(ctx context.Context, ds Datastore) {
+                ds.PodDelete(pod2NamespacedName)
+            },
+        },
+        {
+            name:         "Delete the pod that doesn't exist",
+            existingPods: []*corev1.Pod{pod1},
+            wantPods:     []*corev1.Pod{pod1},
+            op: func(ctx context.Context, ds Datastore) {
+                ds.PodDelete(pod2NamespacedName)
+            },
+        },
+    }
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            ctx := context.Background()
+            pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
+            ds := NewDatastore(t.Context(), pmf)
+            for _, pod := range test.existingPods {
+                ds.PodUpdateOrAddIfNotExist(pod)
+            }
+
+            test.op(ctx, ds)
+            var gotPods []*corev1.Pod
+            for _, pm := range ds.PodGetAll() {
+                pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().NamespacedName.Name, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().Address}}
+                gotPods = append(gotPods, pod)
+            }
+            if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) {
+                t.Logf("got (%v) != want (%v);", gotPods, test.wantPods)
+            }
+        })
+    }
+}

pkg/epp/handlers/request.go (+3)

@@ -138,6 +138,9 @@ func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *Requ
     // The above PR will address endpoint admission, but currently any request without a body will be
     // routed to a random upstream pod.
     pod := GetRandomPod(s.datastore)
+    if pod == nil {
+        return errutil.Error{Code: errutil.Internal, Msg: "no pods available in datastore"}
+    }
     pool, err := s.datastore.PoolGet()
     if err != nil {
         return err

pkg/epp/handlers/server.go (+3)

@@ -449,6 +449,9 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed

 func GetRandomPod(ds datastore.Datastore) *backendmetrics.Pod {
     pods := ds.PodGetAll()
+    if len(pods) == 0 {
+        return nil
+    }
     number := rand.Intn(len(pods))
     pod := pods[number]
     return pod.GetPod()

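The request.go and server.go changes above close the same gap from both sides: GetRandomPod now returns nil when the datastore holds no pods, and the header handler turns that nil into an Internal error rather than letting rand.Intn panic on a zero-length list. A minimal sketch of the guard, with a plain string slice standing in for the pod list:

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickRandom mirrors the guarded shape of GetRandomPod: rand.Intn panics when
// its argument is <= 0, so an empty list must be handled before the draw.
func pickRandom(pods []string) (string, bool) {
	if len(pods) == 0 {
		return "", false
	}
	return pods[rand.Intn(len(pods))], true
}

func main() {
	if _, ok := pickRandom(nil); !ok {
		fmt.Println("no pods available in datastore") // caller maps this to an Internal error
	}
	if pod, ok := pickRandom([]string{"pod-a", "pod-b"}); ok {
		fmt.Println("selected", pod)
	}
}
```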