Skip to content

Add Migrated metrics to the connection record metrics client side #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ type ExtendedCSIMetricsManager struct {
metrics.CSIMetricsManager
}

type AdditionalInfo struct {
Migrated string
}
type AdditionalInfoKeyType struct{}

var AdditionalInfoKey AdditionalInfoKeyType

// RecordMetricsClientInterceptor is a gPRC unary interceptor for recording metrics for CSI operations
// in a gRPC client.
func (cmm ExtendedCSIMetricsManager) RecordMetricsClientInterceptor(
Expand All @@ -203,11 +210,35 @@ func (cmm ExtendedCSIMetricsManager) RecordMetricsClientInterceptor(
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
duration := time.Since(start)
cmm.RecordMetrics(

var cmmBase metrics.CSIMetricsManager
cmmBase = cmm
if cmm.HaveAdditionalLabel(metrics.LabelMigrated) {
// record migration status
additionalInfo := ctx.Value(AdditionalInfoKey)
migrated := "false"
if additionalInfo != nil {
additionalInfoVal, ok := additionalInfo.(AdditionalInfo)
if !ok {
klog.Errorf("Failed to record migrated status, cannot convert additional info %v", additionalInfo)
return err
}
migrated = additionalInfoVal.Migrated
}
cmmv, metricsErr := cmm.WithLabelValues(map[string]string{metrics.LabelMigrated: migrated})
if metricsErr != nil {
klog.Errorf("Failed to record migrated status, error: %v", metricsErr)
} else {
cmmBase = cmmv
}
}
// Record the default metric
cmmBase.RecordMetrics(
method, /* operationName */
err, /* operationErr */
duration, /* operationDuration */
)

return err
}

Expand Down
153 changes: 99 additions & 54 deletions connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,64 +340,109 @@ func TestExplicitReconnect(t *testing.T) {
}

func TestConnectMetrics(t *testing.T) {
tmp := tmpDir(t)
defer os.RemoveAll(tmp)
cmmServer := metrics.NewCSIMetricsManagerForPlugin("fake.csi.driver.io")
// We have to have a real implementation of the gRPC call, otherwise the metrics
// interceptor is not called. The CSI identity service is used because it's simple.
addr, stopServer := startServer(t, tmp, &identityServer{}, nil, cmmServer)
defer stopServer()

cmm := metrics.NewCSIMetricsManager("fake.csi.driver.io")
conn, err := Connect(addr, cmm)
if assert.NoError(t, err, "connect via absolute path") &&
assert.NotNil(t, conn, "got a connection") {
defer conn.Close()
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")

identityClient := csi.NewIdentityClient(conn)
if _, err := identityClient.GetPluginInfo(context.Background(), &csi.GetPluginInfoRequest{}); assert.Error(t, err) {
errStatus, _ := status.FromError(err)
assert.Equal(t, codes.Unimplemented, errStatus.Code(), "not implemented")
}
testCases := []struct {
name string
expectedMetrics string
ctx context.Context
checkServer bool
cmm metrics.CSIMetricsManager
}{
{
name: "Regular connection test",
expectedMetrics: `# HELP csi_sidecar_operations_seconds [ALPHA] Container Storage Interface operation duration with gRPC error code status total
# TYPE csi_sidecar_operations_seconds histogram
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="2.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="10"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="15"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="50"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="120"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="300"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="600"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="+Inf"} 1
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 0
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 1
`,
ctx: context.Background(),
cmm: metrics.NewCSIMetricsManager("fake.csi.driver.io"),
checkServer: true,
},
{
name: "connection test with migrated metrics",
expectedMetrics: `# HELP csi_sidecar_operations_seconds [ALPHA] Container Storage Interface operation duration with gRPC error code status total
# TYPE csi_sidecar_operations_seconds histogram
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="0.1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="0.25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="0.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="2.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="10"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="15"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="50"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="120"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="300"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="600"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="+Inf"} 1
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true"} 0
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true"} 1
`,
ctx: context.WithValue(context.Background(), AdditionalInfoKey, AdditionalInfo{
Migrated: "true",
}),
cmm: metrics.NewCSIMetricsManagerWithOptions("fake.csi.driver.io", metrics.WithMigration()),
checkServer: false,
},
}
for _, test := range testCases {
t.Logf("Running testcase %v", test.name)
tmp := tmpDir(t)
defer os.RemoveAll(tmp)
cmmServer := metrics.NewCSIMetricsManagerForPlugin("fake.csi.driver.io")
// We have to have a real implementation of the gRPC call, otherwise the metrics
// interceptor is not called. The CSI identity service is used because it's simple.
addr, stopServer := startServer(t, tmp, &identityServer{}, nil, cmmServer)
defer stopServer()

cmm := test.cmm
conn, err := Connect(addr, cmm)
if assert.NoError(t, err, "connect via absolute path") &&
assert.NotNil(t, conn, "got a connection") {
defer conn.Close()
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")

identityClient := csi.NewIdentityClient(conn)
if _, err := identityClient.GetPluginInfo(test.ctx, &csi.GetPluginInfoRequest{}); assert.Error(t, err) {
errStatus, _ := status.FromError(err)
assert.Equal(t, codes.Unimplemented, errStatus.Code(), "not implemented")
}
}

expectedMetrics := `# HELP csi_sidecar_operations_seconds [ALPHA] Container Storage Interface operation duration with gRPC error code status total
# TYPE csi_sidecar_operations_seconds histogram
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="2.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="10"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="15"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="50"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="120"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="300"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="600"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="+Inf"} 1
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 0
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 1
`

if err := testutil.GatherAndCompare(
cmm.GetRegistry(), strings.NewReader(expectedMetrics), "csi_sidecar_operations_seconds"); err != nil {
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
err = verifyMetricsError(t, err, "csi_sidecar_operations_seconds_sum")
if err != nil {
t.Errorf("Expected client metrics not found -- %v", err)
if err := testutil.GatherAndCompare(
cmm.GetRegistry(), strings.NewReader(test.expectedMetrics), "csi_sidecar_operations_seconds"); err != nil {
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
err = verifyMetricsError(t, err, "csi_sidecar_operations_seconds_sum")
if err != nil {
t.Errorf("Expected client metrics not found -- %v", err)
}
}
}

expectedMetrics = strings.Replace(expectedMetrics, "csi_sidecar", metrics.SubsystemPlugin, -1)
if err := testutil.GatherAndCompare(
cmmServer.GetRegistry(), strings.NewReader(expectedMetrics), "csi_plugin_operations_seconds"); err != nil {
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
err = verifyMetricsError(t, err, metrics.SubsystemPlugin+"_operations_seconds_sum")
if err != nil {
t.Errorf("Expected server metrics not found -- %v", err)
if test.checkServer {
expectedMetrics := strings.Replace(test.expectedMetrics, "csi_sidecar", metrics.SubsystemPlugin, -1)
if err := testutil.GatherAndCompare(
cmmServer.GetRegistry(), strings.NewReader(expectedMetrics), "csi_plugin_operations_seconds"); err != nil {
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
err = verifyMetricsError(t, err, metrics.SubsystemPlugin+"_operations_seconds_sum")
if err != nil {
t.Errorf("Expected server metrics not found -- %v", err)
}
}
}
}
}
Expand Down
Loading