refresh metrics per pod periodically #223
Changes from all commits

@@ -0,0 +1,77 @@
package backend

import (
    "context"
    "time"

    klog "k8s.io/klog/v2"
    logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

type PodMetricsRefresher struct {
    done     chan struct{}
    interval time.Duration
    timeout  time.Duration

    // The refresher holds the provider and the pod so it can update metrics in a
    // concurrency-safe way.
    pod      Pod
    provider *Provider
}

func NewPodMetricsRefresher(provider *Provider, pod Pod, interval, timeout time.Duration) *PodMetricsRefresher {
    return &PodMetricsRefresher{
        done:     make(chan struct{}),
        interval: interval,
        timeout:  timeout,
        pod:      pod,
        provider: provider,
    }
}

func (r *PodMetricsRefresher) start() {
    go func() {
        klog.V(logutil.DEFAULT).InfoS("Starting refresher", "pod", r.pod)
        for {
            select {
            case <-r.done:
                return
            default:
            }

            err := r.refreshMetrics()
            if err != nil {
                klog.ErrorS(err, "Failed to refresh metrics", "pod", r.pod)
            }

            time.Sleep(r.interval)
        }
    }()
}

func (r *PodMetricsRefresher) refreshMetrics() error {
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()

    pod := r.pod
    existing, found := r.provider.GetPodMetrics(pod)
    if !found {
        // Since the refresher runs in the background, the pod may already have been
        // deleted while this goroutine has not yet read the done channel. In that
        // case, just return nil; the refresher will be stopped after this interval.
        return nil
    }

    klog.V(logutil.DEBUG).InfoS("Refresh metrics", "pod", pod, "metrics", existing.Metrics)
    updated, err := r.provider.pmc.FetchMetrics(ctx, r.pod, existing)
    if err != nil {
        return err
    }

    r.provider.UpdatePodMetrics(pod, updated)
    return nil
}

func (r *PodMetricsRefresher) stop() {
    klog.V(logutil.DEFAULT).InfoS("Stopping refresher", "pod", r.pod)
    close(r.done)
}
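The heart of the new file is the done-channel loop: start launches one goroutine per pod that polls until stop closes done. Below is a minimal, self-contained sketch of that lifecycle pattern; the refresher type, the printed tick, and the timings are illustrative stand-ins, not code from this PR.

    package main

    import (
        "fmt"
        "time"
    )

    // refresher mirrors the PodMetricsRefresher loop above: a goroutine that
    // polls every interval and exits once the done channel is closed.
    type refresher struct {
        done     chan struct{}
        interval time.Duration
    }

    func (r *refresher) start() {
        go func() {
            for {
                select {
                case <-r.done:
                    return // stop() was called
                default:
                }
                fmt.Println("tick") // stands in for refreshMetrics()
                time.Sleep(r.interval)
            }
        }()
    }

    func (r *refresher) stop() { close(r.done) }

    func main() {
        r := &refresher{done: make(chan struct{}), interval: 100 * time.Millisecond}
        r.start()
        time.Sleep(250 * time.Millisecond) // a few ticks
        r.stop()
        time.Sleep(150 * time.Millisecond) // let the goroutine observe done
    }

Because done is only checked at the top of each iteration, a refresher stopped mid-sleep performs at most one more refresh before exiting; this is the same behavior the refreshMetrics comment describes with "the refresher will be stopped after this interval."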
@@ -2,25 +2,25 @@ package backend
 import (
     "context"
-    "fmt"
     "sync"
     "time"

-    "go.uber.org/multierr"
     klog "k8s.io/klog/v2"
     "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
     logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 )

 const (
-    fetchMetricsTimeout = 5 * time.Second
+    // TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336
+    metricsValidityPeriod = 5 * time.Second
 )

 func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
     p := &Provider{
-        podMetrics: sync.Map{},
-        pmc:        pmc,
-        datastore:  datastore,
+        podMetrics:          sync.Map{},
+        podMetricsRefresher: sync.Map{},
+        pmc:                 pmc,
+        datastore:           datastore,
     }
     return p
 }
@@ -29,60 +29,78 @@ func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
 type Provider struct {
     // key: Pod, value: *PodMetrics
     podMetrics sync.Map
-    pmc        PodMetricsClient
-    datastore  *K8sDatastore
+    // key: Pod, value: *PodMetricsRefresher
+    podMetricsRefresher sync.Map
+    pmc                 PodMetricsClient
+    datastore           *K8sDatastore
 }

 type PodMetricsClient interface {
+    // FetchMetrics fetches metrics for the given pod.
+    // The returned PodMetrics and the existing one must not be the same object,
+    // otherwise there will be a race.
     FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
 }
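The comment on FetchMetrics pins down the concurrency contract this PR relies on: the fetcher must build a new object and let UpdatePodMetrics swap the pointer via sync.Map.Store, instead of mutating existing while the scheduler may be reading it. A simplified sketch of a conforming implementation follows; Pod, Metrics, PodMetrics, and fetchMetrics here are stand-ins for the real backend types.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // Stand-ins for the backend package's types.
    type Pod struct{ Name string }

    type Metrics struct {
        ActiveModels     map[string]int
        WaitingQueueSize int
        UpdatedTime      time.Time
    }

    type PodMetrics struct {
        Pod     Pod
        Metrics Metrics
    }

    // fetchMetrics honors the contract: it never mutates existing. It copies it,
    // deep-copying the map field, and returns a brand-new *PodMetrics that the
    // provider can Store atomically, so readers holding the old pointer keep
    // seeing a consistent snapshot.
    func fetchMetrics(_ context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error) {
        updated := *existing // copies scalars, but ActiveModels still aliases the old map
        updated.Metrics.ActiveModels = make(map[string]int, len(existing.Metrics.ActiveModels))
        for k, v := range existing.Metrics.ActiveModels {
            updated.Metrics.ActiveModels[k] = v
        }
        updated.Metrics.WaitingQueueSize = 3 // stands in for scraping the pod's /metrics endpoint
        return &updated, nil
    }

    func main() {
        old := &PodMetrics{Pod: Pod{Name: "vllm-0"}, Metrics: Metrics{ActiveModels: map[string]int{}}}
        fresh, _ := fetchMetrics(context.Background(), old.Pod, old)
        fmt.Println(old.Metrics.WaitingQueueSize, fresh.Metrics.WaitingQueueSize) // 0 3
    }

Note the deep copy of the ActiveModels map: a plain struct copy would still alias the old map, so a concurrent reader and the fetcher could race on it.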
-func (p *Provider) AllPodMetrics() []*PodMetrics {
+func isPodMetricsStale(pm *PodMetrics) bool {
+    return time.Since(pm.Metrics.UpdatedTime) > metricsValidityPeriod
+}
+
+func (p *Provider) AllFreshPodMetrics() []*PodMetrics {
+    return p.allPodMetrics(false)
+}
+
+func (p *Provider) AllStalePodMetrics() []*PodMetrics {
+    return p.allPodMetrics(true)
+}
+
+func (p *Provider) allPodMetrics(stale bool) []*PodMetrics {
     res := []*PodMetrics{}
     fn := func(k, v any) bool {
-        res = append(res, v.(*PodMetrics))
+        m := v.(*PodMetrics)
+
+        if !stale {
+            if isPodMetricsStale(m) {
+                // exclude stale metrics for scheduler
+                klog.V(logutil.DEBUG).InfoS("Pod metrics is stale, skipping", "pod", m.Pod)
+            } else {
+                res = append(res, m)
+            }
+        } else {
+            if isPodMetricsStale(m) {
+                res = append(res, m)
+            }
+        }
         return true
     }
     p.podMetrics.Range(fn)
     return res
 }
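In short, a pod is stale once its UpdatedTime is more than metricsValidityPeriod (5s) behind now; fresh pods go to the scheduler, stale ones are kept only for logging and Prometheus flushing. A tiny self-contained sketch of the cutoff, with illustrative names and timings:

    package main

    import (
        "fmt"
        "time"
    )

    const metricsValidityPeriod = 5 * time.Second

    type podSample struct {
        name    string
        updated time.Time
    }

    // isStale mirrors isPodMetricsStale: older than the validity period means
    // the scheduler should no longer trust the numbers.
    func isStale(p podSample) bool { return time.Since(p.updated) > metricsValidityPeriod }

    func main() {
        now := time.Now()
        pods := []podSample{
            {"pod-a", now.Add(-2 * time.Second)},  // within 5s: fresh
            {"pod-b", now.Add(-10 * time.Second)}, // beyond 5s: stale
        }
        var fresh, stale []string
        for _, p := range pods {
            if isStale(p) {
                stale = append(stale, p.name)
            } else {
                fresh = append(fresh, p.name)
            }
        }
        fmt.Println("fresh:", fresh, "stale:", stale) // fresh: [pod-a] stale: [pod-b]
    }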
 func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
+    pm.Metrics.UpdatedTime = time.Now()
     p.podMetrics.Store(pod, pm)
     klog.V(logutil.DEBUG).InfoS("Updated metrics", "pod", pod, "metrics", pm.Metrics)
 }

 func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
     val, ok := p.podMetrics.Load(pod)
     if ok {
+        // For now, the only caller of GetPodMetrics is the refresher, so we
+        // don't need to exclude the stale metrics.
         return val.(*PodMetrics), true
     }
     return nil, false
 }

-func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error {
-    p.refreshPodsOnce()
-
-    if err := p.refreshMetricsOnce(); err != nil {
-        klog.ErrorS(err, "Failed to init metrics")
-    }
-
-    klog.InfoS("Initialized pods and metrics", "metrics", p.AllPodMetrics())
-
+func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshMetricsTimeout, refreshPrometheusMetricsInterval time.Duration) error {
     // periodically refresh pods
     go func() {
         for {
+            p.refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout)
             time.Sleep(refreshPodsInterval)
-            p.refreshPodsOnce()
         }
     }()
-
-    // periodically refresh metrics
-    go func() {
-        for {
-            time.Sleep(refreshMetricsInterval)
-            if err := p.refreshMetricsOnce(); err != nil {
-                klog.V(logutil.TRACE).ErrorS(err, "Failed to refresh metrics")
-            }
-        }
-    }()
@@ -99,7 +117,8 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm

     go func() {
         for {
             time.Sleep(5 * time.Second)
-            klogV.InfoS("Current Pods and metrics gathered", "metrics", p.AllPodMetrics())
+            klogV.InfoS("Current Pods and metrics gathered", "fresh metrics", p.AllFreshPodMetrics(),
+                "stale metrics", p.AllStalePodMetrics())
         }
     }()
 }
@@ -109,7 +128,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm

 // refreshPodsOnce lists pods and updates keys in the podMetrics map.
 // Note this function doesn't update the PodMetrics value, it's done separately.
-func (p *Provider) refreshPodsOnce() {
+func (p *Provider) refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout time.Duration) {
     // merge new pods with cached ones.
     // add new pod to the map
     addNewPods := func(k, v any) bool {
@@ -120,8 +139,13 @@ func (p *Provider) refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout

             Metrics: Metrics{
                 ActiveModels: make(map[string]int),
             },
+            // Metrics are considered stale until they are first refreshed.
         }
         p.podMetrics.Store(pod, new)
+
+        refresher := NewPodMetricsRefresher(p, pod, refreshMetricsInterval, refreshMetricsTimeout)
+        refresher.start()
+        p.podMetricsRefresher.Store(pod, refresher)
     }
     return true
 }
@@ -130,61 +154,17 @@ func (p *Provider) refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout

         pod := k.(Pod)
         if _, ok := p.datastore.pods.Load(pod); !ok {
             p.podMetrics.Delete(pod)

Inline review thread on p.podMetrics.Delete(pod):

- "Should we remove this, given the LoadAndDelete below?"
- "The map that loads the pod is different from the map that deletes the pod."
- "I am confused, in line 161"
- "Thanks for catching up! Sorry for misunderstanding you at the beginning. The"
- "Why do we need another map in the first place? can't we put the PodMetricsRefresher in the PodMetrics struct?"
- "@ahg-g"
- "I believe we should be able to do that, I sent out #350 which does a major refactor to the datastore/provider layer. It consolidates storage in one place under the datastore, and so it does in-place updates to the metrics. Please take a look and advise if there are any concurrency issues that I may have missed."
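The consolidation suggested in this thread (and pursued in #350) can be sketched roughly as follows: store one value per pod that bundles the metrics with its refresher's stop channel, so a single LoadAndDelete tears both down and the second map disappears. This is a hypothetical illustration, not code from #350; entry, value, and the string key are invented for the example.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // entry bundles a pod's metrics with the goroutine refreshing them, so one
    // map insert/delete governs both.
    type entry struct {
        mu    sync.Mutex
        value int // stands in for the real Metrics struct
        done  chan struct{}
    }

    func (e *entry) start(interval time.Duration) {
        go func() {
            for {
                select {
                case <-e.done:
                    return
                default:
                }
                e.mu.Lock()
                e.value++ // stands in for fetching fresh metrics
                e.mu.Unlock()
                time.Sleep(interval)
            }
        }()
    }

    func main() {
        var pods sync.Map
        e := &entry{done: make(chan struct{})}
        e.start(50 * time.Millisecond)
        pods.Store("pod-a", e)

        time.Sleep(200 * time.Millisecond)

        // Removing the pod stops its refresher in the same step; no second map,
        // and no window where one map has the pod and the other doesn't.
        if v, ok := pods.LoadAndDelete("pod-a"); ok {
            close(v.(*entry).done)
        }

        e.mu.Lock()
        fmt.Println("refreshes observed:", e.value)
        e.mu.Unlock()
    }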
+            if v, ok := p.podMetricsRefresher.LoadAndDelete(pod); ok {
+                refresher := v.(*PodMetricsRefresher)
+                refresher.stop()
+            }
         }
         return true
     }
     p.podMetrics.Range(mergeFn)
     p.datastore.pods.Range(addNewPods)
 }
-func (p *Provider) refreshMetricsOnce() error {
-    klogV := klog.V(logutil.TRACE)
-    ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
-    defer cancel()
-    start := time.Now()
-    defer func() {
-        d := time.Since(start)
-        // TODO: add a metric instead of logging
-        klogV.InfoS("Metrics refreshed", "duration", d)
-    }()
-    var wg sync.WaitGroup
-    errCh := make(chan error)
-    processOnePod := func(key, value any) bool {
-        klogV.InfoS("Pod and metric being processed", "pod", key, "metric", value)
-        pod := key.(Pod)
-        existing := value.(*PodMetrics)
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-            updated, err := p.pmc.FetchMetrics(ctx, pod, existing)
-            if err != nil {
-                errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)
-                return
-            }
-            p.UpdatePodMetrics(pod, updated)
-            klogV.InfoS("Updated metrics for pod", "pod", pod, "metrics", updated.Metrics)
-        }()
-        return true
-    }
-    p.podMetrics.Range(processOnePod)
-
-    // Wait for metric collection for all pods to complete and close the error channel in a
-    // goroutine so this is unblocking, allowing the code to proceed to the error collection code
-    // below.
-    // Note we couldn't use a buffered error channel with a size because the size of the podMetrics
-    // sync.Map is unknown beforehand.
-    go func() {
-        wg.Wait()
-        close(errCh)
-    }()
-
-    var errs error
-    for err := range errCh {
-        errs = multierr.Append(errs, err)
-    }
-    return errs
-}

 func (p *Provider) flushPrometheusMetricsOnce() {
     klog.V(logutil.DEBUG).InfoS("Flushing Prometheus Metrics")
@@ -197,7 +177,7 @@ func (p *Provider) flushPrometheusMetricsOnce() {

     var kvCacheTotal float64
     var queueTotal int

-    podMetrics := p.AllPodMetrics()
+    podMetrics := append(p.AllFreshPodMetrics(), p.AllStalePodMetrics()...)
     if len(podMetrics) == 0 {
         return
     }