9
9
from tarantool .error import NetworkError
10
10
from tarantool .utils import ENCODING_DEFAULT
11
11
from tarantool .const import (
12
+ CONNECTION_TIMEOUT ,
12
13
SOCKET_TIMEOUT ,
13
14
RECONNECT_MAX_ATTEMPTS ,
14
15
RECONNECT_DELAY ,
15
- NODES_REFRESH_INTERVAL
16
+ DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS ,
16
17
)
17
18
19
+ from tarantool .utils import (
20
+ ENCODING_DEFAULT
21
+ )
18
22
19
23
class RoundRobinStrategy (object ):
20
24
def __init__ (self , addrs ):
@@ -88,26 +92,36 @@ class MeshConnection(Connection):
88
92
end
89
93
'''
90
94
91
- def __init__ (self , addrs ,
95
+ def __init__ (self , host , port ,
92
96
user = None ,
93
97
password = None ,
94
98
socket_timeout = SOCKET_TIMEOUT ,
95
99
reconnect_max_attempts = RECONNECT_MAX_ATTEMPTS ,
96
100
reconnect_delay = RECONNECT_DELAY ,
97
101
connect_now = True ,
98
102
encoding = ENCODING_DEFAULT ,
103
+ call_16 = False ,
104
+ connection_timeout = CONNECTION_TIMEOUT ,
105
+ cluster_list = None ,
99
106
strategy_class = RoundRobinStrategy ,
100
107
get_nodes_function_name = None ,
101
- nodes_refresh_interval = NODES_REFRESH_INTERVAL ):
102
- self .nattempts = 2 * len (addrs ) + 1
108
+ nodes_refresh_interval = DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS ):
109
+
110
+ addrs = [{"host" :host , "port" :port }]
111
+ if cluster_list :
112
+ for i in cluster_list :
113
+ if i ["host" ] == host or i ["port" ] == port :
114
+ continue
115
+ addrs .append (i )
116
+
103
117
self .strategy = strategy_class (addrs )
104
118
self .strategy_class = strategy_class
105
119
addr = self .strategy .getnext ()
106
120
host = addr ['host' ]
107
121
port = addr ['port' ]
108
122
self .get_nodes_function_name = get_nodes_function_name
109
- self .nodes_refresh_interval = nodes_refresh_interval >= 30 and nodes_refresh_interval or 30
110
- self .last_nodes_refresh = 0
123
+ self .nodes_refresh_interval = nodes_refresh_interval
124
+ self .last_nodes_refresh = time . time ()
111
125
super (MeshConnection , self ).__init__ (host = host ,
112
126
port = port ,
113
127
user = user ,
@@ -116,54 +130,69 @@ def __init__(self, addrs,
116
130
reconnect_max_attempts = reconnect_max_attempts ,
117
131
reconnect_delay = reconnect_delay ,
118
132
connect_now = connect_now ,
119
- encoding = encoding )
120
-
121
- def _opt_reconnect (self ):
122
- nattempts = self .nattempts
123
- while nattempts > 0 :
124
- try :
125
- super (MeshConnection , self )._opt_reconnect ()
126
- break
127
- except NetworkError :
128
- nattempts -= 1
129
- addr = self .strategy .getnext ()
130
- self .host = addr ['host' ]
131
- self .port = addr ['port' ]
132
- else :
133
- raise NetworkError
133
+ encoding = encoding ,
134
+ call_16 = call_16 ,
135
+ connection_timeout = connection_timeout )
134
136
135
- if self .authenticated and self .get_nodes_function_name :
136
- now = time .time ()
137
- if now - self .last_nodes_refresh > self .nodes_refresh_interval :
138
- self .refresh_nodes (now )
137
+ def _opt_refresh_instances (self ):
138
+ """
139
+ Refresh list of cluster instances. If current connection not in server list will change connection.
140
+ """
141
+ now = time .time ()
142
+
143
+ if self .connected and now - self .last_nodes_refresh > self .nodes_refresh_interval / 1000 :
144
+ resp = self .call (self .get_nodes_function_name , reconnect = False )
145
+
146
+ # got data to refresh
147
+ if resp .data and resp .data [0 ]:
148
+ addrs = list (parse_uri (i ) for i in resp .data [0 ])
149
+ self .strategy = self .strategy_class (addrs )
150
+ self .last_nodes_refresh = now
139
151
140
- def refresh_nodes (self , cur_time ):
141
- '''
142
- Refreshes nodes list by calling Lua function with name
143
- self.get_nodes_function_name on the current node. If this field is None
144
- no refresh occurs. Usually you don't need to call this function manually
145
- since it's called automatically during reconnect every
146
- self.nodes_refresh_interval seconds.
147
- '''
148
- resp = super (MeshConnection , self ).call_ex (self .get_nodes_function_name ,
149
- [], reconnect = False )
150
-
151
- if not (resp .data and resp .data [0 ]):
152
- return
153
-
154
- addrs_raw = resp .data [0 ]
155
- if type (addrs_raw ) is list :
156
- addrs = []
157
- for uri_str in addrs_raw :
158
- addr = parse_uri (uri_str )
159
- if addr :
160
- addrs .append (addr )
161
-
162
- self .strategy = self .strategy_class (addrs )
163
- self .last_nodes_refresh = cur_time
164
152
if not {'host' : self .host , 'port' : self .port } in addrs :
165
153
addr = self .strategy .getnext ()
166
154
self .host = addr ['host' ]
167
155
self .port = addr ['port' ]
168
156
self .close ()
169
- self ._opt_reconnect ()
157
+
158
+ if not self .connected :
159
+
160
+ nattempts = (len (self .strategy .addrs ) * 2 ) + 1
161
+
162
+ while nattempts >= 0 :
163
+ try :
164
+ addr = self .strategy .getnext ()
165
+ if addr ['host' ] != self .host or addr ['port' ] != self .port :
166
+ self .host = addr ['host' ]
167
+ self .port = addr ['port' ]
168
+ self ._opt_reconnect ()
169
+ break
170
+ else :
171
+ nattempts -= 1
172
+ except NetworkError :
173
+ continue
174
+ else :
175
+ raise NetworkError
176
+
177
+ def _send_request (self , request ):
178
+ '''
179
+ Send the request to the server through the socket.
180
+ Return an instance of `Response` class.
181
+
182
+ Update instances list from server `get_nodes_function_name` function.
183
+
184
+ :param request: object representing a request
185
+ :type request: `Request` instance
186
+
187
+ :rtype: `Response` instance
188
+ '''
189
+ if self .get_nodes_function_name :
190
+ self ._opt_refresh_instances ()
191
+
192
+ try :
193
+ return super (MeshConnection , self )._send_request (request )
194
+ except NetworkError :
195
+ self .connected = False
196
+ self ._opt_refresh_instances ()
197
+ finally :
198
+ return super (MeshConnection , self )._send_request (request )
0 commit comments