Skip to content

Commit 5cfcdeb

Browse files
authored
[receiver/k8s_cluster] Do not store unused data in the k8s API cache (open-telemetry#23417)
This change removes unused k8s informer data from the cache to reduce RAM utilization. Tried it on a cluster with 40 nodes and 1000 pods, and it gave up to 30% reduction in RAM usage.
1 parent 260f34f commit 5cfcdeb

13 files changed

+589
-26
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Use this changelog template to create an entry for release notes.
2+
# If your change doesn't affect end users, such as a test fix or a tooling change,
3+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
4+
5+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
6+
change_type: 'enhancement'
7+
8+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
9+
component: receiver/k8s_cluster
10+
11+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
12+
note: Do not store unused data in the k8s API cache to reduce RAM usage
13+
14+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
15+
issues: [23417]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver"
5+
6+
import (
7+
appsv1 "k8s.io/api/apps/v1"
8+
batchv1 "k8s.io/api/batch/v1"
9+
corev1 "k8s.io/api/core/v1"
10+
11+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs"
12+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node"
13+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset"
15+
)
16+
17+
// transformObject transforms the k8s object by removing the data that is not utilized by the receiver.
18+
// Only highly utilized objects are transformed here while others are kept as is.
19+
func transformObject(object interface{}) (interface{}, error) {
20+
switch o := object.(type) {
21+
case *corev1.Pod:
22+
return pod.Transform(o), nil
23+
case *corev1.Node:
24+
return node.Transform(o), nil
25+
case *appsv1.ReplicaSet:
26+
return replicaset.Transform(o), nil
27+
case *batchv1.Job:
28+
return jobs.Transform(o), nil
29+
}
30+
return object, nil
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package k8sclusterreceiver
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
corev1 "k8s.io/api/core/v1"
11+
12+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
13+
)
14+
15+
func TestTransformObject(t *testing.T) {
16+
i := 1
17+
intPtr := &i
18+
tests := []struct {
19+
name string
20+
object interface{}
21+
want interface{}
22+
same bool
23+
}{
24+
{
25+
name: "pod",
26+
object: testutils.NewPodWithContainer(
27+
"1",
28+
testutils.NewPodSpecWithContainer("container-name"),
29+
testutils.NewPodStatusWithContainer("container-name", "container-id"),
30+
),
31+
want: func() *corev1.Pod {
32+
pod := testutils.NewPodWithContainer(
33+
"1",
34+
testutils.NewPodSpecWithContainer("container-name"),
35+
testutils.NewPodStatusWithContainer("container-name", "container-id"),
36+
)
37+
pod.Spec.Containers[0].Image = ""
38+
pod.Status.ContainerStatuses[0].State = corev1.ContainerState{}
39+
return pod
40+
}(),
41+
same: false,
42+
},
43+
{
44+
name: "node",
45+
object: testutils.NewNode("1"),
46+
want: testutils.NewNode("1"),
47+
same: false,
48+
},
49+
{
50+
name: "replicaset",
51+
object: testutils.NewReplicaSet("1"),
52+
want: testutils.NewReplicaSet("1"),
53+
same: false,
54+
},
55+
{
56+
name: "job",
57+
object: testutils.NewJob("1"),
58+
want: testutils.NewJob("1"),
59+
same: false,
60+
},
61+
{
62+
// This is a case where we don't transform the object.
63+
name: "hpa",
64+
object: testutils.NewHPA("1"),
65+
want: testutils.NewHPA("1"),
66+
same: true,
67+
},
68+
{
69+
name: "invalid_type",
70+
object: intPtr,
71+
want: intPtr,
72+
same: true,
73+
},
74+
}
75+
for _, tt := range tests {
76+
t.Run(tt.name, func(t *testing.T) {
77+
got, err := transformObject(tt.object)
78+
assert.NoError(t, err)
79+
assert.Equal(t, tt.want, got)
80+
if tt.same {
81+
assert.Same(t, tt.object, got)
82+
} else {
83+
assert.NotSame(t, tt.object, got)
84+
}
85+
})
86+
}
87+
}

receiver/k8sclusterreceiver/internal/collection/collector_test.go

-16
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,6 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
6262
},
6363
},
6464
},
65-
{
66-
name: "Empty container id skips container resource",
67-
metadataStore: &metadata.Store{},
68-
resource: testutils.NewPodWithContainer(
69-
"0",
70-
testutils.NewPodSpecWithContainer("container-name"),
71-
testutils.NewPodStatusWithContainer("container-name", ""),
72-
),
73-
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
74-
experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
75-
ResourceIDKey: "k8s.pod.uid",
76-
ResourceID: "test-pod-0-uid",
77-
Metadata: commonPodMetadata,
78-
},
79-
},
80-
},
8165
{
8266
name: "Pod with Owner Reference",
8367
metadataStore: &metadata.Store{},

receiver/k8sclusterreceiver/internal/jobs/jobs.go

+23
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
1010
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
1111
batchv1 "k8s.io/api/batch/v1"
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1213

1314
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
1415
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
@@ -51,6 +52,28 @@ var podsSuccessfulMetric = &metricspb.MetricDescriptor{
5152
Type: metricspb.MetricDescriptor_GAUGE_INT64,
5253
}
5354

55+
// Transform transforms the job to remove the fields that we don't use to reduce RAM utilization.
56+
// IMPORTANT: Make sure to update this function when using a new job fields.
57+
func Transform(job *batchv1.Job) *batchv1.Job {
58+
return &batchv1.Job{
59+
ObjectMeta: metav1.ObjectMeta{
60+
Name: job.ObjectMeta.Name,
61+
Namespace: job.ObjectMeta.Namespace,
62+
UID: job.ObjectMeta.UID,
63+
Labels: job.ObjectMeta.Labels,
64+
},
65+
Spec: batchv1.JobSpec{
66+
Completions: job.Spec.Completions,
67+
Parallelism: job.Spec.Parallelism,
68+
},
69+
Status: batchv1.JobStatus{
70+
Active: job.Status.Active,
71+
Succeeded: job.Status.Succeeded,
72+
Failed: job.Status.Failed,
73+
},
74+
}
75+
}
76+
5477
func GetMetrics(j *batchv1.Job) []*agentmetricspb.ExportMetricsServiceRequest {
5578
metrics := make([]*metricspb.Metric, 0, 5)
5679
metrics = append(metrics, []*metricspb.Metric{

receiver/k8sclusterreceiver/internal/jobs/jobs_test.go

+70
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import (
77
"testing"
88

99
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
10+
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/require"
12+
batchv1 "k8s.io/api/batch/v1"
13+
corev1 "k8s.io/api/core/v1"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1115

1216
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
1317
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
@@ -60,3 +64,69 @@ func TestJobMetrics(t *testing.T) {
6064
testutils.AssertMetricsInt(t, actualResourceMetrics[0].Metrics[2], "k8s.job.successful_pods",
6165
metricspb.MetricDescriptor_GAUGE_INT64, 3)
6266
}
67+
68+
func TestTransform(t *testing.T) {
69+
originalJob := &batchv1.Job{
70+
ObjectMeta: metav1.ObjectMeta{
71+
Name: "my-job",
72+
Namespace: "default",
73+
UID: "my-job-uid",
74+
Labels: map[string]string{
75+
"app": "my-app",
76+
},
77+
},
78+
Spec: batchv1.JobSpec{
79+
Completions: func() *int32 { completions := int32(1); return &completions }(),
80+
Parallelism: func() *int32 { parallelism := int32(1); return &parallelism }(),
81+
Template: corev1.PodTemplateSpec{
82+
ObjectMeta: metav1.ObjectMeta{
83+
Labels: map[string]string{
84+
"app": "my-app",
85+
},
86+
},
87+
Spec: corev1.PodSpec{
88+
Containers: []corev1.Container{
89+
{
90+
Name: "my-container",
91+
Image: "busybox",
92+
Command: []string{"echo", "Hello, World!"},
93+
ImagePullPolicy: corev1.PullAlways,
94+
},
95+
},
96+
RestartPolicy: corev1.RestartPolicyNever,
97+
},
98+
},
99+
},
100+
Status: batchv1.JobStatus{
101+
Active: 1,
102+
Succeeded: 2,
103+
Failed: 3,
104+
Conditions: []batchv1.JobCondition{
105+
{
106+
Type: batchv1.JobComplete,
107+
Status: corev1.ConditionTrue,
108+
},
109+
},
110+
},
111+
}
112+
wantJob := &batchv1.Job{
113+
ObjectMeta: metav1.ObjectMeta{
114+
Name: "my-job",
115+
Namespace: "default",
116+
UID: "my-job-uid",
117+
Labels: map[string]string{
118+
"app": "my-app",
119+
},
120+
},
121+
Spec: batchv1.JobSpec{
122+
Completions: func() *int32 { completions := int32(1); return &completions }(),
123+
Parallelism: func() *int32 { parallelism := int32(1); return &parallelism }(),
124+
},
125+
Status: batchv1.JobStatus{
126+
Active: 1,
127+
Succeeded: 2,
128+
Failed: 3,
129+
},
130+
}
131+
assert.Equal(t, wantJob, Transform(originalJob))
132+
}

receiver/k8sclusterreceiver/internal/node/nodes.go

+23
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
1515
"go.uber.org/zap"
1616
corev1 "k8s.io/api/core/v1"
17+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1718

1819
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps"
1920
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
@@ -34,6 +35,28 @@ var allocatableDesciption = map[string]string{
3435
"storage": "How many bytes of storage remaining that the node can allocate to pods",
3536
}
3637

38+
// Transform transforms the node to remove the fields that we don't use to reduce RAM utilization.
39+
// IMPORTANT: Make sure to update this function when using a new node fields.
40+
func Transform(node *corev1.Node) *corev1.Node {
41+
newNode := &corev1.Node{
42+
ObjectMeta: metav1.ObjectMeta{
43+
Name: node.ObjectMeta.Name,
44+
UID: node.ObjectMeta.UID,
45+
Labels: node.ObjectMeta.Labels,
46+
},
47+
Status: corev1.NodeStatus{
48+
Allocatable: node.Status.Allocatable,
49+
},
50+
}
51+
for _, c := range node.Status.Conditions {
52+
newNode.Status.Conditions = append(newNode.Status.Conditions, corev1.NodeCondition{
53+
Type: c.Type,
54+
Status: c.Status,
55+
})
56+
}
57+
return newNode
58+
}
59+
3760
func GetMetrics(node *corev1.Node, nodeConditionTypesToReport, allocatableTypesToReport []string, logger *zap.Logger) []*agentmetricspb.ExportMetricsServiceRequest {
3861
metrics := make([]*metricspb.Metric, 0, len(nodeConditionTypesToReport)+len(allocatableTypesToReport))
3962
// Adding 'node condition type' metrics

receiver/k8sclusterreceiver/internal/node/nodes_test.go

+63
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ import (
77
"testing"
88

99
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
10+
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/require"
1112
"go.uber.org/zap"
1213
corev1 "k8s.io/api/core/v1"
14+
"k8s.io/apimachinery/pkg/api/resource"
15+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1316

1417
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
1518
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
@@ -140,3 +143,63 @@ func TestNodeConditionValue(t *testing.T) {
140143
})
141144
}
142145
}
146+
147+
func TestTransform(t *testing.T) {
148+
originalNode := &corev1.Node{
149+
ObjectMeta: metav1.ObjectMeta{
150+
Name: "my-node",
151+
UID: "my-node-uid",
152+
Labels: map[string]string{
153+
"node-role": "worker",
154+
},
155+
},
156+
Status: corev1.NodeStatus{
157+
Conditions: []corev1.NodeCondition{
158+
{
159+
Type: corev1.NodeReady,
160+
Status: corev1.ConditionTrue,
161+
},
162+
},
163+
Capacity: corev1.ResourceList{
164+
corev1.ResourceCPU: resource.MustParse("8"),
165+
corev1.ResourceMemory: resource.MustParse("16Gi"),
166+
},
167+
Allocatable: corev1.ResourceList{
168+
corev1.ResourceCPU: resource.MustParse("4"),
169+
corev1.ResourceMemory: resource.MustParse("8Gi"),
170+
},
171+
Addresses: []corev1.NodeAddress{
172+
{
173+
Type: corev1.NodeHostName,
174+
Address: "my-node-hostname",
175+
},
176+
{
177+
Type: corev1.NodeInternalIP,
178+
Address: "192.168.1.100",
179+
},
180+
},
181+
},
182+
}
183+
wantNode := &corev1.Node{
184+
ObjectMeta: metav1.ObjectMeta{
185+
Name: "my-node",
186+
UID: "my-node-uid",
187+
Labels: map[string]string{
188+
"node-role": "worker",
189+
},
190+
},
191+
Status: corev1.NodeStatus{
192+
Conditions: []corev1.NodeCondition{
193+
{
194+
Type: corev1.NodeReady,
195+
Status: corev1.ConditionTrue,
196+
},
197+
},
198+
Allocatable: corev1.ResourceList{
199+
corev1.ResourceCPU: resource.MustParse("4"),
200+
corev1.ResourceMemory: resource.MustParse("8Gi"),
201+
},
202+
},
203+
}
204+
assert.Equal(t, wantNode, Transform(originalNode))
205+
}

0 commit comments

Comments
 (0)