|
| 1 | +package connection |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "net" |
| 6 | + "strings" |
| 7 | + "time" |
| 8 | + |
| 9 | + "github.com/golang/glog" |
| 10 | + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" |
| 11 | + "google.golang.org/grpc" |
| 12 | +) |
| 13 | + |
| 14 | +const ( |
| 15 | + // Interval of logging connection errors |
| 16 | + connectionLoggingInterval = 10 * time.Second |
| 17 | +) |
| 18 | + |
| 19 | +// Connect opens insecure gRPC connection to a CSI driver. Address must have either '<protocol>://' prefix, or be |
| 20 | +// a path to a socket file. The function tries to connect indefinitely every second until it connects. |
| 21 | +// The function automatically adds interceptor for gRPC message logging. |
| 22 | +func Connect(address string, dialOptions ...grpc.DialOption) (*grpc.ClientConn, error) { |
| 23 | + dialOptions = append(dialOptions, |
| 24 | + grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container. |
| 25 | + grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure. |
| 26 | + grpc.WithBlock(), // Block until connection succeeds. |
| 27 | + grpc.WithUnaryInterceptor(LogGRPC), // Log all messages. |
| 28 | + ) |
| 29 | + if strings.HasPrefix(address, "/") { |
| 30 | + // It looks like filesystem path. |
| 31 | + dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { |
| 32 | + return net.DialTimeout("unix", addr, timeout) |
| 33 | + })) |
| 34 | + } |
| 35 | + glog.Infof("Connecting to %s", address) |
| 36 | + |
| 37 | + // Wait until the connection is ready. |
| 38 | + var conn *grpc.ClientConn |
| 39 | + var err error |
| 40 | + ready := make(chan bool) |
| 41 | + go func() { |
| 42 | + conn, err = grpc.Dial(address, dialOptions...) |
| 43 | + close(ready) |
| 44 | + }() |
| 45 | + |
| 46 | + // Log error every connectionLoggingInterval |
| 47 | + ticker := time.NewTicker(connectionLoggingInterval) |
| 48 | + defer ticker.Stop() |
| 49 | + |
| 50 | + for { |
| 51 | + select { |
| 52 | + case <-ticker.C: |
| 53 | + glog.Warningf("Still connecting to %s", address) |
| 54 | + |
| 55 | + case <-ready: |
| 56 | + return conn, err |
| 57 | + } |
| 58 | + } |
| 59 | +} |
| 60 | + |
| 61 | +// LogGRPC is gPRC unary interceptor for logging of CSI messages at level 5. It removes any secrets from the message. |
| 62 | +func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { |
| 63 | + glog.V(5).Infof("GRPC call: %s", method) |
| 64 | + glog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req)) |
| 65 | + err := invoker(ctx, method, req, reply, cc, opts...) |
| 66 | + glog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(reply)) |
| 67 | + glog.V(5).Infof("GRPC error: %v", err) |
| 68 | + return err |
| 69 | +} |
0 commit comments