Skip to content

Commit 6ee8a5f

Browse files
committed
Add Connect function
1 parent 6bb407d commit 6ee8a5f

File tree

5 files changed

+1573
-1
lines changed

5 files changed

+1573
-1
lines changed

Gopkg.lock

Lines changed: 7 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

connection/connection.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package connection
2+
3+
import (
4+
"context"
5+
"net"
6+
"strings"
7+
"time"
8+
9+
"github.com/golang/glog"
10+
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
11+
"google.golang.org/grpc"
12+
)
13+
14+
const (
15+
// Interval of logging connection errors
16+
connectionLoggingInterval = 10 * time.Second
17+
)
18+
19+
// Connect opens insecure gRPC connection to a CSI driver. Address must be either absolute path to a socket file
20+
// or have format '<protocol>://', following gRPC name resolution mechanism at https://github.com/grpc/grpc/blob/master/doc/naming.md.
21+
// The function tries to connect indefinitely every second until it connects. The function automatically adds
22+
// interceptor for gRPC message logging.
23+
func Connect(address string, dialOptions ...grpc.DialOption) (*grpc.ClientConn, error) {
24+
dialOptions = append(dialOptions,
25+
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
26+
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
27+
grpc.WithBlock(), // Block until connection succeeds.
28+
grpc.WithUnaryInterceptor(LogGRPC), // Log all messages.
29+
)
30+
if strings.HasPrefix(address, "/") {
31+
// It looks like filesystem path.
32+
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
33+
return net.DialTimeout("unix", addr, timeout)
34+
}))
35+
}
36+
glog.Infof("Connecting to %s", address)
37+
38+
// Connect in background.
39+
var conn *grpc.ClientConn
40+
var err error
41+
ready := make(chan bool)
42+
go func() {
43+
conn, err = grpc.Dial(address, dialOptions...)
44+
close(ready)
45+
}()
46+
47+
// Log error every connectionLoggingInterval
48+
ticker := time.NewTicker(connectionLoggingInterval)
49+
defer ticker.Stop()
50+
51+
// Wait until Dial() succeeds.
52+
for {
53+
select {
54+
case <-ticker.C:
55+
glog.Warningf("Still connecting to %s", address)
56+
57+
case <-ready:
58+
return conn, err
59+
}
60+
}
61+
}
62+
63+
// LogGRPC is gPRC unary interceptor for logging of CSI messages at level 5. It removes any secrets from the message.
64+
func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
65+
glog.V(5).Infof("GRPC call: %s", method)
66+
glog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
67+
err := invoker(ctx, method, req, reply, cc, opts...)
68+
glog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(reply))
69+
glog.V(5).Infof("GRPC error: %v", err)
70+
return err
71+
}

vendor/github.com/golang/glog/LICENSE

Lines changed: 191 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)