refresh metrics per pod periodically #223
Changes from 2 commits
@@ -0,0 +1,79 @@
package backend

import (
	"context"
	"time"

	klog "k8s.io/klog/v2"
)

const (
	// TODO: make it configurable
	fetchMetricsTimeout = 1 * time.Second
)

type PodMetricsRefresher struct {
	done     chan struct{}
	interval time.Duration

	// The refresher holds the provider & pod so it can update the metrics in a concurrency-safe way.
	pod      Pod
	provider *Provider
}

func NewPodMetricsRefresher(provider *Provider, pod Pod, interval time.Duration) *PodMetricsRefresher {
	return &PodMetricsRefresher{
		done:     make(chan struct{}),
		interval: interval,
		pod:      pod,
		provider: provider,
	}
}

func (r *PodMetricsRefresher) start() {
	go func() {
		klog.V(2).Infof("Starting refresher for pod %v", r.pod)
		for {
			select {
			case <-r.done:
				return
			default:
			}

			err := r.refreshMetrics()
			if err != nil {
				klog.Errorf("Failed to refresh metrics for pod %s: %v", r.pod, err)
			}

			time.Sleep(r.interval)
		}
	}()
}

func (r *PodMetricsRefresher) refreshMetrics() error {
	ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
	defer cancel()

	pod := r.pod
	existing, found := r.provider.GetPodMetrics(pod)
	if !found {
		// As the refresher runs in the background, it's possible that the pod has been deleted but
		// the refresh goroutine hasn't read the done channel yet. In this case, just return nil.
		// The refresher will be stopped after this interval.
		return nil
	}

	klog.V(4).Infof("Processing pod %v and metric %v", pod, existing.Metrics)
	updated, err := r.provider.pmc.FetchMetrics(ctx, r.pod, existing)
	if err != nil {
		return err
	}

	r.provider.UpdatePodMetrics(pod, updated)
	return nil
}

func (r *PodMetricsRefresher) stop() {
	klog.V(2).Infof("Stopping refresher for pod %v", r.pod)
	close(r.done)
}
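The start() loop above checks the done channel without blocking, runs one refresh, then sleeps for the interval, so stop() can leave the goroutine asleep for up to one interval before it exits. A minimal standalone sketch of the same loop shape, with illustrative names that are not part of this PR:

package main

import (
	"fmt"
	"time"
)

// loopUntilDone mirrors the refresher loop: a non-blocking check of the done
// channel, then the work, then a sleep for the interval.
func loopUntilDone(done <-chan struct{}, interval time.Duration, work func()) {
	for {
		select {
		case <-done:
			return
		default:
		}
		work()
		// After close(done), the goroutine still finishes this sleep before it
		// observes the closed channel and returns.
		time.Sleep(interval)
	}
}

func main() {
	done := make(chan struct{})
	go loopUntilDone(done, 100*time.Millisecond, func() { fmt.Println("refresh") })
	time.Sleep(350 * time.Millisecond)
	close(done) // analogous to stop()
	time.Sleep(200 * time.Millisecond)
}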
@@ -2,23 +2,25 @@ package backend

import (
	"context"
-	"fmt"
	"sync"
	"time"

-	"go.uber.org/multierr"
	klog "k8s.io/klog/v2"
)

const (
-	fetchMetricsTimeout = 5 * time.Second
+	// TODO: make it configurable. One idea is to provide a configuration singleton
+	// and put fields like refreshMetricsInterval in it. So far, we have to pass these
+	// fields across several layers.
+	metricsValidityPeriod = 5 * time.Second
)

func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
	p := &Provider{
-		podMetrics: sync.Map{},
-		pmc:        pmc,
-		datastore:  datastore,
+		podMetrics:          sync.Map{},
+		podMetricsRefresher: sync.Map{},
+		pmc:                 pmc,
+		datastore:           datastore,
	}
	return p
}
@@ -27,60 +29,73 @@ func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
type Provider struct {
	// key: Pod, value: *PodMetrics
	podMetrics sync.Map
-	pmc        PodMetricsClient
-	datastore  *K8sDatastore
+	// key: Pod, value: *PodMetricsRefresher
+	podMetricsRefresher sync.Map
+	pmc                 PodMetricsClient
+	datastore           *K8sDatastore
}

type PodMetricsClient interface {
	// FetchMetrics fetches metrics for the given pod.
+	// The returned PodMetrics and the existing one should not be the same object.
+	// Otherwise, there will be a race.
	FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
}

+func isPodMetricsStale(pm *PodMetrics) bool {
+	return time.Since(pm.UpdatedTime) > metricsValidityPeriod
+}
+
func (p *Provider) AllPodMetrics() []*PodMetrics {
+	return p.allPodMetrics(false)
+}
+
+func (p *Provider) AllPodMetricsIncludingStale() []*PodMetrics {
+	return p.allPodMetrics(true)
+}
+
+func (p *Provider) allPodMetrics(staleIncluded bool) []*PodMetrics {
	res := []*PodMetrics{}
	fn := func(k, v any) bool {
-		res = append(res, v.(*PodMetrics))
+		m := v.(*PodMetrics)
+
+		if !staleIncluded && isPodMetricsStale(m) {
+			// exclude stale metrics for the scheduler
+			klog.V(4).Infof("Pod metrics for %s is stale, skipping", m.Pod)
+			return true
+		}
+
+		res = append(res, m)
		return true
	}
	p.podMetrics.Range(fn)
	return res
}

func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
+	pm.UpdatedTime = time.Now()
	p.podMetrics.Store(pod, pm)
+	klog.V(4).Infof("Updated metrics for pod %s: %v", pod, pm.Metrics)
}

func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
	val, ok := p.podMetrics.Load(pod)
	if ok {
+		// For now, the only caller of GetPodMetrics is the refresher, so we
+		// don't need to exclude the stale metrics.
		return val.(*PodMetrics), true
	}
	return nil, false
}

func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
-	p.refreshPodsOnce()
-
-	if err := p.refreshMetricsOnce(); err != nil {
-		klog.Errorf("Failed to init metrics: %v", err)
-	}
-
-	klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
+	p.refreshPodsOnce(refreshMetricsInterval)

	// periodically refresh pods
	go func() {
		for {
			time.Sleep(refreshPodsInterval)
-			p.refreshPodsOnce()
-		}
-	}()
-
-	// periodically refresh metrics
-	go func() {
-		for {
-			time.Sleep(refreshMetricsInterval)
-			if err := p.refreshMetricsOnce(); err != nil {
-				klog.V(4).Infof("Failed to refresh metrics: %v", err)
-			}
+			p.refreshPodsOnce(refreshMetricsInterval)
		}
	}()

@@ -89,7 +104,18 @@
		go func() {
			for {
				time.Sleep(5 * time.Second)
-				klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
+				podMetrics := p.AllPodMetricsIncludingStale()
+				stalePodMetrics := make([]*PodMetrics, 0)
+				freshPodMetrics := make([]*PodMetrics, 0, len(podMetrics))
+				for _, pm := range podMetrics {
+					if isPodMetricsStale(pm) {
+						stalePodMetrics = append(stalePodMetrics, pm)
+					} else {
+						freshPodMetrics = append(freshPodMetrics, pm)
+					}
+				}
+				klog.Infof("===DEBUG: Current Pods and metrics: %+v", freshPodMetrics)
+				klog.Infof("===DEBUG: Stale Pods and metrics: %+v", stalePodMetrics)
			}
		}()
	}
@@ -99,7 +125,7 @@

// refreshPodsOnce lists pods and updates keys in the podMetrics map.
// Note this function doesn't update the PodMetrics value, it's done separately.
-func (p *Provider) refreshPodsOnce() {
+func (p *Provider) refreshPodsOnce(refreshMetricsInterval time.Duration) {
	// merge new pods with cached ones.
	// add new pod to the map
	addNewPods := func(k, v any) bool {
@@ -110,8 +136,13 @@
				Metrics: Metrics{
					ActiveModels: make(map[string]int),
				},
+				// Metrics are considered stale until they are first refreshed.
			}
			p.podMetrics.Store(pod, new)
+
+			refresher := NewPodMetricsRefresher(p, pod, refreshMetricsInterval)
+			refresher.start()
+			p.podMetricsRefresher.Store(pod, refresher)
		}
		return true
	}
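The comment added above ("Metrics are considered stale until they are first refreshed") relies on the zero value of UpdatedTime: a freshly stored PodMetrics has never been refreshed, so time.Since(UpdatedTime) far exceeds metricsValidityPeriod and isPodMetricsStale reports true until the first successful fetch. A tiny standalone check of that assumption (not code from this PR):

package main

import (
	"fmt"
	"time"
)

func main() {
	const metricsValidityPeriod = 5 * time.Second
	var updatedTime time.Time // zero value, as in a just-created PodMetrics
	// time.Since of the zero time is enormous, so the entry counts as stale.
	fmt.Println(time.Since(updatedTime) > metricsValidityPeriod) // true
}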
@@ -120,56 +151,13 @@
		pod := k.(Pod)
		if _, ok := p.datastore.pods.Load(pod); !ok {
			p.podMetrics.Delete(pod)
Review thread on the p.podMetrics.Delete(pod) line above:

- Should we remove this, given the LoadAndDelete below?
- The map that loads the pod is different from the map that deletes the pod.
- I am confused, in line 161
- Thanks for catching up! Sorry for misunderstanding you at the beginning. The
- Why do we need another map in the first place? Can't we put the PodMetricsRefresher in the PodMetrics struct?
- @ahg-g
- I believe we should be able to do that. I sent out #350, which does a major refactor of the datastore/provider layer. It consolidates storage in one place under the datastore, and so it does in-place updates to the metrics. Please take a look and advise if there are any concurrency issues that I may have missed.
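A minimal sketch of the single-map alternative raised in the thread above, assuming it lives in the same backend package with "sync" imported; the podEntry type and removePod helper are illustrative, not code from this PR or from #350:

// Illustrative only: keep the refresher on the same map value as the metrics,
// so the provider needs a single sync.Map and removal is one LoadAndDelete.
type podEntry struct {
	metrics   *PodMetrics
	refresher *PodMetricsRefresher
}

// removePod sketches the removal path under that layout: one LoadAndDelete
// returns the entry whose refresher can then be stopped.
func removePod(entries *sync.Map, pod Pod) {
	if v, ok := entries.LoadAndDelete(pod); ok {
		v.(*podEntry).refresher.stop()
	}
}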
+			if v, ok := p.podMetrics.LoadAndDelete(pod); ok {
+				refresher := v.(*PodMetricsRefresher)
+				refresher.stop()
+			}
		}
		return true
	}
	p.podMetrics.Range(mergeFn)
	p.datastore.pods.Range(addNewPods)
}
-
-func (p *Provider) refreshMetricsOnce() error {
-	ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
-	defer cancel()
-	start := time.Now()
-	defer func() {
-		d := time.Since(start)
-		// TODO: add a metric instead of logging
-		klog.V(4).Infof("Refreshed metrics in %v", d)
-	}()
-	var wg sync.WaitGroup
-	errCh := make(chan error)
-	processOnePod := func(key, value any) bool {
-		klog.V(4).Infof("Processing pod %v and metric %v", key, value)
-		pod := key.(Pod)
-		existing := value.(*PodMetrics)
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			updated, err := p.pmc.FetchMetrics(ctx, pod, existing)
-			if err != nil {
-				errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)
-				return
-			}
-			p.UpdatePodMetrics(pod, updated)
-			klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
-		}()
-		return true
-	}
-	p.podMetrics.Range(processOnePod)
-
-	// Wait for metric collection for all pods to complete and close the error channel in a
-	// goroutine so this is unblocking, allowing the code to proceed to the error collection code
-	// below.
-	// Note we couldn't use a buffered error channel with a size because the size of the podMetrics
-	// sync.Map is unknown beforehand.
-	go func() {
-		wg.Wait()
-		close(errCh)
-	}()
-
-	var errs error
-	for err := range errCh {
-		errs = multierr.Append(errs, err)
-	}
-	return errs
-}
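The PodMetricsClient comment above requires FetchMetrics to return a new *PodMetrics rather than mutating the existing one, because the scheduler may be reading the cached object while the refresher updates it. A minimal sketch of a conforming fake client, assuming the backend package's Pod, Metrics, and PodMetrics types with "context" imported; the fakeMetricsClient type and its field are hypothetical:

// fakeMetricsClient illustrates the FetchMetrics contract: copy the existing
// PodMetrics and return the copy instead of mutating the shared object.
type fakeMetricsClient struct {
	activeModels map[string]int // canned values returned on every fetch
}

func (f *fakeMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // honor the caller's fetchMetricsTimeout
	}
	updated := *existing // shallow copy; reference fields are re-allocated below
	updated.Metrics.ActiveModels = make(map[string]int, len(f.activeModels))
	for model, count := range f.activeModels {
		updated.Metrics.ActiveModels[model] = count
	}
	return &updated, nil
}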