Skip to content

Refactor metric defer() statements to gRPC metric interceptor #1876

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func handle() {
}()
}

var metricsManager *metrics.MetricsManager = nil
if *runControllerService && *httpEndpoint != "" {
mm := metrics.NewMetricsManager()
mm.InitializeHttpHandler(*httpEndpoint, *metricsPath)
Expand All @@ -151,6 +152,7 @@ func handle() {
if metrics.IsGKEComponentVersionAvailable() {
mm.EmitGKEComponentVersion()
}
metricsManager = &mm
}

if len(*extraVolumeLabelsStr) > 0 && !*runControllerService {
Expand Down Expand Up @@ -261,7 +263,7 @@ func handle() {
gce.WaitForOpBackoff.Steps = *waitForOpBackoffSteps
gce.WaitForOpBackoff.Cap = *waitForOpBackoffCap

gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing)
gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing, metricsManager)
}

func notEmpty(v string) bool {
Expand Down
102 changes: 17 additions & 85 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"math/rand"
neturl "net/url"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -303,12 +302,14 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre

func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
// Apply Parameters (case-insensitive). We leave validation of
// the values to the cloud provider.
params, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.Driver.extraTags)
metrics.UpdateRequestMetadataFromParams(ctx, params)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error())
}

// Validate arguments
volumeCapabilities := req.GetVolumeCapabilities()
capacityRange := req.GetCapacityRange()
Expand All @@ -328,17 +329,6 @@ func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "VolumeCapabilities is invalid: %v", err.Error())
}

// Apply Parameters (case-insensitive). We leave validation of
// the values to the cloud provider.
params, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.Driver.extraTags)
diskTypeForMetric = params.DiskType
enableConfidentialCompute = strconv.FormatBool(params.EnableConfidentialCompute)
hasStoragePools := len(params.StoragePools) > 0
enableStoragePools = strconv.FormatBool(hasStoragePools)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error())
}
// https://github.com/container-storage-interface/spec/blob/master/spec.md#createvolume
// mutable_parameters MUST take precedence over the values from parameters.
mutableParams := req.GetMutableParameters()
Expand Down Expand Up @@ -782,14 +772,6 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re
return nil, status.Error(codes.InvalidArgument, "volume ID must be provided")
}

diskType := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools

defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("ControllerModifyVolume", err, diskType, enableConfidentialCompute, enableStoragePools)
}()

project, volKey, err := common.VolumeIDToKey(volumeID)
if err != nil {
// Cannot find volume associated with this ID because VolumeID is not in the correct format
Expand All @@ -806,6 +788,7 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re
klog.V(4).Infof("Modify Volume Parameters for %s: %v", volumeID, volumeModifyParams)

existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionBeta)
metrics.UpdateRequestMetadataFromDisk(ctx, existingDisk)

if err != nil {
err = fmt.Errorf("Failed to get volume: %w", err)
Expand All @@ -816,9 +799,9 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re
err = status.Errorf(codes.Internal, "failed to get volume : %s", volumeID)
return nil, err
}
diskType = existingDisk.GetPDType()

// Check if the disk supports dynamic IOPS/Throughput provisioning
diskType := existingDisk.GetPDType()
supportsIopsChange := gceCS.diskSupportsIopsChange(diskType)
supportsThroughputChange := gceCS.diskSupportsThroughputChange(diskType)
if !supportsIopsChange && !supportsThroughputChange {
Expand All @@ -834,8 +817,6 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re
return nil, err
}

enableStoragePools = strconv.FormatBool(existingDisk.GetEnableStoragePools())

err = gceCS.CloudProvider.UpdateDisk(ctx, project, volKey, existingDisk, volumeModifyParams)
if err != nil {
klog.Errorf("Failed to modify volume %s: %v", volumeID, err)
Expand Down Expand Up @@ -883,12 +864,6 @@ func getGCEApiVersion(multiWriter bool) gce.GCEAPIVersion {
func (gceCS *GCEControllerServer) deleteMultiZoneDisk(ctx context.Context, req *csi.DeleteVolumeRequest, project string, volKey *meta.Key) (*csi.DeleteVolumeResponse, error) {
// List disks with same name
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
existingZones := []string{gceCS.CloudProvider.GetDefaultZone()}
zones, err := getDefaultZonesInRegion(ctx, gceCS, existingZones)
if err != nil {
Expand All @@ -910,7 +885,7 @@ func (gceCS *GCEControllerServer) deleteMultiZoneDisk(ctx context.Context, req *
}
disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, zonalVolKey, gce.GCEAPIVersionV1)
// TODO: Consolidate the parameters here, rather than taking the last.
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
err := gceCS.CloudProvider.DeleteDisk(ctx, project, zonalVolKey)
if err != nil {
deleteDiskErrs = append(deleteDiskErrs, gceCS.CloudProvider.DeleteDisk(ctx, project, volKey))
Expand All @@ -927,12 +902,6 @@ func (gceCS *GCEControllerServer) deleteMultiZoneDisk(ctx context.Context, req *

func (gceCS *GCEControllerServer) deleteSingleDeviceDisk(ctx context.Context, req *csi.DeleteVolumeRequest, project string, volKey *meta.Key) (*csi.DeleteVolumeResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
volumeID := req.GetVolumeId()
project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey)
if err != nil {
Expand All @@ -948,7 +917,7 @@ func (gceCS *GCEControllerServer) deleteSingleDeviceDisk(ctx context.Context, re
}
defer gceCS.volumeLocks.Release(volumeID)
disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
err = gceCS.CloudProvider.DeleteDisk(ctx, project, volKey)
if err != nil {
return nil, common.LoggedError("Failed to delete disk: ", err)
Expand All @@ -960,12 +929,6 @@ func (gceCS *GCEControllerServer) deleteSingleDeviceDisk(ctx context.Context, re

func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
// Only valid requests will be accepted
_, _, _, err = gceCS.validateControllerPublishVolumeRequest(ctx, req)
if err != nil {
Expand All @@ -978,7 +941,7 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
}

resp, err, disk := gceCS.executeControllerPublishVolume(ctx, req)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
if err != nil {
klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err)
gceCS.errorBackoff.next(backoffId, common.CodeForError(err))
Expand Down Expand Up @@ -1192,12 +1155,6 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con

func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
_, _, err = gceCS.validateControllerUnpublishVolumeRequest(ctx, req)
if err != nil {
return nil, err
Expand All @@ -1209,7 +1166,7 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
return nil, status.Errorf(gceCS.errorBackoff.code(backoffId), "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId)
}
resp, err, disk := gceCS.executeControllerUnpublishVolume(ctx, req)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
if err != nil {
klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err)
gceCS.errorBackoff.next(backoffId, common.CodeForError(err))
Expand Down Expand Up @@ -1316,12 +1273,6 @@ func (gceCS *GCEControllerServer) parameterProcessor() *common.ParameterProcesso

func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("ValidateVolumeCapabilities", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
if req.GetVolumeCapabilities() == nil || len(req.GetVolumeCapabilities()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities must be provided")
}
Expand All @@ -1348,7 +1299,7 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
defer gceCS.volumeLocks.Release(volumeID)

disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
if err != nil {
if gce.IsGCENotFoundError(err) {
return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.Name, err.Error())
Expand Down Expand Up @@ -1564,12 +1515,6 @@ func (gceCS *GCEControllerServer) ControllerGetCapabilities(ctx context.Context,

func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
// Validate arguments
volumeID := req.GetSourceVolumeId()
if len(req.Name) == 0 {
Expand All @@ -1595,7 +1540,7 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C

// Check if volume exists
disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
if err != nil {
if gce.IsGCENotFoundError(err) {
return nil, status.Errorf(codes.NotFound, "CreateSnapshot could not find disk %v: %v", volKey.String(), err.Error())
Expand Down Expand Up @@ -1823,12 +1768,6 @@ func isCSISnapshotReady(status string) (bool, error) {

func (gceCS *GCEControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("DeleteSnapshot", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
// Validate arguments
snapshotID := req.GetSnapshotId()
if len(snapshotID) == 0 {
Expand Down Expand Up @@ -1913,14 +1852,7 @@ func (gceCS *GCEControllerServer) ListSnapshots(ctx context.Context, req *csi.Li
}

func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {

var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID must be provided")
Expand Down Expand Up @@ -1950,7 +1882,7 @@ func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, re
}

sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(sourceDisk)
metrics.UpdateRequestMetadataFromDisk(ctx, sourceDisk)
resizedGb, err := gceCS.CloudProvider.ResizeDisk(ctx, project, volKey, reqBytes)

if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/gce-pd-csi-driver/gce-pd-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
)

Expand Down Expand Up @@ -170,12 +171,12 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, err
}
}

func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelTracing bool) {
func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelTracing bool, metricsManager *metrics.MetricsManager) {
maxLogChar = grpcLogCharCap

klog.V(4).Infof("Driver: %v", gceDriver.name)
//Start the nonblocking GRPC
s := NewNonBlockingGRPCServer(enableOtelTracing)
s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager)
// TODO(#34): Only start specific servers based on a flag.
// In the future have this only run specific combinations of servers depending on which version this is.
// The schema for that was in util. basically it was just s.start but with some nil servers.
Expand Down
11 changes: 8 additions & 3 deletions pkg/gce-pd-csi-driver/gce-pd-driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, readyToExe
return initGCEDriverWithCloudProvider(t, fakeBlockingBlockProvider)
}

func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver {
vendorVersion := "test-vendor"
func controllerServerForTest(cloudProvider gce.GCECompute) *GCEControllerServer {
gceDriver := GetGCEDriver()
errorBackoffInitialDuration := 200 * time.Millisecond
errorBackoffMaxDuration := 5 * time.Minute
Expand All @@ -55,7 +54,13 @@ func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute)
SupportsThroughputChange: []string{"hyperdisk-balanced", "hyperdisk-throughput", "hyperdisk-ml"},
}

controllerServer := NewControllerServer(gceDriver, cloudProvider, errorBackoffInitialDuration, errorBackoffMaxDuration, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig)
return NewControllerServer(gceDriver, cloudProvider, errorBackoffInitialDuration, errorBackoffMaxDuration, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig)
}

func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver {
vendorVersion := "test-vendor"
gceDriver := GetGCEDriver()
controllerServer := controllerServerForTest(cloudProvider)
err := gceDriver.SetupGCEDriver(driver, vendorVersion, nil, nil, nil, controllerServer, nil)
if err != nil {
t.Fatalf("Failed to setup GCE Driver: %v", err)
Expand Down
23 changes: 16 additions & 7 deletions pkg/gce-pd-csi-driver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"k8s.io/klog/v2"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"

csi "github.com/container-storage-interface/spec/lib/go/csi"
)
Expand All @@ -40,15 +41,16 @@ type NonBlockingGRPCServer interface {
ForceStop()
}

func NewNonBlockingGRPCServer(enableOtelTracing bool) NonBlockingGRPCServer {
return &nonBlockingGRPCServer{otelTracing: enableOtelTracing}
func NewNonBlockingGRPCServer(enableOtelTracing bool, metricsManager *metrics.MetricsManager) NonBlockingGRPCServer {
return &nonBlockingGRPCServer{otelTracing: enableOtelTracing, metricsManager: metricsManager}
}

// NonBlocking server
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
otelTracing bool
wg sync.WaitGroup
server *grpc.Server
otelTracing bool
metricsManager *metrics.MetricsManager
}

func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
Expand All @@ -73,10 +75,17 @@ func (s *nonBlockingGRPCServer) ForceStop() {
}

func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
grpcInterceptor := grpc.UnaryInterceptor(logGRPC)
interceptors := []grpc.UnaryServerInterceptor{logGRPC}
if s.metricsManager != nil {
metricsInterceptor := metrics.MetricInterceptor{
MetricsManager: s.metricsManager,
}
interceptors = append(interceptors, metricsInterceptor.UnaryInterceptor())
}
if s.otelTracing {
grpcInterceptor = grpc.ChainUnaryInterceptor(logGRPC, otelgrpc.UnaryServerInterceptor())
interceptors = append(interceptors, otelgrpc.UnaryServerInterceptor())
}
grpcInterceptor := grpc.ChainUnaryInterceptor(interceptors...)

opts := []grpc.ServerOption{
grpcInterceptor,
Expand Down
Loading