Skip to content

Commit 948e5cd

Browse files
drewdzzzlocker
authored andcommitted
iproto: add pagination options to iproto
The patch adds new iproto keys needed for pagination and extends tx_process_select, used by IPROTO. IPROTO and NETBOX_IPROTO versions are updated, iproto feature pagination is introduced, new keys are: 0x2e - IPROTO_AFTER_POSITION - start iteration after passed iterator position. It has type MP_STR. 0x2f - IPROTO_AFTER_TUPLE - start iteration after passed tuple. It has type MP_ARRAY. 0x1f - IPROTO_FETCH_POSITION - send position of last fetched tuple in response. It has type MP_BOOL. 0x35 - IPROTO_POSITION - iterator position, sent in response if IPROTO_FETCH_POSITION is true. It has type MP_STR. Part of #7637 NO_CHANGELOG=see later commits NO_DOC=see later commits
1 parent 215630e commit 948e5cd

13 files changed

+313
-26
lines changed

src/box/iproto.cc

+41-7
Original file line numberDiff line numberDiff line change
@@ -2081,19 +2081,40 @@ tx_process_select(struct cmsg *m)
20812081
struct port port;
20822082
int count;
20832083
int rc;
2084+
const char *packed_pos, *packed_pos_end;
2085+
bool reply_position;
20842086
struct request *req = &msg->dml;
2087+
uint32_t region_svp = region_used(&fiber()->gc);
20852088
if (tx_check_schema(msg->header.schema_version))
20862089
goto error;
20872090

20882091
tx_inject_delay();
2092+
packed_pos = req->after_position;
2093+
packed_pos_end = req->after_position_end;
2094+
if (packed_pos != NULL) {
2095+
mp_decode_strl(&packed_pos);
2096+
} else if (req->after_tuple != NULL) {
2097+
rc = box_index_tuple_position(req->space_id, req->index_id,
2098+
req->after_tuple,
2099+
req->after_tuple_end,
2100+
&packed_pos, &packed_pos_end);
2101+
if (rc < 0)
2102+
goto error;
2103+
}
20892104
rc = box_select(req->space_id, req->index_id,
20902105
req->iterator, req->offset, req->limit,
2091-
req->key, req->key_end, NULL, NULL, false, &port);
2106+
req->key, req->key_end, &packed_pos, &packed_pos_end,
2107+
req->fetch_position, &port);
20922108
if (rc < 0)
20932109
goto error;
20942110

20952111
out = msg->connection->tx.p_obuf;
2096-
if (iproto_prepare_select(out, &svp) != 0) {
2112+
reply_position = req->fetch_position && packed_pos != NULL;
2113+
if (reply_position)
2114+
rc = iproto_prepare_select_with_position(out, &svp);
2115+
else
2116+
rc = iproto_prepare_select(out, &svp);
2117+
if (rc != 0) {
20972118
port_destroy(&port);
20982119
goto error;
20992120
}
@@ -2103,16 +2124,29 @@ tx_process_select(struct cmsg *m)
21032124
count = port_dump_msgpack_16(&port, out);
21042125
port_destroy(&port);
21052126
if (count < 0) {
2106-
/* Discard the prepared select. */
2107-
obuf_rollback_to_svp(out, &svp);
2108-
goto error;
2127+
goto discard;
2128+
}
2129+
if (reply_position) {
2130+
assert(packed_pos != NULL);
2131+
if (iproto_reply_select_with_position(out, &svp,
2132+
msg->header.sync,
2133+
::schema_version, count,
2134+
packed_pos,
2135+
packed_pos_end) != 0)
2136+
goto discard;
2137+
} else {
2138+
iproto_reply_select(out, &svp, msg->header.sync,
2139+
::schema_version, count);
21092140
}
2110-
iproto_reply_select(out, &svp, msg->header.sync,
2111-
::schema_version, count);
2141+
region_truncate(&fiber()->gc, region_svp);
21122142
iproto_wpos_create(&msg->wpos, out);
21132143
tx_end_msg(msg, &svp);
21142144
return;
2145+
discard:
2146+
/* Discard the prepared select. */
2147+
obuf_rollback_to_svp(out, &svp);
21152148
error:
2149+
region_truncate(&fiber()->gc, region_svp);
21162150
out = msg->connection->tx.p_obuf;
21172151
svp = obuf_create_svp(out);
21182152
tx_reply_error(msg);

src/box/iproto_constants.c

+4-7
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
7373
/* 0x1c */ MP_UINT,
7474
/* 0x1d */ MP_UINT,
7575
/* 0x1e */ MP_UINT,
76-
/* 0x1f */ MP_UINT,
7776
/* }}} */
7877

7978
/* {{{ body -- all keys */
79+
/* 0x1f */ MP_BOOL, /* IPROTO_FETCH_POSITION */
8080
/* 0x20 */ MP_ARRAY, /* IPROTO_KEY */
8181
/* 0x21 */ MP_ARRAY, /* IPROTO_TUPLE */
8282
/* 0x22 */ MP_STR, /* IPROTO_FUNCTION_NAME */
@@ -91,11 +91,8 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
9191
/* 0x2b */ MP_MAP, /* IPROTO_OPTIONS */
9292
/* 0x2c */ MP_ARRAY, /* IPROTO_OLD_TUPLE */
9393
/* 0x2d */ MP_ARRAY, /* IPROTO_NEW_TUPLE */
94-
/* }}} */
95-
96-
/* {{{ unused */
97-
/* 0x2e */ MP_UINT,
98-
/* 0x2f */ MP_UINT,
94+
/* 0x2e */ MP_STR, /* IPROTO_AFTER_POSITION */
95+
/* 0x2f */ MP_ARRAY, /* IPROTO_AFTER_TUPLE */
9996
/* }}} */
10097

10198
/* {{{ body -- response keys */
@@ -104,10 +101,10 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
104101
/* 0x32 */ MP_ARRAY, /* IPROTO_METADATA */
105102
/* 0x33 */ MP_ARRAY, /* IPROTO_BIND_METADATA */
106103
/* 0x34 */ MP_UINT, /* IIPROTO_BIND_COUNT */
104+
/* 0x35 */ MP_STR, /* IPROTO_POSITION */
107105
/* }}} */
108106

109107
/* {{{ unused */
110-
/* 0x35 */ MP_UINT,
111108
/* 0x36 */ MP_UINT,
112109
/* 0x37 */ MP_UINT,
113110
/* 0x38 */ MP_UINT,

src/box/iproto_constants.h

+12-2
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,12 @@ enum iproto_key {
8080
IPROTO_OFFSET = 0x13,
8181
IPROTO_ITERATOR = 0x14,
8282
IPROTO_INDEX_BASE = 0x15,
83-
8483
/* Leave a gap between integer values and other keys */
84+
/**
85+
* Flag indicating the need to send position of
86+
* last selected tuple in response.
87+
*/
88+
IPROTO_FETCH_POSITION = 0x1f,
8589
IPROTO_KEY = 0x20,
8690
IPROTO_TUPLE = 0x21,
8791
IPROTO_FUNCTION_NAME = 0x22,
@@ -108,8 +112,12 @@ enum iproto_key {
108112
IPROTO_OLD_TUPLE = 0x2c,
109113
/** New tuple (i.e. result of DML request). */
110114
IPROTO_NEW_TUPLE = 0x2d,
115+
/** Position of last selected tuple to start iteration after it. */
116+
IPROTO_AFTER_POSITION = 0x2e,
117+
/** Last selected tuple to start iteration after it. */
118+
IPROTO_AFTER_TUPLE = 0x2f,
111119

112-
/* Leave a gap between request keys and response keys */
120+
/** Response keys. */
113121
IPROTO_DATA = 0x30,
114122
IPROTO_ERROR_24 = 0x31,
115123
/**
@@ -122,6 +130,8 @@ enum iproto_key {
122130
IPROTO_METADATA = 0x32,
123131
IPROTO_BIND_METADATA = 0x33,
124132
IPROTO_BIND_COUNT = 0x34,
133+
/** Position of last selected tuple in response. */
134+
IPROTO_POSITION = 0x35,
125135

126136
/* Leave a gap between response keys and SQL keys. */
127137
IPROTO_SQL_TEXT = 0x40,

src/box/iproto_features.c

+2
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,6 @@ iproto_features_init(void)
6666
IPROTO_FEATURE_ERROR_EXTENSION);
6767
iproto_features_set(&IPROTO_CURRENT_FEATURES,
6868
IPROTO_FEATURE_WATCHERS);
69+
iproto_features_set(&IPROTO_CURRENT_FEATURES,
70+
IPROTO_FEATURE_PAGINATION);
6971
}

src/box/iproto_features.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ enum iproto_feature_id {
4545
* IPROTO_WATCH, IPROTO_UNWATCH, IPROTO_EVENT commands.
4646
*/
4747
IPROTO_FEATURE_WATCHERS = 3,
48+
/**
49+
* Pagination support:
50+
* IPROTO_AFTER_POSITION, IPROTO_AFTER_TUPLE, IPROTO_FETCH_POSITION
51+
* request fields and IPROTO_POSITION response field.
52+
*/
53+
IPROTO_FEATURE_PAGINATION = 4,
4854
iproto_feature_id_MAX,
4955
};
5056

@@ -60,7 +66,7 @@ struct iproto_features {
6066
* It should be incremented every time a new feature is added or removed.
6167
*/
6268
enum {
63-
IPROTO_CURRENT_VERSION = 3,
69+
IPROTO_CURRENT_VERSION = 4,
6470
};
6571

6672
/**

src/box/lua/net_box.c

+3-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ enum {
8282
/**
8383
* IPROTO protocol version supported by the netbox connector.
8484
*/
85-
NETBOX_IPROTO_VERSION = 3,
85+
NETBOX_IPROTO_VERSION = 4,
8686
};
8787

8888
/**
@@ -2809,6 +2809,8 @@ luaopen_net_box(struct lua_State *L)
28092809
IPROTO_FEATURE_ERROR_EXTENSION);
28102810
iproto_features_set(&NETBOX_IPROTO_FEATURES,
28112811
IPROTO_FEATURE_WATCHERS);
2812+
iproto_features_set(&NETBOX_IPROTO_FEATURES,
2813+
IPROTO_FEATURE_PAGINATION);
28122814

28132815
lua_pushcfunction(L, luaT_netbox_request_iterator_next);
28142816
luaT_netbox_request_iterator_next_ref = luaL_ref(L, LUA_REGISTRYINDEX);

src/box/lua/net_box.lua

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ local IPROTO_FEATURE_NAMES = {
5656
[1] = 'transactions',
5757
[2] = 'error_extension',
5858
[3] = 'watchers',
59+
[4] = 'pagination',
5960
}
6061

6162
local REQUEST_OPTION_TYPES = {

src/box/xrow.c

+70-2
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,10 @@ static const struct iproto_body_bin iproto_body_bin = {
411411
0x81, IPROTO_DATA, 0xdd, 0
412412
};
413413

414+
static const struct iproto_body_bin iproto_body_bin_with_position = {
415+
0x82, IPROTO_DATA, 0xdd, 0
416+
};
417+
414418
/** Return a 4-byte numeric error code, with status flags. */
415419
static inline uint32_t
416420
iproto_encode_error(uint32_t error)
@@ -664,21 +668,56 @@ iproto_prepare_header(struct obuf *buf, struct obuf_svp *svp, size_t size)
664668
return 0;
665669
}
666670

671+
/** Reply select with IPROTO_DATA. */
667672
void
668673
iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
669674
uint32_t schema_version, uint32_t count)
670675
{
671676
char *pos = (char *) obuf_svp_to_ptr(buf, svp);
672677
iproto_header_encode(pos, IPROTO_OK, sync, schema_version,
673-
obuf_size(buf) - svp->used -
674-
IPROTO_HEADER_LEN);
678+
obuf_size(buf) - svp->used -
679+
IPROTO_HEADER_LEN);
675680

676681
struct iproto_body_bin body = iproto_body_bin;
677682
body.v_data_len = mp_bswap_u32(count);
678683

679684
memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
680685
}
681686

687+
/** Reply select with IPROTO_DATA and IPROTO_POSITION. */
688+
int
689+
iproto_reply_select_with_position(struct obuf *buf, struct obuf_svp *svp,
690+
uint64_t sync, uint32_t schema_version,
691+
uint32_t count, const char *packed_pos,
692+
const char *packed_pos_end)
693+
{
694+
size_t packed_pos_size = packed_pos_end - packed_pos;
695+
size_t key_size = mp_sizeof_uint(IPROTO_POSITION);
696+
size_t alloc_size = key_size + mp_sizeof_strl(packed_pos_size);
697+
char *ptr = obuf_alloc(buf, alloc_size);
698+
if (ptr == NULL) {
699+
diag_set(OutOfMemory, alloc_size, "obuf_alloc", "ptr");
700+
return -1;
701+
}
702+
ptr = mp_encode_uint(ptr, IPROTO_POSITION);
703+
mp_encode_strl(ptr, packed_pos_size);
704+
if (obuf_dup(buf, packed_pos,
705+
packed_pos_size) != packed_pos_size) {
706+
return -1;
707+
}
708+
709+
char *pos = (char *)obuf_svp_to_ptr(buf, svp);
710+
iproto_header_encode(pos, IPROTO_OK, sync, schema_version,
711+
obuf_size(buf) - svp->used -
712+
IPROTO_HEADER_LEN);
713+
714+
struct iproto_body_bin body = iproto_body_bin_with_position;
715+
body.v_data_len = mp_bswap_u32(count);
716+
717+
memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
718+
return 0;
719+
}
720+
682721
int
683722
xrow_decode_sql(const struct xrow_header *row, struct sql_request *request)
684723
{
@@ -850,6 +889,9 @@ xrow_decode_dml(struct xrow_header *row, struct request *request,
850889
case IPROTO_ITERATOR:
851890
request->iterator = mp_decode_uint(&value);
852891
break;
892+
case IPROTO_FETCH_POSITION:
893+
request->fetch_position = mp_decode_bool(&value);
894+
break;
853895
case IPROTO_TUPLE:
854896
request->tuple = value;
855897
request->tuple_end = data;
@@ -874,6 +916,14 @@ xrow_decode_dml(struct xrow_header *row, struct request *request,
874916
request->new_tuple = value;
875917
request->new_tuple_end = data;
876918
break;
919+
case IPROTO_AFTER_POSITION:
920+
request->after_position = value;
921+
request->after_position_end = data;
922+
break;
923+
case IPROTO_AFTER_TUPLE:
924+
request->after_tuple = value;
925+
request->after_tuple_end = data;
926+
break;
877927
default:
878928
break;
879929
}
@@ -920,6 +970,17 @@ request_snprint(char *buf, int size, const struct request *request)
920970
SNPRINT(total, snprintf, buf, size, ", new_tuple: ");
921971
SNPRINT(total, mp_snprint, buf, size, request->new_tuple);
922972
}
973+
if (request->fetch_position) {
974+
SNPRINT(total, snprintf, buf, size, ", fetch_position: true");
975+
}
976+
if (request->after_position != NULL) {
977+
SNPRINT(total, snprintf, buf, size, ", after_position: ");
978+
SNPRINT(total, mp_snprint, buf, size, request->after_position);
979+
}
980+
if (request->after_tuple != NULL) {
981+
SNPRINT(total, snprintf, buf, size, ", after_tuple: ");
982+
SNPRINT(total, mp_snprint, buf, size, request->after_tuple);
983+
}
923984
SNPRINT(total, snprintf, buf, size, "}");
924985
return total;
925986
}
@@ -937,6 +998,13 @@ int
937998
xrow_encode_dml(const struct request *request, struct region *region,
938999
struct iovec *iov)
9391000
{
1001+
assert(request != NULL);
1002+
/* Select is unexpected here. Hence, pagination option too. */
1003+
assert(request->header == NULL ||
1004+
request->header->type != IPROTO_SELECT);
1005+
assert(request->after_position == NULL);
1006+
assert(request->after_tuple == NULL);
1007+
assert(!request->fetch_position);
9401008
int iovcnt = 1;
9411009
const int MAP_LEN_MAX = 40;
9421010
uint32_t key_len = request->key_end - request->key;

src/box/xrow.h

+35
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,18 @@ struct request {
202202
const char *new_tuple;
203203
/** End of @new_tuple. */
204204
const char *new_tuple_end;
205+
/** Packed iterator_position with MP_STR header. */
206+
const char *after_position;
207+
/** End of @after_position. */
208+
const char *after_position_end;
209+
/** Last selected tuple to start iteration after it. */
210+
const char *after_tuple;
211+
/** End of @after_tuple. */
212+
const char *after_tuple_end;
205213
/** Base field offset for UPDATE/UPSERT, e.g. 0 for C and 1 for Lua. */
206214
int index_base;
215+
/** Send position of last selected tuple in response if true. */
216+
bool fetch_position;
207217
};
208218

209219
/**
@@ -648,6 +658,22 @@ iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp)
648658
return iproto_prepare_header(buf, svp, IPROTO_SELECT_HEADER_LEN);
649659
}
650660

661+
/**
662+
* Prepare the iproto header for a select result set and iterator position.
663+
* It is just an alias fot iproto_prepare_select, it is needed for better
664+
* code readability.
665+
* @param buf Out buffer.
666+
* @param svp Savepoint of the header beginning.
667+
*
668+
* @retval 0 Success.
669+
* @retval -1 Memory error.
670+
*/
671+
static inline int
672+
iproto_prepare_select_with_position(struct obuf *buf, struct obuf_svp *svp)
673+
{
674+
return iproto_prepare_select(buf, svp);
675+
}
676+
651677
/**
652678
* Write select header to a preallocated buffer.
653679
* This function doesn't throw (and we rely on this in iproto.cc).
@@ -656,6 +682,15 @@ void
656682
iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
657683
uint32_t schema_version, uint32_t count);
658684

685+
/**
686+
* Write extended select header to a preallocated buffer.
687+
*/
688+
int
689+
iproto_reply_select_with_position(struct obuf *buf, struct obuf_svp *svp,
690+
uint64_t sync, uint32_t schema_version,
691+
uint32_t count, const char *packed_pos,
692+
const char *packed_pos_end);
693+
659694
/**
660695
* Encode iproto header with IPROTO_OK response code.
661696
* @param out Encode to.

0 commit comments

Comments
 (0)