Skip to content

Commit 15be9ec

Browse files
committed
iproto: support feature push
Adds support for receiving out-of-band messages from a server that uses box.session.push call. Data obtaining is possible for methods: `call`, `eval`, `select`, `insert`, `replace`, `update`, `upsert`, `delete`. To do this, an optional argument `on_push_ctx` is used in the form of a python list, where the received data from out-of-band messages will be added. Closes #201
1 parent a94f97c commit 15be9ec

File tree

3 files changed

+62
-21
lines changed

3 files changed

+62
-21
lines changed

tarantool/connection.py

+60-20
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@
5555
REQUEST_TYPE_ERROR,
5656
IPROTO_GREETING_SIZE,
5757
ITERATOR_EQ,
58-
ITERATOR_ALL
58+
ITERATOR_ALL,
59+
IPROTO_CHUNK
5960
)
6061
from tarantool.error import (
6162
Error,
@@ -748,7 +749,7 @@ def _read_response(self):
748749
# Read the packet
749750
return self._recv(length)
750751

751-
def _send_request_wo_reconnect(self, request):
752+
def _send_request_wo_reconnect(self, request, on_push_ctx=[]):
752753
"""
753754
Send request without trying to reconnect.
754755
Reload schema, if required.
@@ -772,6 +773,11 @@ def _send_request_wo_reconnect(self, request):
772773
try:
773774
self._socket.sendall(bytes(request))
774775
response = request.response_class(self, self._read_response())
776+
if response._code == IPROTO_CHUNK:
777+
# Сase of receiving an out-of-band message
778+
on_push_ctx.append(response._data)
779+
# Next comes the main message
780+
response = request.response_class(self, self._read_response())
775781
break
776782
except SchemaReloadException as e:
777783
self.update_schema(e.schema_version)
@@ -851,7 +857,7 @@ def check(): # Check that connection is alive
851857
self.wrap_socket_ssl()
852858
self.handshake()
853859

854-
def _send_request(self, request):
860+
def _send_request(self, request, on_push_ctx=[]):
855861
"""
856862
Send a request to the server through the socket.
857863
@@ -872,7 +878,7 @@ def _send_request(self, request):
872878

873879
self._opt_reconnect()
874880

875-
return self._send_request_wo_reconnect(request)
881+
return self._send_request_wo_reconnect(request, on_push_ctx)
876882

877883
def load_schema(self):
878884
"""
@@ -914,7 +920,7 @@ def flush_schema(self):
914920
self.schema.flush()
915921
self.load_schema()
916922

917-
def call(self, func_name, *args):
923+
def call(self, func_name, *args, **kwargs):
918924
"""
919925
Execute a CALL request: call a stored Lua function.
920926
@@ -930,19 +936,26 @@ def call(self, func_name, *args):
930936
:exc:`~tarantool.error.SchemaError`,
931937
:exc:`~tarantool.error.NetworkError`,
932938
:exc:`~tarantool.error.SslError`
939+
940+
!!!
941+
TODO: write docs
942+
!!!
933943
"""
934944

935945
assert isinstance(func_name, str)
936946

937947
# This allows to use a tuple or list as an argument
938948
if len(args) == 1 and isinstance(args[0], (list, tuple)):
939949
args = args[0]
950+
# Case for absence of optional arg for accepting out-of-band msg data
951+
if not 'on_push_ctx' in kwargs:
952+
kwargs['on_push_ctx'] = []
940953

941954
request = RequestCall(self, func_name, args, self.call_16)
942-
response = self._send_request(request)
955+
response = self._send_request(request, kwargs['on_push_ctx'])
943956
return response
944957

945-
def eval(self, expr, *args):
958+
def eval(self, expr, *args, **kwargs):
946959
"""
947960
Execute an EVAL request: evaluate a Lua expression.
948961
@@ -966,12 +979,15 @@ def eval(self, expr, *args):
966979
# This allows to use a tuple or list as an argument
967980
if len(args) == 1 and isinstance(args[0], (list, tuple)):
968981
args = args[0]
982+
# Case for absence of optional arg for accepting out-of-band msg data
983+
if not 'on_push_ctx' in kwargs:
984+
kwargs['on_push_ctx'] = []
969985

970986
request = RequestEval(self, expr, args)
971-
response = self._send_request(request)
987+
response = self._send_request(request, kwargs['on_push_ctx'])
972988
return response
973989

974-
def replace(self, space_name, values):
990+
def replace(self, space_name, values, **kwargs):
975991
"""
976992
Execute a REPLACE request: `replace`_ a tuple in the space.
977993
Doesn't throw an error if there is no tuple with the specified
@@ -994,10 +1010,14 @@ def replace(self, space_name, values):
9941010
.. _replace: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/replace/
9951011
"""
9961012

1013+
# Case for absence of optional arg for accepting out-of-band msg data
1014+
if not 'on_push_ctx' in kwargs:
1015+
kwargs['on_push_ctx'] = []
1016+
9971017
if isinstance(space_name, str):
9981018
space_name = self.schema.get_space(space_name).sid
9991019
request = RequestReplace(self, space_name, values)
1000-
return self._send_request(request)
1020+
return self._send_request(request, kwargs['on_push_ctx'])
10011021

10021022
def authenticate(self, user, password):
10031023
"""
@@ -1149,7 +1169,7 @@ def subscribe(self, cluster_uuid, server_uuid, vclock=None):
11491169
return
11501170
self.close() # close connection after SUBSCRIBE
11511171

1152-
def insert(self, space_name, values):
1172+
def insert(self, space_name, values, **kwargs):
11531173
"""
11541174
Execute an INSERT request: `insert`_ a tuple to the space.
11551175
Throws an error if there is already a tuple with the same
@@ -1172,12 +1192,16 @@ def insert(self, space_name, values):
11721192
.. _insert: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/insert/
11731193
"""
11741194

1195+
# Case for absence of optional arg for accepting out-of-band msg data
1196+
if not 'on_push_ctx' in kwargs:
1197+
kwargs['on_push_ctx'] = []
1198+
11751199
if isinstance(space_name, str):
11761200
space_name = self.schema.get_space(space_name).sid
11771201
request = RequestInsert(self, space_name, values)
1178-
return self._send_request(request)
1202+
return self._send_request(request, kwargs['on_push_ctx'])
11791203

1180-
def delete(self, space_name, key, *, index=0):
1204+
def delete(self, space_name, key, *, index=0, **kwargs):
11811205
"""
11821206
Execute a DELETE request: `delete`_ a tuple in the space.
11831207
@@ -1202,15 +1226,19 @@ def delete(self, space_name, key, *, index=0):
12021226
.. _delete: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/delete/
12031227
"""
12041228

1229+
# Case for absence of optional arg for accepting out-of-band msg data
1230+
if not 'on_push_ctx' in kwargs:
1231+
kwargs['on_push_ctx'] = []
1232+
12051233
key = check_key(key)
12061234
if isinstance(space_name, str):
12071235
space_name = self.schema.get_space(space_name).sid
12081236
if isinstance(index, str):
12091237
index = self.schema.get_index(space_name, index).iid
12101238
request = RequestDelete(self, space_name, index, key)
1211-
return self._send_request(request)
1239+
return self._send_request(request, kwargs['on_push_ctx'])
12121240

1213-
def upsert(self, space_name, tuple_value, op_list, *, index=0):
1241+
def upsert(self, space_name, tuple_value, op_list, *, index=0, **kwargs):
12141242
"""
12151243
Execute an UPSERT request: `upsert`_ a tuple to the space.
12161244
@@ -1252,16 +1280,20 @@ def upsert(self, space_name, tuple_value, op_list, *, index=0):
12521280
.. _upsert: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/upsert/
12531281
"""
12541282

1283+
# Case for absence of optional arg for accepting out-of-band msg data
1284+
if not 'on_push_ctx' in kwargs:
1285+
kwargs['on_push_ctx'] = []
1286+
12551287
if isinstance(space_name, str):
12561288
space_name = self.schema.get_space(space_name).sid
12571289
if isinstance(index, str):
12581290
index = self.schema.get_index(space_name, index).iid
12591291
op_list = self._ops_process(space_name, op_list)
12601292
request = RequestUpsert(self, space_name, index, tuple_value,
12611293
op_list)
1262-
return self._send_request(request)
1294+
return self._send_request(request, kwargs['on_push_ctx'])
12631295

1264-
def update(self, space_name, key, op_list, *, index=0):
1296+
def update(self, space_name, key, op_list, *, index=0, **kwargs):
12651297
"""
12661298
Execute an UPDATE request: `update`_ a tuple in the space.
12671299
@@ -1331,14 +1363,18 @@ def update(self, space_name, key, op_list, *, index=0):
13311363
.. _update: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/update/
13321364
"""
13331365

1366+
# Case for absence of optional arg for accepting out-of-band msg data
1367+
if not 'on_push_ctx' in kwargs:
1368+
kwargs['on_push_ctx'] = []
1369+
13341370
key = check_key(key)
13351371
if isinstance(space_name, str):
13361372
space_name = self.schema.get_space(space_name).sid
13371373
if isinstance(index, str):
13381374
index = self.schema.get_index(space_name, index).iid
13391375
op_list = self._ops_process(space_name, op_list)
13401376
request = RequestUpdate(self, space_name, index, key, op_list)
1341-
return self._send_request(request)
1377+
return self._send_request(request, kwargs['on_push_ctx'])
13421378

13431379
def ping(self, notime=False):
13441380
"""
@@ -1368,7 +1404,7 @@ def ping(self, notime=False):
13681404
return "Success"
13691405
return t1 - t0
13701406

1371-
def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, iterator=None):
1407+
def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, iterator=None, **kwargs):
13721408
"""
13731409
Execute a SELECT request: `select`_ a tuple from the space.
13741410
@@ -1518,13 +1554,17 @@ def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, i
15181554
# tuples)
15191555
key = check_key(key, select=True)
15201556

1557+
# Case for absence of optional arg for accepting out-of-band msg data
1558+
if not 'on_push_ctx' in kwargs:
1559+
kwargs['on_push_ctx'] = []
1560+
15211561
if isinstance(space_name, str):
15221562
space_name = self.schema.get_space(space_name).sid
15231563
if isinstance(index, str):
15241564
index = self.schema.get_index(space_name, index).iid
15251565
request = RequestSelect(self, space_name, index, key, offset,
15261566
limit, iterator)
1527-
response = self._send_request(request)
1567+
response = self._send_request(request, kwargs['on_push_ctx'])
15281568
return response
15291569

15301570
def space(self, space_name):

tarantool/const.py

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
IPROTO_GREETING_SIZE = 128
4040
IPROTO_BODY_MAX_LEN = 2147483648
41+
IPROTO_CHUNK=0x80
4142

4243
REQUEST_TYPE_OK = 0
4344
REQUEST_TYPE_SELECT = 1

tarantool/mesh_connection.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ def _opt_refresh_instances(self):
581581
update_connection(self, addr)
582582
self._opt_reconnect()
583583

584-
def _send_request(self, request):
584+
def _send_request(self, request, on_push_ctx=[]):
585585
"""
586586
Send a request to a Tarantool server. If required, refresh
587587
addresses list before sending a request.

0 commit comments

Comments
 (0)