Skip to content

Commit 354c12e

Browse files
committed
Add EndpointSlice consumer helper functions
This commit adds helper functions for EndpointSlice consumers to make it easier to transition from Endpoints to EndpointSlices. The new package provides: 1. EndpointSliceConsumer - Core component that tracks EndpointSlices and provides a unified view of endpoints for a service 2. EndpointSliceInformer - Informer-like interface for EndpointSlices 3. EndpointSliceLister - Lister-like interface for EndpointSlices These helpers handle the complexity of merging multiple slices for the same service and deduplicating endpoints that might appear in multiple slices. Benefits: - Easier migration from Endpoints to EndpointSlices with familiar interfaces - Simplified handling of multiple slices without manual merging and deduplication - Improved performance by leveraging the scalability of the EndpointSlice API - Consistent view of endpoints even as they move between slices Fixes kubernetes#124777 Signed-off-by: Mad Bergo <[email protected]>
1 parent 66931f0 commit 354c12e

File tree

7 files changed

+1076
-0
lines changed

7 files changed

+1076
-0
lines changed

Diff for: staging/src/k8s.io/endpointslice/consumer/README.md

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# EndpointSlice Consumer Helpers
2+
3+
This package provides helper functions for consuming EndpointSlices in Kubernetes.
4+
5+
## Overview
6+
7+
The EndpointSlice API is the mechanism that Kubernetes uses to let Services scale to handle large numbers of backends. However, working with EndpointSlices directly can be challenging because a single Service may have multiple EndpointSlice objects, and endpoints can move between slices.
8+
9+
This package provides helper functions that make it easier to consume EndpointSlices by:
10+
11+
1. Providing a unified view of all endpoints for a service across multiple EndpointSlice objects
12+
2. Handling the complexity of tracking, merging, and deduplicating endpoints from multiple slices
13+
3. Offering informer-like and lister-like interfaces that are familiar to Kubernetes developers
14+
15+
## Components
16+
17+
### EndpointSliceConsumer
18+
19+
The `EndpointSliceConsumer` is the core component that tracks EndpointSlices and provides a unified view of endpoints for a service.
20+
21+
```go
22+
// Create a new consumer
23+
consumer := consumer.NewEndpointSliceConsumer("node1")
24+
25+
// Add an event handler
26+
consumer.AddEventHandler(consumer.EndpointChangeHandlerFunc(func(serviceNN types.NamespacedName, slices []*discovery.EndpointSlice) {
27+
fmt.Printf("Service %s/%s has %d slices\n", serviceNN.Namespace, serviceNN.Name, len(slices))
28+
}))
29+
30+
// Get all endpoints for a service
31+
endpoints := consumer.GetEndpoints(types.NamespacedName{Namespace: "default", Name: "my-service"})
32+
```
33+
34+
### EndpointSliceInformer
35+
36+
The `EndpointSliceInformer` provides an informer-like interface for EndpointSlices that handles merging multiple slices for the same service.
37+
38+
```go
39+
// Create a new informer
40+
informer := consumer.NewEndpointSliceInformer(informerFactory, "node1")
41+
42+
// Add an event handler
43+
informer.AddEventHandler(consumer.EndpointChangeHandlerFunc(func(serviceNN types.NamespacedName, slices []*discovery.EndpointSlice) {
44+
klog.InfoS("Service endpoints changed", "namespace", serviceNN.Namespace, "name", serviceNN.Name, "slices", len(slices))
45+
}))
46+
47+
// Start the informer
48+
if err := informer.Run(ctx); err != nil {
49+
klog.ErrorS(err, "Failed to run EndpointSliceInformer")
50+
return
51+
}
52+
53+
// Get endpoints for a service
54+
endpoints := informer.GetEndpoints(types.NamespacedName{Namespace: "default", Name: "my-service"})
55+
```
56+
57+
### EndpointSliceLister
58+
59+
The `EndpointSliceLister` provides a lister-like interface for EndpointSlices that handles merging multiple slices for the same service.
60+
61+
```go
62+
// Create a new lister
63+
lister := consumer.NewEndpointSliceLister(endpointSliceLister, "node1")
64+
65+
// Get all endpoints for a service
66+
endpoints, err := lister.EndpointSlices("default").GetEndpoints("my-service")
67+
if err != nil {
68+
klog.ErrorS(err, "Failed to get endpoints")
69+
return
70+
}
71+
```
72+
73+
## Migrating from Endpoints to EndpointSlices
74+
75+
This package is designed to help applications migrate from using the Endpoints API to using the EndpointSlices API. The interfaces provided by this package are similar to those used with Endpoints, making the migration easier.
76+
77+
When migrating from Endpoints to EndpointSlices, consider the following:
78+
79+
1. EndpointSlices are more scalable than Endpoints, especially for services with a large number of backends
80+
2. EndpointSlices provide more information about endpoints, such as topology hints and node names
81+
3. EndpointSlices are the preferred way to access endpoint information in Kubernetes
82+
83+
## Example
84+
85+
See the [example_test.go](example_test.go) file for complete examples of how to use this package.
+249
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package consumer
18+
19+
import (
20+
"fmt"
21+
"reflect"
22+
"sort"
23+
"sync"
24+
25+
discovery "k8s.io/api/discovery/v1"
26+
"k8s.io/apimachinery/pkg/types"
27+
"k8s.io/klog/v2"
28+
)
29+
30+
// EndpointSliceConsumer provides a unified view of endpoints for services
31+
// across multiple EndpointSlice objects.
32+
type EndpointSliceConsumer struct {
33+
// lock protects slicesByService.
34+
lock sync.RWMutex
35+
36+
// slicesByService maps service namespaced names to a map of EndpointSlices
37+
// keyed by their names.
38+
slicesByService map[types.NamespacedName]map[string]*discovery.EndpointSlice
39+
40+
// handlers for endpoint changes
41+
handlers []EndpointChangeHandler
42+
43+
// nodeName is the name of the node this consumer is running on.
44+
// Used to determine if an endpoint is local.
45+
nodeName string
46+
}
47+
48+
// EndpointChangeHandler is called when endpoints for a service change.
49+
type EndpointChangeHandler interface {
50+
// OnEndpointsChange is called when endpoints for a service change.
51+
OnEndpointsChange(serviceNN types.NamespacedName, endpoints []*discovery.EndpointSlice)
52+
}
53+
54+
// EndpointChangeHandlerFunc is a function that implements EndpointChangeHandler.
55+
type EndpointChangeHandlerFunc func(serviceNN types.NamespacedName, endpoints []*discovery.EndpointSlice)
56+
57+
// OnEndpointsChange calls the function.
58+
func (f EndpointChangeHandlerFunc) OnEndpointsChange(serviceNN types.NamespacedName, endpoints []*discovery.EndpointSlice) {
59+
f(serviceNN, endpoints)
60+
}
61+
62+
// NewEndpointSliceConsumer creates a new EndpointSliceConsumer.
63+
func NewEndpointSliceConsumer(nodeName string) *EndpointSliceConsumer {
64+
return &EndpointSliceConsumer{
65+
slicesByService: make(map[types.NamespacedName]map[string]*discovery.EndpointSlice),
66+
nodeName: nodeName,
67+
}
68+
}
69+
70+
// AddEventHandler adds a handler for endpoint changes.
71+
func (c *EndpointSliceConsumer) AddEventHandler(handler EndpointChangeHandler) {
72+
c.lock.Lock()
73+
defer c.lock.Unlock()
74+
c.handlers = append(c.handlers, handler)
75+
}
76+
77+
// OnEndpointSliceAdd is called when an EndpointSlice is added.
78+
func (c *EndpointSliceConsumer) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
79+
serviceNN, sliceName, err := endpointSliceCacheKeys(endpointSlice)
80+
if err != nil {
81+
klog.ErrorS(err, "Error getting endpoint slice cache keys")
82+
return
83+
}
84+
85+
c.lock.Lock()
86+
defer c.lock.Unlock()
87+
88+
slices, ok := c.slicesByService[serviceNN]
89+
if !ok {
90+
slices = make(map[string]*discovery.EndpointSlice)
91+
c.slicesByService[serviceNN] = slices
92+
}
93+
94+
// Check if this is a new slice or an update to an existing one
95+
existingSlice, exists := slices[sliceName]
96+
slices[sliceName] = endpointSlice.DeepCopy()
97+
98+
// Only notify handlers if this is a new slice or the endpoints have changed
99+
if !exists || !reflect.DeepEqual(existingSlice.Endpoints, endpointSlice.Endpoints) {
100+
c.notifyHandlersLocked(serviceNN)
101+
}
102+
}
103+
104+
// OnEndpointSliceUpdate is called when an EndpointSlice is updated.
105+
func (c *EndpointSliceConsumer) OnEndpointSliceUpdate(_, newEndpointSlice *discovery.EndpointSlice) {
106+
c.OnEndpointSliceAdd(newEndpointSlice)
107+
}
108+
109+
// OnEndpointSliceDelete is called when an EndpointSlice is deleted.
110+
func (c *EndpointSliceConsumer) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
111+
serviceNN, sliceName, err := endpointSliceCacheKeys(endpointSlice)
112+
if err != nil {
113+
klog.ErrorS(err, "Error getting endpoint slice cache keys")
114+
return
115+
}
116+
117+
c.lock.Lock()
118+
defer c.lock.Unlock()
119+
120+
slices, ok := c.slicesByService[serviceNN]
121+
if !ok {
122+
return
123+
}
124+
125+
if _, exists := slices[sliceName]; exists {
126+
delete(slices, sliceName)
127+
if len(slices) == 0 {
128+
delete(c.slicesByService, serviceNN)
129+
}
130+
c.notifyHandlersLocked(serviceNN)
131+
}
132+
}
133+
134+
// GetEndpointSlices returns all EndpointSlices for a service.
135+
func (c *EndpointSliceConsumer) GetEndpointSlices(serviceNN types.NamespacedName) []*discovery.EndpointSlice {
136+
c.lock.RLock()
137+
defer c.lock.RUnlock()
138+
139+
slices, ok := c.slicesByService[serviceNN]
140+
if !ok {
141+
return nil
142+
}
143+
144+
result := make([]*discovery.EndpointSlice, 0, len(slices))
145+
for _, slice := range slices {
146+
result = append(result, slice.DeepCopy())
147+
}
148+
149+
// Sort slices by name for consistent results
150+
sort.Slice(result, func(i, j int) bool {
151+
return result[i].Name < result[j].Name
152+
})
153+
154+
return result
155+
}
156+
157+
// GetEndpoints returns all endpoints for a service, merging and deduplicating
158+
// endpoints from all EndpointSlices for the service.
159+
func (c *EndpointSliceConsumer) GetEndpoints(serviceNN types.NamespacedName) []discovery.Endpoint {
160+
slices := c.GetEndpointSlices(serviceNN)
161+
if len(slices) == 0 {
162+
return nil
163+
}
164+
165+
// Use a map to deduplicate endpoints by address
166+
endpointMap := make(map[string]discovery.Endpoint)
167+
168+
for _, slice := range slices {
169+
for _, endpoint := range slice.Endpoints {
170+
if len(endpoint.Addresses) == 0 {
171+
continue
172+
}
173+
174+
// Use the first address as the key for deduplication
175+
key := endpoint.Addresses[0]
176+
177+
// If we already have this endpoint, only replace it if the existing one
178+
// is not local but the new one is
179+
existingEp, exists := endpointMap[key]
180+
isLocal := endpoint.NodeName != nil && *endpoint.NodeName == c.nodeName
181+
existingIsLocal := exists && existingEp.NodeName != nil && *existingEp.NodeName == c.nodeName
182+
183+
if !exists || (isLocal && !existingIsLocal) {
184+
endpointMap[key] = *endpoint.DeepCopy()
185+
}
186+
}
187+
}
188+
189+
// Convert map to slice
190+
result := make([]discovery.Endpoint, 0, len(endpointMap))
191+
for _, endpoint := range endpointMap {
192+
result = append(result, endpoint)
193+
}
194+
195+
// Sort endpoints by address for consistent results
196+
sort.Slice(result, func(i, j int) bool {
197+
if len(result[i].Addresses) == 0 || len(result[j].Addresses) == 0 {
198+
return len(result[i].Addresses) > len(result[j].Addresses)
199+
}
200+
return result[i].Addresses[0] < result[j].Addresses[0]
201+
})
202+
203+
return result
204+
}
205+
206+
// notifyHandlersLocked notifies all handlers of an endpoint change.
207+
// The caller must hold the lock.
208+
func (c *EndpointSliceConsumer) notifyHandlersLocked(serviceNN types.NamespacedName) {
209+
if len(c.handlers) == 0 {
210+
return
211+
}
212+
213+
slices, ok := c.slicesByService[serviceNN]
214+
if !ok {
215+
// Service has been deleted, notify with empty slice
216+
for _, handler := range c.handlers {
217+
handler.OnEndpointsChange(serviceNN, nil)
218+
}
219+
return
220+
}
221+
222+
// Convert map to slice
223+
sliceList := make([]*discovery.EndpointSlice, 0, len(slices))
224+
for _, slice := range slices {
225+
sliceList = append(sliceList, slice.DeepCopy())
226+
}
227+
228+
// Sort slices by name for consistent results
229+
sort.Slice(sliceList, func(i, j int) bool {
230+
return sliceList[i].Name < sliceList[j].Name
231+
})
232+
233+
// Notify handlers
234+
for _, handler := range c.handlers {
235+
handler.OnEndpointsChange(serviceNN, sliceList)
236+
}
237+
}
238+
239+
// endpointSliceCacheKeys returns cache keys used for a given EndpointSlice.
240+
func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string, error) {
241+
var err error
242+
serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName]
243+
if !ok || serviceName == "" {
244+
err = fmt.Errorf("no %s label set on endpoint slice: %s", discovery.LabelServiceName, endpointSlice.Name)
245+
} else if endpointSlice.Namespace == "" || endpointSlice.Name == "" {
246+
err = fmt.Errorf("expected EndpointSlice name and namespace to be set: %v", endpointSlice)
247+
}
248+
return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err
249+
}

0 commit comments

Comments
 (0)