Skip to content

Commit 5bdfff8

Browse files
pcmoritzxhochy
authored andcommitted
ARROW-2541: [Plasma] Replace macros with constexpr
Author: Philipp Moritz <[email protected]> Closes #2001 from pcmoritz/remove-macros and squashes the following commits: cc3a7d2 <Philipp Moritz> fix comments 3ac436b <Philipp Moritz> fix linting de7b0b6 <Philipp Moritz> more cleanups 9024214 <Philipp Moritz> fix linting a59e1b5 <Philipp Moritz> fix documentation 176b7c6 <Philipp Moritz> clean up macros
1 parent b916c79 commit 5bdfff8

File tree

9 files changed

+52
-42
lines changed

9 files changed

+52
-42
lines changed

cpp/apidoc/tutorials/plasma.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ using namespace plasma;
8080
int main(int argc, char** argv) {
8181
// Start up and connect a Plasma client.
8282
PlasmaClient client;
83-
ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
83+
ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
8484
// Disconnect the Plasma client.
8585
ARROW_CHECK_OK(client.Disconnect());
8686
}
@@ -218,7 +218,7 @@ using namespace plasma;
218218
int main(int argc, char** argv) {
219219
// Start up and connect a Plasma client.
220220
PlasmaClient client;
221-
ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
221+
ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
222222
// Create an object with a fixed ObjectID.
223223
ObjectID object_id = ObjectID::from_binary("00000000000000000000");
224224
int64_t data_size = 1000;
@@ -323,7 +323,7 @@ using namespace plasma;
323323
int main(int argc, char** argv) {
324324
// Start up and connect a Plasma client.
325325
PlasmaClient client;
326-
ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
326+
ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
327327
ObjectID object_id = ObjectID::from_binary("00000000000000000000");
328328
ObjectBuffer object_buffer;
329329
ARROW_CHECK_OK(client.Get(&object_id, 1, -1, &object_buffer));
@@ -411,7 +411,7 @@ using namespace plasma;
411411
int main(int argc, char** argv) {
412412
// Start up and connect a Plasma client.
413413
PlasmaClient client;
414-
ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
414+
ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
415415
416416
int fd;
417417
ARROW_CHECK_OK(client.Subscribe(&fd));

cpp/src/plasma/client.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -703,8 +703,8 @@ bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state,
703703
const int num_threads = kThreadPoolSize;
704704
uint64_t threadhash[num_threads + 1];
705705
const uint64_t data_address = reinterpret_cast<uint64_t>(data);
706-
const uint64_t num_blocks = nbytes / BLOCK_SIZE;
707-
const uint64_t chunk_size = (num_blocks / num_threads) * BLOCK_SIZE;
706+
const uint64_t num_blocks = nbytes / kBlockSize;
707+
const uint64_t chunk_size = (num_blocks / num_threads) * kBlockSize;
708708
const uint64_t right_address = data_address + chunk_size * num_threads;
709709
const uint64_t suffix = (data_address + nbytes) - right_address;
710710
// Now the data layout is | k * num_threads * block_size | suffix | ==

cpp/src/plasma/client.h

+10-3
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,14 @@ using arrow::Status;
3434

3535
namespace plasma {
3636

37-
#define PLASMA_DEFAULT_RELEASE_DELAY 64
37+
ARROW_DEPRECATED("PLASMA_DEFAULT_RELEASE_DELAY is deprecated")
38+
constexpr int64_t kDeprecatedPlasmaDefaultReleaseDelay = 64;
39+
#define PLASMA_DEFAULT_RELEASE_DELAY kDeprecatedPlasmaDefaultReleaseDelay;
40+
41+
/// We keep a queue of unreleased objects cached in the client until we start
42+
/// sending release requests to the store. This is to avoid frequently mapping
43+
/// and unmapping objects and evicting data from processor caches.
44+
constexpr int64_t kPlasmaDefaultReleaseDelay = 64;
3845

3946
/// Object buffer data structure.
4047
struct ObjectBuffer {
@@ -64,8 +71,8 @@ class ARROW_EXPORT PlasmaClient {
6471
/// \param num_retries number of attempts to connect to IPC socket, default 50
6572
/// \return The return status.
6673
Status Connect(const std::string& store_socket_name,
67-
const std::string& manager_socket_name, int release_delay,
68-
int num_retries = -1);
74+
const std::string& manager_socket_name,
75+
int release_delay = kPlasmaDefaultReleaseDelay, int num_retries = -1);
6976

7077
/// Create an object in the Plasma Store. Any metadata for this object must be
7178
/// be passed in when the object is created.

cpp/src/plasma/format/plasma.fbs

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
// Plasma protocol specification
1919

2020
enum MessageType:int {
21+
// Message that gets send when a client hangs up.
22+
PlasmaDisconnectClient = 0,
2123
// Create a new object.
22-
PlasmaCreateRequest = 1,
24+
PlasmaCreateRequest,
2325
PlasmaCreateReply,
2426
PlasmaAbortRequest,
2527
PlasmaAbortReply,

cpp/src/plasma/io.cc

+23-20
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
#include "arrow/status.h"
2525

2626
#include "plasma/common.h"
27+
#include "plasma/plasma_generated.h"
2728

2829
using arrow::Status;
2930

30-
/* Number of times we try connecting to a socket. */
31-
#define NUM_CONNECT_ATTEMPTS 50
32-
#define CONNECT_TIMEOUT_MS 100
31+
/// Number of times we try connecting to a socket.
32+
constexpr int64_t kNumConnectAttempts = 50;
33+
/// Time to wait between connection attempts to a socket.
34+
constexpr int64_t kConnectTimeoutMs = 100;
3335

3436
namespace plasma {
3537

@@ -38,8 +40,8 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
3840
size_t bytesleft = length;
3941
size_t offset = 0;
4042
while (bytesleft > 0) {
41-
/* While we haven't written the whole message, write to the file descriptor,
42-
* advance the cursor, and decrease the amount left to write. */
43+
// While we haven't written the whole message, write to the file descriptor,
44+
// advance the cursor, and decrease the amount left to write.
4345
nbytes = write(fd, cursor + offset, bytesleft);
4446
if (nbytes < 0) {
4547
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
@@ -58,7 +60,7 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
5860
}
5961

6062
Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
61-
int64_t version = PLASMA_PROTOCOL_VERSION;
63+
int64_t version = kPlasmaProtocolVersion;
6264
RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)));
6365
RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&type), sizeof(type)));
6466
RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length)));
@@ -67,7 +69,7 @@ Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
6769

6870
Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
6971
ssize_t nbytes = 0;
70-
/* Termination condition: EOF or read 'length' bytes total. */
72+
// Termination condition: EOF or read 'length' bytes total.
7173
size_t bytesleft = length;
7274
size_t offset = 0;
7375
while (bytesleft > 0) {
@@ -91,20 +93,21 @@ Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
9193
Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer) {
9294
int64_t version;
9395
RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)),
94-
*type = DISCONNECT_CLIENT);
95-
ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version;
96+
*type = MessageType_PlasmaDisconnectClient);
97+
ARROW_CHECK(version == kPlasmaProtocolVersion) << "version = " << version;
9698
RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), sizeof(*type)),
97-
*type = DISCONNECT_CLIENT);
99+
*type = MessageType_PlasmaDisconnectClient);
98100
int64_t length_temp;
99101
RETURN_NOT_OK_ELSE(
100102
ReadBytes(fd, reinterpret_cast<uint8_t*>(&length_temp), sizeof(length_temp)),
101-
*type = DISCONNECT_CLIENT);
103+
*type = MessageType_PlasmaDisconnectClient);
102104
// The length must be read as an int64_t, but it should be used as a size_t.
103105
size_t length = static_cast<size_t>(length_temp);
104106
if (length > buffer->size()) {
105107
buffer->resize(length);
106108
}
107-
RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length), *type = DISCONNECT_CLIENT);
109+
RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length),
110+
*type = MessageType_PlasmaDisconnectClient);
108111
return Status::OK();
109112
}
110113

@@ -115,7 +118,7 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
115118
ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
116119
return -1;
117120
}
118-
/* Tell the system to allow the port to be reused. */
121+
// Tell the system to allow the port to be reused.
119122
int on = 1;
120123
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&on),
121124
sizeof(on)) < 0) {
@@ -150,23 +153,23 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
150153

151154
Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
152155
int64_t timeout, int* fd) {
153-
/* Pick the default values if the user did not specify. */
156+
// Pick the default values if the user did not specify.
154157
if (num_retries < 0) {
155-
num_retries = NUM_CONNECT_ATTEMPTS;
158+
num_retries = kNumConnectAttempts;
156159
}
157160
if (timeout < 0) {
158-
timeout = CONNECT_TIMEOUT_MS;
161+
timeout = kConnectTimeoutMs;
159162
}
160163
*fd = connect_ipc_sock(pathname);
161164
while (*fd < 0 && num_retries > 0) {
162165
ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname
163166
<< ", retrying " << num_retries << " more times";
164-
/* Sleep for timeout milliseconds. */
167+
// Sleep for timeout milliseconds.
165168
usleep(static_cast<int>(timeout * 1000));
166169
*fd = connect_ipc_sock(pathname);
167170
--num_retries;
168171
}
169-
/* If we could not connect to the socket, exit. */
172+
// If we could not connect to the socket, exit.
170173
if (*fd == -1) {
171174
std::stringstream ss;
172175
ss << "Could not connect to socket " << pathname;
@@ -215,15 +218,15 @@ std::unique_ptr<uint8_t[]> read_message_async(int sock) {
215218
int64_t size;
216219
Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t));
217220
if (!s.ok()) {
218-
/* The other side has closed the socket. */
221+
// The other side has closed the socket.
219222
ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
220223
close(sock);
221224
return NULL;
222225
}
223226
auto message = std::unique_ptr<uint8_t[]>(new uint8_t[size]);
224227
s = ReadBytes(sock, message.get(), size);
225228
if (!s.ok()) {
226-
/* The other side has closed the socket. */
229+
// The other side has closed the socket.
227230
ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
228231
close(sock);
229232
return NULL;

cpp/src/plasma/io.h

+3-4
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,12 @@
3030
#include "arrow/status.h"
3131
#include "plasma/compat.h"
3232

33+
namespace plasma {
34+
3335
// TODO(pcm): Replace our own custom message header (message type,
3436
// message length, plasma protocol verion) with one that is serialized
3537
// using flatbuffers.
36-
#define PLASMA_PROTOCOL_VERSION 0x0000000000000000
37-
#define DISCONNECT_CLIENT 0
38-
39-
namespace plasma {
38+
constexpr int64_t kPlasmaProtocolVersion = 0x0000000000000000;
4039

4140
using arrow::Status;
4241

cpp/src/plasma/plasma.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
#include "arrow/status.h"
3838
#include "arrow/util/logging.h"
39+
#include "arrow/util/macros.h"
3940
#include "plasma/common.h"
4041
#include "plasma/common_generated.h"
4142

@@ -65,7 +66,7 @@ namespace plasma {
6566
} while (0);
6667

6768
/// Allocation granularity used in plasma for object allocation.
68-
#define BLOCK_SIZE 64
69+
constexpr int64_t kBlockSize = 64;
6970

7071
struct Client;
7172

cpp/src/plasma/store.cc

+3-3
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
170170
// 64-byte aligned, but in practice it often will be.
171171
if (device_num == 0) {
172172
pointer =
173-
reinterpret_cast<uint8_t*>(dlmemalign(BLOCK_SIZE, data_size + metadata_size));
173+
reinterpret_cast<uint8_t*>(dlmemalign(kBlockSize, data_size + metadata_size));
174174
if (pointer == NULL) {
175175
// Tell the eviction policy how much space we need to create this object.
176176
std::vector<ObjectID> objects_to_evict;
@@ -741,7 +741,7 @@ Status PlasmaStore::process_message(Client* client) {
741741
HANDLE_SIGPIPE(SendConnectReply(client->fd, store_info_.memory_capacity),
742742
client->fd);
743743
} break;
744-
case DISCONNECT_CLIENT:
744+
case MessageType_PlasmaDisconnectClient:
745745
ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
746746
disconnect_client(client->fd);
747747
break;
@@ -768,7 +768,7 @@ class PlasmaStoreRunner {
768768
// achieve that by mallocing and freeing a single large amount of space.
769769
// that maximum allowed size up front.
770770
if (use_one_memory_mapped_file) {
771-
void* pointer = plasma::dlmemalign(BLOCK_SIZE, system_memory);
771+
void* pointer = plasma::dlmemalign(kBlockSize, system_memory);
772772
ARROW_CHECK(pointer != NULL);
773773
plasma::dlfree(pointer);
774774
}

cpp/src/plasma/test/client_tests.cc

+2-4
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,8 @@ class TestPlasmaStore : public ::testing::Test {
5959
"/plasma_store -m 1000000000 -s /tmp/store" +
6060
store_index + " 1> /dev/null 2> /dev/null &";
6161
system(plasma_command.c_str());
62-
ARROW_CHECK_OK(
63-
client_.Connect("/tmp/store" + store_index, "", PLASMA_DEFAULT_RELEASE_DELAY));
64-
ARROW_CHECK_OK(
65-
client2_.Connect("/tmp/store" + store_index, "", PLASMA_DEFAULT_RELEASE_DELAY));
62+
ARROW_CHECK_OK(client_.Connect("/tmp/store" + store_index, ""));
63+
ARROW_CHECK_OK(client2_.Connect("/tmp/store" + store_index, ""));
6664
}
6765
virtual void TearDown() {
6866
ARROW_CHECK_OK(client_.Disconnect());

0 commit comments

Comments
 (0)