Skip to content

Commit dca9fd8

Browse files
Fixed variables names.
Reworked strategy class. Reorked and added tests. Some small cleanup.
1 parent bfbc164 commit dca9fd8

File tree

6 files changed

+215
-136
lines changed

6 files changed

+215
-136
lines changed

README.rst

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,10 @@ How to use the connector
8888
con = tarantool.Connection(host='localhost',
8989
port=3301,
9090
user='test',
91-
password='test',
92-
connect_now=True)
91+
password='test')
9392
9493
95-
resp = con.eval('return "Hello Wrold!"')
94+
resp = con.eval('return "Hello World!"')
9695
if resp.data[0] == "Hello World!":
9796
print('Ok')
9897

tarantool/connection.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,8 @@ def call(self, func_name, *args):
378378
args = args[0]
379379

380380
request = RequestCall(self, func_name, args, self.call_16)
381-
return self._send_request(request)
381+
response = self._send_request(request)
382+
return response
382383

383384
def eval(self, expr, *args):
384385
'''
@@ -396,7 +397,7 @@ def eval(self, expr, *args):
396397
# This allows to use a tuple or list as an argument
397398
if len(args) == 1 and isinstance(args[0], (list, tuple)):
398399
args = args[0]
399-
400+
400401
request = RequestEval(self, expr, args)
401402
return self._send_request(request)
402403

tarantool/const.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,4 @@
8787
# Default delay between attempts to reconnect (seconds)
8888
RECONNECT_DELAY = 0.1
8989
# Default cluster nodes list refresh interval (seconds)
90-
DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS = 60000
90+
CLUSTER_DISCOVERY_DELAY = 60

tarantool/mesh_connection.py

Lines changed: 94 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
'''
66

77
import time
8+
from itertools import cycle
9+
810
from tarantool.connection import Connection
911
from tarantool.error import NetworkError
1012
from tarantool.utils import ENCODING_DEFAULT
@@ -13,7 +15,7 @@
1315
SOCKET_TIMEOUT,
1416
RECONNECT_MAX_ATTEMPTS,
1517
RECONNECT_DELAY,
16-
DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS,
18+
CLUSTER_DISCOVERY_DELAY,
1719
)
1820

1921
from tarantool.request import (
@@ -23,47 +25,52 @@
2325

2426
class RoundRobinStrategy(object):
2527
"""
26-
Simple roundrobin address rotation
28+
Simple round-robin address rotation
2729
"""
2830
def __init__(self, addrs):
29-
self.addrs = addrs
30-
self.pos = 0
31+
self.reload(addrs)
32+
33+
def reload(self, addrs):
34+
self.addrs = []
35+
for i in addrs:
36+
if i.get("host", None) and i.get("port", None) \
37+
and isinstance(i["port"], int):
38+
if i in self.addrs:
39+
continue
40+
self.addrs.append(i)
41+
self.__gen = cycle(self.addrs)
3142

3243
def getnext(self):
33-
tmp = self.pos
34-
self.pos = (self.pos + 1) % len(self.addrs)
35-
return self.addrs[tmp]
44+
return next(self.__gen)
3645

3746

3847
def parse_uri(uri_str):
39-
if not uri_str or ':' not in uri_str:
40-
return None
41-
uri = uri_str.split(':')
48+
if not uri_str or uri_str.count(":") != 1:
49+
return
50+
uri = uri_str.split(':', 1)
4251
host = uri[0]
4352
try:
4453
port = int(uri[1])
4554
except ValueError:
46-
return None
47-
55+
return
56+
4857
if host and port:
4958
return {'host': host, 'port': port}
50-
else:
51-
return None
5259

5360

5461
class MeshConnection(Connection):
5562
'''
56-
Represents a connection to a cluster of Tarantool servers.
63+
Represents a connection to a cluster of Tarantool servers.
5764
58-
This class uses Connection to connect to one of the nodes of the cluster.
59-
The initial list of nodes is passed to the constructor in 'addrs' parameter.
60-
The class set in 'strategy_class' parameter is used to select a node from
61-
the list and switch nodes in case of unavailability of the current node.
65+
This class uses Connection to connect to one of the nodes of the cluster.
66+
The initial list of nodes is passed to the constructor in 'addrs' parameter.
67+
The class set in 'strategy_class' parameter is used to select a node from
68+
the list and switch nodes in case of unavailability of the current node.
6269
63-
'get_nodes_function_name' param of the constructor sets the name of a stored
64-
Lua function used to refresh the list of available nodes. The function takes
65-
no parameters and returns a list of strings in format 'host:port'. A generic
66-
function for getting the list of nodes looks like this:
70+
'cluster_discovery_function' param of the constructor sets the name of a stored
71+
Lua function used to refresh the list of available nodes. The function takes
72+
no parameters and returns a list of strings in format 'host:port'. A generic
73+
function for getting the list of nodes looks like this:
6774
6875
.. code-block:: lua
6976
@@ -116,29 +123,27 @@ def __init__(self, host=None, port=None,
116123
connection_timeout=CONNECTION_TIMEOUT,
117124
addrs=None,
118125
strategy_class=RoundRobinStrategy,
119-
get_nodes_function_name=None,
120-
nodes_refresh_interval=DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS):
126+
cluster_discovery_function=None,
127+
cluster_discovery_delay=CLUSTER_DISCOVERY_DELAY):
121128

122-
addrs_list = []
129+
if addrs is None:
130+
addrs = []
123131

124132
if host and port:
125-
addrs_list.append({'host':host, 'port':port})
126-
127-
if addrs:
128-
for addr in addrs:
129-
if 'host' in addr and 'port' in addr:
130-
addrs_list.append({'host': addr['host'], 'port': addr['port']})
133+
addrs.insert(0, {'host': host, 'port': port})
131134

132135
self.strategy_class = strategy_class
133-
self.strategy = strategy_class(addrs_list)
134-
135-
if not host and not port:
136-
addr = self.strategy.getnext()
137-
host = addr['host']
138-
port = addr['port']
139-
140-
self.get_nodes_function_name = get_nodes_function_name
141-
self.nodes_refresh_interval = nodes_refresh_interval
136+
self.strategy = strategy_class(addrs)
137+
138+
if not self.strategy.addrs and connect_now:
139+
raise ValueError("Host/port or addrs list must be set")
140+
141+
addr = self.strategy.getnext()
142+
host = addr['host']
143+
port = addr['port']
144+
145+
self.cluster_discovery_function = cluster_discovery_function
146+
self.cluster_discovery_delay = cluster_discovery_delay
142147
self.last_nodes_refresh = time.time()
143148
super(MeshConnection, self).__init__(host=host,
144149
port=port,
@@ -152,67 +157,75 @@ def __init__(self, host=None, port=None,
152157
call_16=call_16,
153158
connection_timeout=connection_timeout)
154159

160+
def _opt_reconnect(self):
161+
'''
162+
Use original opt_reconnect with address rotation
163+
'''
164+
165+
for i in range(len(self.strategy.addrs)+1):
166+
try:
167+
super(MeshConnection, self)._opt_reconnect()
168+
except NetworkError:
169+
addr = self.strategy.getnext()
170+
self.host = addr["host"]
171+
self.port = addr["port"]
172+
continue
173+
174+
if not self._socket:
175+
raise NetworkError
176+
155177
def _opt_refresh_instances(self):
156-
"""
178+
'''
157179
Refresh list of cluster instances.
158180
If current connection not in server list will change connection.
159-
"""
181+
'''
160182
now = time.time()
161183

162-
if self.connected and now - self.last_nodes_refresh > self.nodes_refresh_interval/1000:
163-
request = RequestCall(self, self.get_nodes_function_name, (), self.call_16)
184+
if not self.connected:
185+
return
186+
187+
if now - self.last_nodes_refresh > self.cluster_discovery_delay:
188+
request = RequestCall(self,
189+
self.cluster_discovery_function,
190+
(),
191+
self.call_16)
192+
164193
resp = self._send_request_wo_reconnect(request)
165194

166195
# got data to refresh
167196
if resp.data and resp.data[0]:
168-
addrs = list(parse_uri(i) for i in resp.data[0])
169-
self.strategy = self.strategy_class(addrs)
170-
self.last_nodes_refresh = now
171-
172-
if {'host': self.host, 'port': self.port} not in addrs:
173-
addr = self.strategy.getnext()
174-
self.host = addr['host']
175-
self.port = addr['port']
176-
self.close()
177-
178-
if not self.connected:
179-
180-
nattempts = (len(self.strategy.addrs) * 2) + 1
181-
182-
while nattempts >= 0:
183-
try:
197+
addrs = []
198+
for i in resp.data[0]:
199+
addr = parse_uri(i)
200+
if addr:
201+
addrs.append(addr)
202+
if addrs:
203+
self.strategy.reload(addrs)
204+
self.last_nodes_refresh = now
205+
else:
206+
raise NetworkError('No addresses to connect')
207+
208+
# Check current addr in list
209+
if {'host': self.host, 'port': self.port} not in addrs:
210+
self.close()
184211
addr = self.strategy.getnext()
185-
if addr['host'] != self.host or addr['port'] != self.port:
186-
self.host = addr['host']
187-
self.port = addr['port']
188-
self._opt_reconnect()
189-
break
190-
else:
191-
nattempts -= 1
192-
except NetworkError:
193-
continue
194-
else:
195-
raise NetworkError
212+
self.host = addr['host']
213+
self.port = addr['port']
214+
self._opt_reconnect()
196215

197216
def _send_request(self, request):
198217
'''
199218
Send the request to the server through the socket.
200219
Return an instance of `Response` class.
201220
202-
Update instances list from server `get_nodes_function_name` function.
221+
Update instances list from server `cluster_discovery_function` function.
203222
204223
:param request: object representing a request
205224
:type request: `Request` instance
206225
207226
:rtype: `Response` instance
208227
'''
209-
if self.get_nodes_function_name:
228+
if self.cluster_discovery_function:
210229
self._opt_refresh_instances()
211230

212-
try:
213-
return super(MeshConnection, self)._send_request(request)
214-
except NetworkError:
215-
self.connected = False
216-
self._opt_refresh_instances()
217-
finally:
218-
return super(MeshConnection, self)._send_request(request)
231+
return super(MeshConnection, self)._send_request(request)

0 commit comments

Comments
 (0)