Skip to content

Commit 23d286e

Browse files
authored
Merge pull request #1 from pohly/connection-tests
connection: add tests
2 parents 635b0f9 + fad9258 commit 23d286e

File tree

8 files changed

+2653
-6
lines changed

8 files changed

+2653
-6
lines changed

Gopkg.lock

Lines changed: 7 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

connection/connection.go

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package connection
1818

1919
import (
2020
"context"
21+
"errors"
22+
"net"
2123
"strings"
2224
"time"
2325

@@ -34,19 +36,94 @@ const (
3436
// Connect opens insecure gRPC connection to a CSI driver. Address must be either absolute path to UNIX domain socket
3537
// file or have format '<protocol>://', following gRPC name resolution mechanism at
3638
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
39+
//
3740
// The function tries to connect indefinitely every second until it connects. The function automatically disables TLS
3841
// and adds interceptor for logging of all gRPC messages at level 5.
39-
func Connect(address string) (*grpc.ClientConn, error) {
40-
dialOptions := []grpc.DialOption{
42+
//
43+
// For a connection to a Unix Domain socket, the behavior after
44+
// loosing the connection is configurable. The default is to
45+
// log the connection loss and reestablish a connection. Applications
46+
// which need to know about a connection loss can be notified by
47+
// passing a callback with OnConnectionLoss and in that callback
48+
// can decide what to do:
49+
// - exit the application with os.Exit
50+
// - invalidate cached information
51+
// - disable the reconnect, which will cause all gRPC method calls to fail with status.Unavailable
52+
//
53+
// For other connections, the default behavior from gRPC is used and
54+
// loss of connection is not detected reliably.
55+
func Connect(address string, options ...Option) (*grpc.ClientConn, error) {
56+
return connect(address, []grpc.DialOption{}, options)
57+
}
58+
59+
// Option is the type of all optional parameters for Connect.
60+
type Option func(o *options)
61+
62+
// OnConnectionLoss registers a callback that will be invoked when the
63+
// connection got lost. If that callback returns true, the connection
64+
// is restablished. Otherwise the connection is left as it is and
65+
// all future gRPC calls using it will fail with status.Unavailable.
66+
func OnConnectionLoss(reconnect func() bool) Option {
67+
return func(o *options) {
68+
o.reconnect = reconnect
69+
}
70+
}
71+
72+
type options struct {
73+
reconnect func() bool
74+
}
75+
76+
// connect is the internal implementation of Connect. It has more options to enable testing.
77+
func connect(address string, dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) {
78+
var o options
79+
for _, option := range connectOptions {
80+
option(&o)
81+
}
82+
83+
dialOptions = append(dialOptions,
4184
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
4285
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
4386
grpc.WithBlock(), // Block until connection succeeds.
4487
grpc.WithUnaryInterceptor(LogGRPC), // Log all messages.
45-
}
88+
)
89+
unixPrefix := "unix://"
4690
if strings.HasPrefix(address, "/") {
4791
// It looks like filesystem path.
48-
address = "unix://" + address
92+
address = unixPrefix + address
93+
}
94+
95+
if strings.HasPrefix(address, unixPrefix) {
96+
// state variables for the custom dialer
97+
haveConnected := false
98+
lostConnection := false
99+
reconnect := true
100+
101+
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
102+
if haveConnected && !lostConnection {
103+
// We have detected a loss of connection for the first time. Decide what to do...
104+
// Record this once. TODO (?): log at regular time intervals.
105+
glog.Errorf("Lost connection to %s.", address)
106+
// Inform caller and let it decide? Default is to reconnect.
107+
if o.reconnect != nil {
108+
reconnect = o.reconnect()
109+
}
110+
lostConnection = true
111+
}
112+
if !reconnect {
113+
return nil, errors.New("connection lost, reconnecting disabled")
114+
}
115+
conn, err := net.DialTimeout("unix", address[len(unixPrefix):], timeout)
116+
if err == nil {
117+
// Connection restablished.
118+
haveConnected = true
119+
lostConnection = false
120+
}
121+
return conn, err
122+
}))
123+
} else if o.reconnect != nil {
124+
return nil, errors.New("OnConnectionLoss callback only supported for unix:// addresses")
49125
}
126+
50127
glog.Infof("Connecting to %s", address)
51128

52129
// Connect in background.

0 commit comments

Comments
 (0)