Skip to content

Commit 977fadd

Browse files
committed
Introduce CSI Metrics Library
1 parent d9213e0 commit 977fadd

File tree

7 files changed

+582
-40
lines changed

7 files changed

+582
-40
lines changed

connection/connection.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strings"
2525
"time"
2626

27+
"github.com/kubernetes-csi/csi-lib-utils/metrics"
2728
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
2829
"google.golang.org/grpc"
2930
"k8s.io/klog"
@@ -58,8 +59,8 @@ const terminationLogPath = "/dev/termination-log"
5859
//
5960
// For other connections, the default behavior from gRPC is used and
6061
// loss of connection is not detected reliably.
61-
func Connect(address string, options ...Option) (*grpc.ClientConn, error) {
62-
return connect(address, []grpc.DialOption{}, options)
62+
func Connect(address string, metricsManager metrics.CSIMetricsManager, options ...Option) (*grpc.ClientConn, error) {
63+
return connect(address, metricsManager, []grpc.DialOption{}, options)
6364
}
6465

6566
// Option is the type of all optional parameters for Connect.
@@ -93,7 +94,10 @@ type options struct {
9394
}
9495

9596
// connect is the internal implementation of Connect. It has more options to enable testing.
96-
func connect(address string, dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) {
97+
func connect(
98+
address string,
99+
metricsManager metrics.CSIMetricsManager,
100+
dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) {
97101
var o options
98102
for _, option := range connectOptions {
99103
option(&o)
@@ -103,7 +107,10 @@ func connect(address string, dialOptions []grpc.DialOption, connectOptions []Opt
103107
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
104108
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
105109
grpc.WithBlock(), // Block until connection succeeds.
106-
grpc.WithUnaryInterceptor(LogGRPC), // Log all messages.
110+
grpc.WithChainUnaryInterceptor(
111+
LogGRPC, // Log all messages.
112+
extendedCSIMetricsManager{metricsManager}.recordMetricsInterceptor, // Record metrics for each gRPC call.
113+
),
107114
)
108115
unixPrefix := "unix://"
109116
if strings.HasPrefix(address, "/") {
@@ -179,3 +186,26 @@ func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
179186
klog.V(5).Infof("GRPC error: %v", err)
180187
return err
181188
}
189+
190+
type extendedCSIMetricsManager struct {
191+
metrics.CSIMetricsManager
192+
}
193+
194+
// recordMetricsInterceptor is a gPRC unary interceptor for recording metrics for CSI operations.
195+
func (cmm extendedCSIMetricsManager) recordMetricsInterceptor(
196+
ctx context.Context,
197+
method string,
198+
req, reply interface{},
199+
cc *grpc.ClientConn,
200+
invoker grpc.UnaryInvoker,
201+
opts ...grpc.CallOption) error {
202+
start := time.Now()
203+
err := invoker(ctx, method, req, reply, cc, opts...)
204+
duration := time.Since(start)
205+
cmm.RecordMetrics(
206+
method, /* operationName */
207+
err, /* operationErr */
208+
duration, /* operationDuration */
209+
)
210+
return err
211+
}

connection/connection_test.go

Lines changed: 98 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ limitations under the License.
1717
package connection
1818

1919
import (
20+
"bufio"
2021
"context"
2122
"io/ioutil"
2223
"net"
2324
"os"
2425
"path"
26+
"strings"
2527
"sync"
2628
"testing"
2729
"time"
@@ -35,6 +37,9 @@ import (
3537
"github.com/stretchr/testify/require"
3638

3739
"github.com/container-storage-interface/spec/lib/go/csi"
40+
41+
"github.com/kubernetes-csi/csi-lib-utils/metrics"
42+
"k8s.io/component-base/metrics/testutil"
3843
)
3944

4045
func tmpDir(t *testing.T) string {
@@ -84,7 +89,7 @@ func TestConnect(t *testing.T) {
8489
addr, stopServer := startServer(t, tmp, nil, nil)
8590
defer stopServer()
8691

87-
conn, err := Connect(addr)
92+
conn, err := Connect(addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"))
8893
if assert.NoError(t, err, "connect via absolute path") &&
8994
assert.NotNil(t, conn, "got a connection") {
9095
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
@@ -99,7 +104,7 @@ func TestConnectUnix(t *testing.T) {
99104
addr, stopServer := startServer(t, tmp, nil, nil)
100105
defer stopServer()
101106

102-
conn, err := Connect("unix:///" + addr)
107+
conn, err := Connect("unix:///"+addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"))
103108
if assert.NoError(t, err, "connect with unix:/// prefix") &&
104109
assert.NotNil(t, conn, "got a connection") {
105110
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
@@ -139,7 +144,7 @@ func TestWaitForServer(t *testing.T) {
139144
startTimeServer = time.Now()
140145
_, stopServer = startServer(t, tmp, nil, nil)
141146
}()
142-
conn, err := Connect(path.Join(tmp, serverSock))
147+
conn, err := Connect(path.Join(tmp, serverSock), metrics.NewCSIMetricsManager("fake.csi.driver.io"))
143148
if assert.NoError(t, err, "connect via absolute path") {
144149
endTime := time.Now()
145150
assert.NotNil(t, conn, "got a connection")
@@ -158,7 +163,7 @@ func TestTimout(t *testing.T) {
158163

159164
startTime := time.Now()
160165
timeout := 5 * time.Second
161-
conn, err := connect(path.Join(tmp, "no-such.sock"), []grpc.DialOption{grpc.WithTimeout(timeout)}, nil)
166+
conn, err := connect(path.Join(tmp, "no-such.sock"), metrics.NewCSIMetricsManager("fake.csi.driver.io"), []grpc.DialOption{grpc.WithTimeout(timeout)}, nil)
162167
endTime := time.Now()
163168
if assert.Error(t, err, "connection should fail") {
164169
assert.InEpsilon(t, timeout, endTime.Sub(startTime), 1, "connection timeout")
@@ -177,7 +182,7 @@ func TestReconnect(t *testing.T) {
177182
}()
178183

179184
// Allow reconnection (the default).
180-
conn, err := Connect(addr)
185+
conn, err := Connect(addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"))
181186
if assert.NoError(t, err, "connect via absolute path") &&
182187
assert.NotNil(t, conn, "got a connection") {
183188
defer conn.Close()
@@ -222,7 +227,7 @@ func TestDisconnect(t *testing.T) {
222227
}()
223228

224229
reconnectCount := 0
225-
conn, err := Connect(addr, OnConnectionLoss(func() bool {
230+
conn, err := Connect(addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"), OnConnectionLoss(func() bool {
226231
reconnectCount++
227232
// Don't reconnect.
228233
return false
@@ -273,7 +278,7 @@ func TestExplicitReconnect(t *testing.T) {
273278
}()
274279

275280
reconnectCount := 0
276-
conn, err := Connect(addr, OnConnectionLoss(func() bool {
281+
conn, err := Connect(addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"), OnConnectionLoss(func() bool {
277282
reconnectCount++
278283
// Reconnect.
279284
return true
@@ -314,3 +319,89 @@ func TestExplicitReconnect(t *testing.T) {
314319
assert.Equal(t, 1, reconnectCount, "connection loss callback should be called once")
315320
}
316321
}
322+
323+
func TestConnectMetrics(t *testing.T) {
324+
tmp := tmpDir(t)
325+
defer os.RemoveAll(tmp)
326+
addr, stopServer := startServer(t, tmp, nil, nil)
327+
defer stopServer()
328+
329+
cmm := metrics.NewCSIMetricsManager("fake.csi.driver.io")
330+
conn, err := Connect(addr, cmm)
331+
if assert.NoError(t, err, "connect via absolute path") &&
332+
assert.NotNil(t, conn, "got a connection") {
333+
defer conn.Close()
334+
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
335+
336+
if err := conn.Invoke(context.Background(), "/csi.v1.Controller/ControllerGetCapabilities", nil, nil); assert.Error(t, err) {
337+
errStatus, _ := status.FromError(err)
338+
assert.Equal(t, codes.Unimplemented, errStatus.Code(), "not implemented")
339+
}
340+
}
341+
342+
expectedMetrics := `# HELP csi_sidecar_operations_seconds [ALPHA] Container Storage Interface operation duration with gRPC error code status total
343+
# TYPE csi_sidecar_operations_seconds histogram
344+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.1"} 1
345+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.25"} 1
346+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.5"} 1
347+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="1"} 1
348+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="2.5"} 1
349+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="5"} 1
350+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="10"} 1
351+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="15"} 1
352+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="25"} 1
353+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="50"} 1
354+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="120"} 1
355+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="300"} 1
356+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="600"} 1
357+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="+Inf"} 1
358+
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities"} 0
359+
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities"} 1
360+
`
361+
362+
if err := testutil.GatherAndCompare(
363+
cmm.GetRegistry(), strings.NewReader(expectedMetrics)); err != nil {
364+
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
365+
err = verifyMetricsError(t, err, "csi_sidecar_operations_seconds_sum")
366+
if err != nil {
367+
t.Errorf("Expected metrics not found -- %v", err)
368+
}
369+
}
370+
}
371+
372+
func verifyMetricsError(t *testing.T, err error, metricToIgnore string) error {
373+
errStringArr := strings.Split(err.Error(), "got:")
374+
375+
if len(errStringArr) != 2 {
376+
return err
377+
}
378+
379+
want := errStringArr[0]
380+
got := strings.TrimSpace(errStringArr[1])
381+
382+
if want == "" || got == "" {
383+
return err
384+
}
385+
386+
wantArr := strings.Split(err.Error(), "want:")
387+
if len(wantArr) != 2 {
388+
return err
389+
}
390+
391+
want = strings.TrimSpace(wantArr[1])
392+
393+
//fmt.Printf("_want_ %q\r\n_got_ %q", want, got)
394+
gotScanner := bufio.NewScanner(strings.NewReader(got))
395+
wantScanner := bufio.NewScanner(strings.NewReader(want))
396+
for gotScanner.Scan() {
397+
wantScanner.Scan()
398+
wantLine := wantScanner.Text()
399+
gotLine := gotScanner.Text()
400+
if wantLine != gotLine && !strings.HasPrefix(gotLine, metricToIgnore) {
401+
t.Errorf("\r\nMetric Want: %q\r\nMetric Got: %q\r\n", wantLine, gotLine)
402+
return err
403+
}
404+
}
405+
406+
return nil
407+
}

go.mod

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,17 @@ go 1.12
44

55
require (
66
github.com/container-storage-interface/spec v1.1.0
7-
github.com/davecgh/go-spew v1.1.1 // indirect
87
github.com/evanphx/json-patch v4.5.0+incompatible // indirect
9-
github.com/gogo/protobuf v1.2.1 // indirect
108
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
11-
github.com/golang/protobuf v1.3.1
12-
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect
9+
github.com/golang/protobuf v1.3.2
1310
github.com/googleapis/gnostic v0.2.0 // indirect
14-
github.com/json-iterator/go v1.1.6 // indirect
15-
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
16-
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
1711
github.com/onsi/ginkgo v1.10.2 // indirect
18-
github.com/onsi/gomega v1.7.0 // indirect
1912
github.com/pkg/errors v0.8.1 // indirect
20-
github.com/spf13/pflag v1.0.5 // indirect
21-
github.com/stretchr/testify v1.3.0
22-
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c // indirect
23-
golang.org/x/net v0.0.0-20190328230028-74de082e2cca
24-
golang.org/x/oauth2 v0.0.0-20190319182350-c85d3e98c914 // indirect
25-
golang.org/x/sys v0.0.0-20190329044733-9eb1bfa1ce65 // indirect
26-
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
27-
google.golang.org/appengine v1.5.0 // indirect
28-
google.golang.org/genproto v0.0.0-20190327125643-d831d65fe17d // indirect
13+
github.com/stretchr/testify v1.4.0
14+
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9
2915
google.golang.org/grpc v1.19.1
30-
gopkg.in/inf.v0 v0.9.1 // indirect
31-
gopkg.in/yaml.v2 v2.2.2 // indirect
32-
k8s.io/api v0.0.0-20190313235455-40a48860b5ab
33-
k8s.io/apimachinery v0.0.0-20190313205120-d7deff9243b1 // indirect
16+
k8s.io/api v0.17.0
3417
k8s.io/client-go v11.0.0+incompatible
35-
k8s.io/klog v0.2.0
36-
k8s.io/kube-openapi v0.0.0-20190320154901-5e45bb682580 // indirect
37-
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7 // indirect
38-
sigs.k8s.io/yaml v1.1.0 // indirect
18+
k8s.io/component-base v0.17.0
19+
k8s.io/klog v1.0.0
3920
)

0 commit comments

Comments
 (0)