Skip to content

Commit 176b7c6

Browse files
committed
clean up macros
1 parent b916c79 commit 176b7c6

File tree

8 files changed

+30
-21
lines changed

8 files changed

+30
-21
lines changed

cpp/src/plasma/client.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -703,8 +703,8 @@ bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state,
703703
const int num_threads = kThreadPoolSize;
704704
uint64_t threadhash[num_threads + 1];
705705
const uint64_t data_address = reinterpret_cast<uint64_t>(data);
706-
const uint64_t num_blocks = nbytes / BLOCK_SIZE;
707-
const uint64_t chunk_size = (num_blocks / num_threads) * BLOCK_SIZE;
706+
const uint64_t num_blocks = nbytes / kBlockSize;
707+
const uint64_t chunk_size = (num_blocks / num_threads) * kBlockSize;
708708
const uint64_t right_address = data_address + chunk_size * num_threads;
709709
const uint64_t suffix = (data_address + nbytes) - right_address;
710710
// Now the data layout is | k * num_threads * block_size | suffix | ==

cpp/src/plasma/client.h

+9-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,13 @@ using arrow::Status;
3434

3535
namespace plasma {
3636

37-
#define PLASMA_DEFAULT_RELEASE_DELAY 64
37+
ARROW_DEPRECATED("PLASMA_DEFAULT_RELEASE_DELAY is deprecated")
38+
constexpr int64_t kDeprecatedPlasmaDefaultReleaseDelay = 64;
39+
#define PLASMA_DEFAULT_RELEASE_DELAY kDeprecatedPlasmaDefaultReleaseDelay;
40+
41+
/// Number of objects that will be kept unreleased in the plasma client
42+
/// until we send a message to the store to release the object.
43+
constexpr int64_t kPlasmaDefaultReleaseDelay = 64;
3844

3945
/// Object buffer data structure.
4046
struct ObjectBuffer {
@@ -64,7 +70,8 @@ class ARROW_EXPORT PlasmaClient {
6470
/// \param num_retries number of attempts to connect to IPC socket, default 50
6571
/// \return The return status.
6672
Status Connect(const std::string& store_socket_name,
67-
const std::string& manager_socket_name, int release_delay,
73+
const std::string& manager_socket_name,
74+
int release_delay = kPlasmaDefaultReleaseDelay,
6875
int num_retries = -1);
6976

7077
/// Create an object in the Plasma Store. Any metadata for this object must be

cpp/src/plasma/format/plasma.fbs

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
// Plasma protocol specification
1919

2020
enum MessageType:int {
21+
PlasmaDisconnectClient = 0,
2122
// Create a new object.
22-
PlasmaCreateRequest = 1,
23+
PlasmaCreateRequest,
2324
PlasmaCreateReply,
2425
PlasmaAbortRequest,
2526
PlasmaAbortReply,

cpp/src/plasma/io.cc

+7-6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "arrow/status.h"
2525

2626
#include "plasma/common.h"
27+
#include "plasma/plasma_generated.h"
2728

2829
using arrow::Status;
2930

@@ -58,7 +59,7 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
5859
}
5960

6061
Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
61-
int64_t version = PLASMA_PROTOCOL_VERSION;
62+
int64_t version = kPlasmaProtocolVersion;
6263
RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)));
6364
RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&type), sizeof(type)));
6465
RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length)));
@@ -91,20 +92,20 @@ Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
9192
Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer) {
9293
int64_t version;
9394
RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)),
94-
*type = DISCONNECT_CLIENT);
95-
ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version;
95+
*type = MessageType_PlasmaDisconnectClient);
96+
ARROW_CHECK(version == kPlasmaProtocolVersion) << "version = " << version;
9697
RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), sizeof(*type)),
97-
*type = DISCONNECT_CLIENT);
98+
*type = MessageType_PlasmaDisconnectClient);
9899
int64_t length_temp;
99100
RETURN_NOT_OK_ELSE(
100101
ReadBytes(fd, reinterpret_cast<uint8_t*>(&length_temp), sizeof(length_temp)),
101-
*type = DISCONNECT_CLIENT);
102+
*type = MessageType_PlasmaDisconnectClient);
102103
// The length must be read as an int64_t, but it should be used as a size_t.
103104
size_t length = static_cast<size_t>(length_temp);
104105
if (length > buffer->size()) {
105106
buffer->resize(length);
106107
}
107-
RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length), *type = DISCONNECT_CLIENT);
108+
RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length), *type = MessageType_PlasmaDisconnectClient);
108109
return Status::OK();
109110
}
110111

cpp/src/plasma/io.h

+3-4
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,12 @@
3030
#include "arrow/status.h"
3131
#include "plasma/compat.h"
3232

33+
namespace plasma {
34+
3335
// TODO(pcm): Replace our own custom message header (message type,
3436
// message length, plasma protocol verion) with one that is serialized
3537
// using flatbuffers.
36-
#define PLASMA_PROTOCOL_VERSION 0x0000000000000000
37-
#define DISCONNECT_CLIENT 0
38-
39-
namespace plasma {
38+
constexpr int64_t kPlasmaProtocolVersion = 0x0000000000000000;
4039

4140
using arrow::Status;
4241

cpp/src/plasma/plasma.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
#include "arrow/status.h"
3838
#include "arrow/util/logging.h"
39+
#include "arrow/util/macros.h"
3940
#include "plasma/common.h"
4041
#include "plasma/common_generated.h"
4142

@@ -65,7 +66,7 @@ namespace plasma {
6566
} while (0);
6667

6768
/// Allocation granularity used in plasma for object allocation.
68-
#define BLOCK_SIZE 64
69+
constexpr int64_t kBlockSize = 64;
6970

7071
struct Client;
7172

cpp/src/plasma/store.cc

+3-3
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
170170
// 64-byte aligned, but in practice it often will be.
171171
if (device_num == 0) {
172172
pointer =
173-
reinterpret_cast<uint8_t*>(dlmemalign(BLOCK_SIZE, data_size + metadata_size));
173+
reinterpret_cast<uint8_t*>(dlmemalign(kBlockSize, data_size + metadata_size));
174174
if (pointer == NULL) {
175175
// Tell the eviction policy how much space we need to create this object.
176176
std::vector<ObjectID> objects_to_evict;
@@ -741,7 +741,7 @@ Status PlasmaStore::process_message(Client* client) {
741741
HANDLE_SIGPIPE(SendConnectReply(client->fd, store_info_.memory_capacity),
742742
client->fd);
743743
} break;
744-
case DISCONNECT_CLIENT:
744+
case MessageType_PlasmaDisconnectClient:
745745
ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
746746
disconnect_client(client->fd);
747747
break;
@@ -768,7 +768,7 @@ class PlasmaStoreRunner {
768768
// achieve that by mallocing and freeing a single large amount of space.
769769
// that maximum allowed size up front.
770770
if (use_one_memory_mapped_file) {
771-
void* pointer = plasma::dlmemalign(BLOCK_SIZE, system_memory);
771+
void* pointer = plasma::dlmemalign(kBlockSize, system_memory);
772772
ARROW_CHECK(pointer != NULL);
773773
plasma::dlfree(pointer);
774774
}

cpp/src/plasma/test/client_tests.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ class TestPlasmaStore : public ::testing::Test {
6060
store_index + " 1> /dev/null 2> /dev/null &";
6161
system(plasma_command.c_str());
6262
ARROW_CHECK_OK(
63-
client_.Connect("/tmp/store" + store_index, "", PLASMA_DEFAULT_RELEASE_DELAY));
63+
client_.Connect("/tmp/store" + store_index, ""));
6464
ARROW_CHECK_OK(
65-
client2_.Connect("/tmp/store" + store_index, "", PLASMA_DEFAULT_RELEASE_DELAY));
65+
client2_.Connect("/tmp/store" + store_index, ""));
6666
}
6767
virtual void TearDown() {
6868
ARROW_CHECK_OK(client_.Disconnect());

0 commit comments

Comments
 (0)