Skip to content

Commit ae279e6

Browse files
committed
Implement flush_all feature
1 parent c4be8c4 commit ae279e6

21 files changed

+288
-50
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ venv/
1414
bench_venv/
1515
_client.cpp
1616
_client.so
17+
_client.*.so
1718
.tox/
1819

1920
.cache/

ext/gtest/CMakeLists.txt.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ include(ExternalProject)
55

66
ExternalProject_Add(googletest
77
GIT_REPOSITORY https://github.com/google/googletest.git
8-
GIT_TAG master
8+
GIT_TAG release-1.8.0
99
SOURCE_DIR "${CMAKE_BINARY_DIR}/googletest-src"
1010
BINARY_DIR "${CMAKE_BINARY_DIR}/googletest-build"
1111
CMAKE_ARGS -DCMAKE_ARCHIVE_OUTPUT_DIRECTORY_DEBUG:PATH=DebugLibs

include/Client.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ DECL_RETRIEVAL_CMD(gets)
5050
err_code_t version(broadcast_result_t** results, size_t* nHosts);
5151
err_code_t quit();
5252
err_code_t stats(broadcast_result_t** results, size_t* nHosts);
53+
err_code_t flushAll(broadcast_result_t** results, size_t* nHosts);
5354

5455
// touch
5556
err_code_t touch(const char* const* keys, const size_t* keyLens,
@@ -65,18 +66,23 @@ DECL_RETRIEVAL_CMD(gets)
6566
const bool noreply,
6667
unsigned_result_t** result, size_t* nResults);
6768

69+
inline void toggleFlushAllFeature(bool enabled) {
70+
m_flushAllEnabled = enabled;
71+
}
6872
void _sleep(uint32_t seconds); // check GIL in Python
6973

7074
protected:
7175
void collectRetrievalResult(retrieval_result_t*** results, size_t* nResults);
7276
void collectMessageResult(message_result_t*** results, size_t* nResults);
73-
void collectBroadcastResult(broadcast_result_t** results, size_t* nHosts);
77+
void collectBroadcastResult(broadcast_result_t** results, size_t* nHosts, bool isFlushAll=false);
7478
void collectUnsignedResult(unsigned_result_t** results, size_t* nResults);
7579

7680
std::vector<retrieval_result_t*> m_outRetrievalResultPtrs;
7781
std::vector<message_result_t*> m_outMessageResultPtrs;
7882
std::vector<broadcast_result_t> m_outBroadcastResultPtrs;
7983
std::vector<unsigned_result_t*> m_outUnsignedResultPtrs;
84+
85+
bool m_flushAllEnabled;
8086
};
8187

8288
} // namespace mc

include/ConnectionPool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class ConnectionPool {
4141

4242
void collectRetrievalResult(std::vector<retrieval_result_t*>& results);
4343
void collectMessageResult(std::vector<message_result_t*>& results);
44-
void collectBroadcastResult(std::vector<broadcast_result_t>& results);
44+
void collectBroadcastResult(std::vector<broadcast_result_t>& results, bool isFlushAll=false);
4545
void collectUnsignedResult(std::vector<unsigned_result_t*>& results);
4646
void reset();
4747
void setPollTimeout(int timeout);

include/Export.h

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,6 @@ typedef uint32_t flags_t;
4444
typedef uint64_t cas_unique_t;
4545

4646

47-
typedef struct {
48-
char* host;
49-
char** lines;
50-
size_t* line_lens;
51-
size_t len;
52-
} broadcast_result_t;
53-
54-
5547
typedef struct {
5648
char* key; // 8B
5749
char* data_block; // 8B
@@ -63,7 +55,8 @@ typedef struct {
6355

6456

6557
enum message_result_type {
66-
MSG_EXISTS,
58+
MSG_LIBMC_INVALID = -1,
59+
MSG_EXISTS = 0,
6760
MSG_OK,
6861
MSG_STORED,
6962
MSG_NOT_STORED,
@@ -85,3 +78,16 @@ typedef struct {
8578
size_t key_len;
8679
uint64_t value;
8780
} unsigned_result_t;
81+
82+
83+
// For flush_all command, we need to specify
84+
// {host} and {msg_type},
85+
// for other broadcast commands, we need to specify
86+
// all fields except {msg_type}
87+
typedef struct {
88+
char* host;
89+
char** lines;
90+
size_t* line_lens;
91+
size_t len;
92+
enum message_result_type msg_type; // for flush_all command
93+
} broadcast_result_t;

include/Keywords.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ static const char k_NOREPLY[] = " noreply";
3131

3232
static const char kVERSION[] = "version";
3333
static const char kSTATS[] = "stats";
34+
static const char kFLUSHALL[] = "flush_all";
3435

3536
static const char kQUIT[] = "quit";
3637

include/c_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ extern "C" {
6363
void client_destroy_unsigned_result(void* client);
6464

6565
err_code_t client_stats(void* client, broadcast_result_t** results, size_t* n_servers);
66+
void client_toggle_flush_all_feature(void* client, bool enabled);
67+
err_code_t client_flush_all(void* client, broadcast_result_t** results, size_t* n_servers);
6668
err_code_t client_quit(void* client);
6769

6870
const char* err_code_to_string(err_code_t err);

libmc/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
)
2929

3030
__VERSION__ = "1.2.0"
31-
__version__ = "v1.2.1"
31+
__version__ = "v1.2.1-4-gc3c6500"
3232
__author__ = "mckelvin"
3333
__email__ = "[email protected]"
34-
__date__ = "Wed Mar 13 14:40:14 2019 +0800"
34+
__date__ = "Tue Jul 30 14:06:44 2019 +0800"
3535

3636

3737
class Client(PyClient):

libmc/_client.pyx

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,8 @@ cdef extern from "Export.h":
7272
uint32_t bytes
7373
cas_unique_t cas_unique
7474

75-
ctypedef struct broadcast_result_t:
76-
char* host
77-
char** lines
78-
size_t* line_lens
79-
size_t len
80-
8175
ctypedef enum message_result_type:
76+
MSG_LIBMC_INVALID
8277
MSG_EXISTS
8378
MSG_OK
8479
MSG_STORED
@@ -109,6 +104,13 @@ cdef extern from "Export.h":
109104
size_t key_len
110105
uint64_t value
111106

107+
ctypedef struct broadcast_result_t:
108+
char* host
109+
char** lines
110+
size_t* line_lens
111+
size_t len
112+
message_result_type msg_type;
113+
112114

113115
cdef extern from "Client.h" namespace "douban::mc":
114116
cdef cppclass Client:
@@ -189,6 +191,8 @@ cdef extern from "Client.h" namespace "douban::mc":
189191
err_code_t version(broadcast_result_t** results, size_t* nHosts) nogil
190192
err_code_t quit() nogil
191193
err_code_t stats(broadcast_result_t** results, size_t* nHosts) nogil
194+
err_code_t flushAll(broadcast_result_t** results, size_t* nHosts) nogil
195+
void toggleFlushAllFeature(bool_t enabled)
192196
void destroyBroadcastResult() nogil
193197

194198
err_code_t incr(
@@ -952,6 +956,30 @@ cdef class PyClient:
952956
self._imp.destroyBroadcastResult()
953957
return rv
954958

959+
def toggle_flush_all_feature(self, enabled):
960+
self._imp.toggleFlushAllFeature(enabled)
961+
962+
def flush_all(self):
963+
self._record_thread_ident()
964+
cdef broadcast_result_t* rst = NULL
965+
cdef size_t n = 0
966+
with nogil:
967+
self.last_error = self._imp.flushAll(&rst, &n)
968+
969+
rv = []
970+
for i in range(n):
971+
if rst[i].msg_type == MSG_OK:
972+
rv.append(rst[i].host)
973+
974+
with nogil:
975+
self._imp.destroyBroadcastResult()
976+
if self.last_error == RET_PROGRAMMING_ERR:
977+
raise RuntimeError(
978+
"Please call toggle_flush_all_feature(true) first "
979+
"to enable the flush_all feature."
980+
)
981+
return rv
982+
955983
def quit(self):
956984
self._record_thread_ident()
957985
with nogil:

misc/update_version.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ for VERSIONING_FILE in $VERSIONING_FILES
77
do
88
TMPFILE=$VERSIONING_FILE".2"
99
cat $VERSIONING_FILE | \
10-
python $VERSIONING_SCRIPT --clean | \
11-
python $VERSIONING_SCRIPT > $TMPFILE
10+
python2 $VERSIONING_SCRIPT --clean | \
11+
python2 $VERSIONING_SCRIPT > $TMPFILE
1212
mv $TMPFILE $VERSIONING_FILE
1313
git add $VERSIONING_FILE
1414
done

misc/versioning.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#!/usr/bin/env python
1+
#!/usr/bin/env python2
22
# from: https://gist.githubusercontent.com/pkrusche/7369262/raw/5bf2dc8afb88d3fdde7be6d16ee4290db6735f37/versioning.py
33

44
""" Git Versioning Script

setup.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33
import re
44
import sys
5+
import shlex
56
import pkg_resources
67
from glob import glob
78
from setuptools import setup, Extension
@@ -56,7 +57,7 @@ class PyTest(TestCommand):
5657

5758
def initialize_options(self):
5859
TestCommand.initialize_options(self)
59-
self.pytest_args = ['tests']
60+
self.pytest_args = 'tests'
6061

6162
def finalize_options(self):
6263
TestCommand.finalize_options(self)
@@ -66,7 +67,7 @@ def finalize_options(self):
6667
def run_tests(self):
6768
#import here, cause outside the eggs aren't loaded
6869
import pytest
69-
errno = pytest.main(self.pytest_args)
70+
errno = pytest.main(shlex.split(self.pytest_args))
7071
sys.exit(errno)
7172

7273
setup(

src/Client.cpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
namespace douban {
99
namespace mc {
1010

11-
Client::Client() {
11+
Client::Client(): m_flushAllEnabled(false) {
1212
}
1313

1414

@@ -125,10 +125,10 @@ err_code_t Client::_delete(const char* const* keys, const size_t* keyLens,
125125
}
126126

127127

128-
void Client::collectBroadcastResult(broadcast_result_t** results, size_t* nHosts) {
128+
void Client::collectBroadcastResult(broadcast_result_t** results, size_t* nHosts, bool isFlushAll) {
129129
assert(m_outBroadcastResultPtrs.empty());
130130
*nHosts = m_nConns;
131-
ConnectionPool::collectBroadcastResult(m_outBroadcastResultPtrs);
131+
ConnectionPool::collectBroadcastResult(m_outBroadcastResultPtrs, isFlushAll);
132132
*results = &m_outBroadcastResultPtrs.front();
133133
}
134134

@@ -166,6 +166,19 @@ err_code_t Client::stats(broadcast_result_t** results, size_t* nHosts) {
166166
return rv;
167167
}
168168

169+
err_code_t Client::flushAll(broadcast_result_t** results, size_t* nHosts) {
170+
if (!m_flushAllEnabled) {
171+
*results = NULL;
172+
*nHosts = 0;
173+
return RET_PROGRAMMING_ERR;
174+
}
175+
176+
broadcastCommand(keywords::kFLUSHALL, 9);
177+
err_code_t rv = waitPoll();
178+
collectBroadcastResult(results, nHosts, true);
179+
return rv;
180+
}
181+
169182

170183
err_code_t Client::touch(const char* const* keys, const size_t* keyLens,
171184
const exptime_t exptime, const bool noreply, size_t nItems,

src/ConnectionPool.cpp

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -582,27 +582,39 @@ void ConnectionPool::collectMessageResult(std::vector<message_result_t*>& result
582582
}
583583

584584

585-
void ConnectionPool::collectBroadcastResult(std::vector<broadcast_result_t>& results) {
585+
void ConnectionPool::collectBroadcastResult(std::vector<broadcast_result_t>& results, bool isFlushAll) {
586586
results.resize(m_nConns);
587587
for (size_t i = 0; i < m_nConns; ++i) {
588588
Connection* conn = m_conns + i;
589589
broadcast_result_t* conn_result = &results[i];
590590
conn_result->host = const_cast<char*>(conn->name());
591-
types::LineResultList* rst = conn->getLineResults();
592-
conn_result->len = rst->size();
591+
conn_result->lines = NULL;
592+
conn_result->line_lens = NULL;
593+
conn_result->len = 0;
594+
conn_result->msg_type = MSG_LIBMC_INVALID;
595+
596+
if (isFlushAll) {
597+
types::MessageResultList* rst = conn->getMessageResults();
598+
if (rst->size() == 1) {
599+
conn_result->msg_type = (rst->front()).type_;
600+
} else {
601+
conn_result->msg_type = MSG_LIBMC_INVALID;
602+
}
603+
} else {
604+
types::LineResultList* rst = conn->getLineResults();
605+
conn_result->len = rst->size();
593606

594-
if (conn_result->len == 0) {
595-
conn_result->lines = NULL;
596-
conn_result->line_lens = NULL;
597-
continue;
598-
}
599-
conn_result->lines = new char*[conn_result->len];
600-
conn_result->line_lens = new size_t[conn_result->len];
607+
if (conn_result->len == 0) {
608+
continue;
609+
}
610+
conn_result->lines = new char*[conn_result->len];
611+
conn_result->line_lens = new size_t[conn_result->len];
601612

602-
int j = 0;
603-
for (types::LineResultList::iterator it2 = rst->begin(); it2 != rst->end(); ++it2, ++j) {
604-
types::LineResult* r1 = &(*it2);
605-
conn_result->lines[j] = r1->inner(conn_result->line_lens[j]);
613+
int j = 0;
614+
for (types::LineResultList::iterator it2 = rst->begin(); it2 != rst->end(); ++it2, ++j) {
615+
types::LineResult* r1 = &(*it2);
616+
conn_result->lines[j] = r1->inner(conn_result->line_lens[j]);
617+
}
606618
}
607619
}
608620
}

src/Parser.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,20 @@ void PacketParser::setMode(ParserMode md) {
3535

3636
void PacketParser::processMessageResult(enum message_result_type tp) {
3737
m_messageResults.push_back(message_result_t());
38-
3938
message_result_t* inner_rst = &m_messageResults.back();
40-
struct iovec iov = m_requestKeys[m_requestKeyIdx];
41-
++m_requestKeyIdx;
4239
inner_rst->type_ = tp;
43-
inner_rst->key = static_cast<char*>(iov.iov_base);
44-
inner_rst->key_len = iov.iov_len;
40+
41+
// "OK\r\n" is not a key related response code,
42+
// so we don't need to fill the key.
43+
if (tp == MSG_OK) {
44+
inner_rst->key = NULL;
45+
inner_rst->key_len = 0;
46+
} else {
47+
struct iovec iov = m_requestKeys[m_requestKeyIdx];
48+
++m_requestKeyIdx;
49+
inner_rst->key = static_cast<char*>(iov.iov_base);
50+
inner_rst->key_len = iov.iov_len;
51+
}
4552
}
4653

4754

@@ -338,6 +345,7 @@ int PacketParser::start_state(err_code_t& err) {
338345
// OK
339346
EXPECT_BYTES("OK\r\n", 4);
340347
processMessageResult(MSG_OK);
348+
m_state = FSM_END;
341349
}
342350
break;
343351
case 'S':

src/c_client.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,20 @@ err_code_t client_stats(void* client, broadcast_result_t** results, size_t* n_se
142142
return c->stats(results, n_servers);
143143
}
144144

145+
void client_toggle_flush_all_feature(void* client, bool enabled) {
146+
douban::mc::Client* c = static_cast<Client*>(client);
147+
return c->toggleFlushAllFeature(enabled);
148+
}
149+
err_code_t client_flush_all(void* client, broadcast_result_t** results, size_t* n_servers) {
150+
douban::mc::Client* c = static_cast<Client*>(client);
151+
return c->flushAll(results, n_servers);
152+
}
153+
145154
err_code_t client_quit(void* client) {
146155
douban::mc::Client* c = static_cast<Client*>(client);
147156
return c->quit();
148157
}
149158

150159
const char* err_code_to_string(err_code_t err) {
151160
return douban::mc::errCodeToString(err);
152-
}
161+
}

0 commit comments

Comments
 (0)