Skip to content

Commit 26c6db1

Browse files
GRISHNOVDifferentialOrange
authored andcommitted
iproto: support feature push
Adds support for receiving out-of-band messages from server that uses box.session.push call. Data obtaining is implemented for methods: `call`, `eval`, `select`, `insert`, `replace`, `update`, `upsert`, `delete`. To do this, optional arguments `on_push` and `on_push_ctx` are used for these methods. Argument `on_push` sets the callback to call when an out-of-band message is received, and the `on_push_ctx` argument allows to save the result of `on_push` work or pass data to it. So the API is similar to the implementation of LUA version at the moment. Closes #201
1 parent 1820452 commit 26c6db1

File tree

8 files changed

+591
-46
lines changed

8 files changed

+591
-46
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
184184
Thus, if ``True``, datetime is computed in a way that `dt.timestamp` will
185185
always be equal to initialization `timestamp`.
186186

187+
- Support iproto feature push (#201).
188+
187189
### Changed
188190
- Bump msgpack requirement to 1.0.4 (PR #223).
189191
The only reason of this bump is various vulnerability fixes,

docs/source/quick-start.rst

+147
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,150 @@ read-write and read-only pool instances:
200200
>>> resp = conn.select('demo', 'AAAA', mode=tarantool.Mode.PREFER_RO)
201201
>>> resp
202202
- ['AAAA', 'Alpha']
203+
204+
205+
Receiving out-of-band messages
206+
----------------------------------
207+
208+
Receiving out-of-band messages from a server that uses box.session.push
209+
call is supported for methods: :meth:`~tarantool.Connection.call`,
210+
:meth:`~tarantool.Connection.eval`, :meth:`~tarantool.Connection.select`,
211+
:meth:`~tarantool.Connection.insert`, :meth:`~tarantool.Connection.replace`,
212+
:meth:`~tarantool.Connection.update`, :meth:`~tarantool.Connection.upsert`,
213+
:meth:`~tarantool.Connection.delete`.
214+
215+
To work with out-of-band messages, 2 optional arguments are used in
216+
the methods listed above:
217+
218+
* `on_push` - callback, launched with the received data for each out-of-band message. Two arguments for this callback are expected:
219+
220+
* the first is the received from an out-of-band message data.
221+
222+
* the second is `on_push_ctx`, variable for working with callback context (for example, recording the result or pass data to callback).
223+
* `on_push_ctx` - result of the `on_push` work can be written to this variable, or through this variable you can pass data to `on_push` callback.
224+
225+
Below is an example of the proposed API with method :meth:`~tarantool.Connection.call`
226+
and :meth:`~tarantool.Connection.insert`. In the described example, before the end
227+
of the :meth:`~tarantool.Connection.call` and :meth:`~tarantool.Connection.insert`,
228+
out-of-band messages are processed via specified callback.
229+
230+
In the example below, two shells are used, in the first we will configure the server:
231+
232+
.. code-block:: lua
233+
234+
fiber = require('fiber')
235+
box.cfg({listen = 3301})
236+
box.schema.user.grant(
237+
'guest',
238+
'read,write,execute',
239+
'universe'
240+
)
241+
function server_function()
242+
x = {0,0}
243+
while x[1] < 3 do
244+
x[1] = x[1] + 1
245+
fiber.sleep(1)
246+
box.session.push(x)
247+
end
248+
fiber.sleep(1)
249+
return x
250+
end
251+
252+
In the second shell, we will execute a :meth:`~tarantool.Connection.call`
253+
with receiving out-of-band messages from the server:
254+
255+
.. code-block:: python
256+
257+
import tarantool
258+
259+
def callback(data, on_push_ctx=[]):
260+
print('run callback with data: ', data)
261+
data[0][1] = data[0][1] + 1
262+
on_push_ctx.append(data)
263+
264+
callback_res = []
265+
266+
conn = tarantool.Connection(port=3301)
267+
res = conn.call(
268+
'server_function',
269+
on_push=callback,
270+
on_push_ctx=callback_res
271+
)
272+
273+
# receiving out-of-band messages,
274+
# the conn.call is not finished yet.
275+
276+
>>> run callback with data: [[1, 0]]
277+
>>> run callback with data: [[2, 0]]
278+
>>> run callback with data: [[3, 0]]
279+
280+
# the conn.call is finished now.
281+
282+
print(res)
283+
>>> [3, 0]
284+
285+
print(callback_res)
286+
>>> [[[1, 1]], [[2, 1]], [[3, 1]]]
287+
288+
Let's go back to the first shell with the server and
289+
create a space and a trigger for it:
290+
291+
.. code-block:: lua
292+
293+
box.schema.create_space(
294+
'tester', {
295+
format = {
296+
{name = 'id', type = 'unsigned'},
297+
{name = 'name', type = 'string'},
298+
}
299+
})
300+
box.space.tester:create_index(
301+
'primary_index', {
302+
parts = {
303+
{field = 1, type = 'unsigned'},
304+
}
305+
})
306+
function on_replace_callback()
307+
x = {0,0}
308+
while x[1] < 300 do
309+
x[1] = x[1] + 100
310+
box.session.push(x)
311+
end
312+
return x
313+
end
314+
box.space.tester:on_replace(
315+
on_replace_callback
316+
)
317+
318+
Now, in the second shell, we will execute an :meth:`~tarantool.ConnectionPool.insert`
319+
with out-of-band message processing:
320+
321+
.. code-block:: python
322+
323+
callback_res = []
324+
325+
conn_pool = tarantool.ConnectionPool(
326+
[{'host':'localhost', 'port':3301}],
327+
user='guest')
328+
329+
res = conn_pool.insert(
330+
'tester',
331+
(1, 'Mike'),
332+
on_push=callback,
333+
on_push_ctx=callback_res,
334+
)
335+
336+
# receiving out-of-band messages,
337+
# the conn_pool.insert is not finished yet.
338+
339+
>>> run callback with data: [[100, 0]]
340+
>>> run callback with data: [[200, 0]]
341+
>>> run callback with data: [[300, 0]]
342+
343+
# the conn_pool.insert is finished now.
344+
345+
print(res)
346+
>>> [1, 'Mike']
347+
348+
print(callback_res)
349+
>>> [[[100, 1]], [[200, 1]], [[300, 1]]]

0 commit comments

Comments
 (0)