Skip to content

Commit 763b95d

Browse files
authored
Merge pull request #11 from jsafrane/connection
Add Connect function
2 parents f4bb89e + 155f487 commit 763b95d

File tree

11 files changed

+4332
-6
lines changed

11 files changed

+4332
-6
lines changed

Gopkg.lock

Lines changed: 49 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

connection/connection.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package connection
18+
19+
import (
20+
"context"
21+
"errors"
22+
"net"
23+
"strings"
24+
"time"
25+
26+
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
27+
"google.golang.org/grpc"
28+
"k8s.io/klog"
29+
)
30+
31+
const (
32+
// Interval of logging connection errors
33+
connectionLoggingInterval = 10 * time.Second
34+
)
35+
36+
// Connect opens insecure gRPC connection to a CSI driver. Address must be either absolute path to UNIX domain socket
37+
// file or have format '<protocol>://', following gRPC name resolution mechanism at
38+
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
39+
//
40+
// The function tries to connect indefinitely every second until it connects. The function automatically disables TLS
41+
// and adds interceptor for logging of all gRPC messages at level 5.
42+
//
43+
// For a connection to a Unix Domain socket, the behavior after
44+
// loosing the connection is configurable. The default is to
45+
// log the connection loss and reestablish a connection. Applications
46+
// which need to know about a connection loss can be notified by
47+
// passing a callback with OnConnectionLoss and in that callback
48+
// can decide what to do:
49+
// - exit the application with os.Exit
50+
// - invalidate cached information
51+
// - disable the reconnect, which will cause all gRPC method calls to fail with status.Unavailable
52+
//
53+
// For other connections, the default behavior from gRPC is used and
54+
// loss of connection is not detected reliably.
55+
func Connect(address string, options ...Option) (*grpc.ClientConn, error) {
56+
return connect(address, []grpc.DialOption{}, options)
57+
}
58+
59+
// Option is the type of all optional parameters for Connect.
60+
type Option func(o *options)
61+
62+
// OnConnectionLoss registers a callback that will be invoked when the
63+
// connection got lost. If that callback returns true, the connection
64+
// is restablished. Otherwise the connection is left as it is and
65+
// all future gRPC calls using it will fail with status.Unavailable.
66+
func OnConnectionLoss(reconnect func() bool) Option {
67+
return func(o *options) {
68+
o.reconnect = reconnect
69+
}
70+
}
71+
72+
type options struct {
73+
reconnect func() bool
74+
}
75+
76+
// connect is the internal implementation of Connect. It has more options to enable testing.
77+
func connect(address string, dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) {
78+
var o options
79+
for _, option := range connectOptions {
80+
option(&o)
81+
}
82+
83+
dialOptions = append(dialOptions,
84+
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
85+
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
86+
grpc.WithBlock(), // Block until connection succeeds.
87+
grpc.WithUnaryInterceptor(LogGRPC), // Log all messages.
88+
)
89+
unixPrefix := "unix://"
90+
if strings.HasPrefix(address, "/") {
91+
// It looks like filesystem path.
92+
address = unixPrefix + address
93+
}
94+
95+
if strings.HasPrefix(address, unixPrefix) {
96+
// state variables for the custom dialer
97+
haveConnected := false
98+
lostConnection := false
99+
reconnect := true
100+
101+
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
102+
if haveConnected && !lostConnection {
103+
// We have detected a loss of connection for the first time. Decide what to do...
104+
// Record this once. TODO (?): log at regular time intervals.
105+
klog.Errorf("Lost connection to %s.", address)
106+
// Inform caller and let it decide? Default is to reconnect.
107+
if o.reconnect != nil {
108+
reconnect = o.reconnect()
109+
}
110+
lostConnection = true
111+
}
112+
if !reconnect {
113+
return nil, errors.New("connection lost, reconnecting disabled")
114+
}
115+
conn, err := net.DialTimeout("unix", address[len(unixPrefix):], timeout)
116+
if err == nil {
117+
// Connection restablished.
118+
haveConnected = true
119+
lostConnection = false
120+
}
121+
return conn, err
122+
}))
123+
} else if o.reconnect != nil {
124+
return nil, errors.New("OnConnectionLoss callback only supported for unix:// addresses")
125+
}
126+
127+
klog.Infof("Connecting to %s", address)
128+
129+
// Connect in background.
130+
var conn *grpc.ClientConn
131+
var err error
132+
ready := make(chan bool)
133+
go func() {
134+
conn, err = grpc.Dial(address, dialOptions...)
135+
close(ready)
136+
}()
137+
138+
// Log error every connectionLoggingInterval
139+
ticker := time.NewTicker(connectionLoggingInterval)
140+
defer ticker.Stop()
141+
142+
// Wait until Dial() succeeds.
143+
for {
144+
select {
145+
case <-ticker.C:
146+
klog.Warningf("Still connecting to %s", address)
147+
148+
case <-ready:
149+
return conn, err
150+
}
151+
}
152+
}
153+
154+
// LogGRPC is gPRC unary interceptor for logging of CSI messages at level 5. It removes any secrets from the message.
155+
func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
156+
klog.V(5).Infof("GRPC call: %s", method)
157+
klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
158+
err := invoker(ctx, method, req, reply, cc, opts...)
159+
klog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(reply))
160+
klog.V(5).Infof("GRPC error: %v", err)
161+
return err
162+
}

0 commit comments

Comments
 (0)