Skip to content

Commit 84b7e37

Browse files
committed
[Plasma] Use unique_ptr instead of raw pointer
1 parent fbce08d commit 84b7e37

File tree

7 files changed

+21
-35
lines changed

7 files changed

+21
-35
lines changed

cpp/src/plasma/client.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -872,13 +872,14 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
872872
return Status::OK();
873873
}
874874

875+
875876
Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
876877
int64_t* data_size, int64_t* metadata_size) {
877-
uint8_t* notification = read_message_async(fd);
878+
auto notification = read_message_async(fd);
878879
if (notification == NULL) {
879880
return Status::IOError("Failed to read object notification from Plasma socket");
880881
}
881-
auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
882+
auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification.get());
882883
ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
883884
memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
884885
if (object_info->is_deletion()) {
@@ -888,7 +889,6 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
888889
*data_size = object_info->data_size();
889890
*metadata_size = object_info->metadata_size();
890891
}
891-
delete[] notification;
892892
return Status::OK();
893893
}
894894

cpp/src/plasma/io.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ int AcceptClient(int socket_fd) {
210210
return client_fd;
211211
}
212212

213-
uint8_t* read_message_async(int sock) {
213+
std::unique_ptr<uint8_t[]> read_message_async(int sock) {
214214
int64_t size;
215215
Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t));
216216
if (!s.ok()) {
@@ -219,10 +219,9 @@ uint8_t* read_message_async(int sock) {
219219
close(sock);
220220
return NULL;
221221
}
222-
uint8_t* message = reinterpret_cast<uint8_t*>(malloc(size));
223-
s = ReadBytes(sock, message, size);
222+
auto message = std::unique_ptr<uint8_t[]> (new uint8_t[size]);
223+
s = ReadBytes(sock, message.get(), size);
224224
if (!s.ok()) {
225-
free(message);
226225
/* The other side has closed the socket. */
227226
ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
228227
close(sock);

cpp/src/plasma/io.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
5656

5757
int AcceptClient(int socket_fd);
5858

59-
uint8_t* read_message_async(int sock);
59+
std::unique_ptr<uint8_t[]> read_message_async(int sock);
6060

6161
} // namespace plasma
6262

cpp/src/plasma/plasma.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ int warn_if_sigpipe(int status, int client_sock) {
5151
* @return The object info buffer. It is the caller's responsibility to free
5252
* this buffer with "delete" after it has been used.
5353
*/
54-
uint8_t* create_object_info_buffer(ObjectInfoT* object_info) {
54+
std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info) {
5555
flatbuffers::FlatBufferBuilder fbb;
5656
auto message = CreateObjectInfo(fbb, object_info);
5757
fbb.Finish(message);
58-
uint8_t* notification = new uint8_t[sizeof(int64_t) + fbb.GetSize()];
59-
*(reinterpret_cast<int64_t*>(notification)) = fbb.GetSize();
60-
memcpy(notification + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize());
58+
auto notification = std::unique_ptr<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb.GetSize()]);
59+
*(reinterpret_cast<int64_t*>(notification.get())) = fbb.GetSize();
60+
memcpy(notification.get() + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize());
6161
return notification;
6262
}
6363

cpp/src/plasma/plasma.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ ObjectTableEntry* get_object_table_entry(PlasmaStoreInfo* store_info,
179179
/// @return The errno set.
180180
int warn_if_sigpipe(int status, int client_sock);
181181

182-
uint8_t* create_object_info_buffer(ObjectInfoT* object_info);
182+
std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info);
183183

184184
} // namespace plasma
185185

cpp/src/plasma/store.cc

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,6 @@ PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string dir
119119

120120
// TODO(pcm): Get rid of this destructor by using RAII to clean up data.
121121
PlasmaStore::~PlasmaStore() {
122-
for (const auto& element : pending_notifications_) {
123-
auto object_notifications = element.second.object_notifications;
124-
for (size_t i = 0; i < object_notifications.size(); ++i) {
125-
uint8_t* notification = reinterpret_cast<uint8_t*>(object_notifications.at(i));
126-
uint8_t* data = notification;
127-
// TODO(pcm): Get rid of this delete.
128-
delete[] data;
129-
}
130-
}
131122
}
132123

133124
const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info_; }
@@ -322,11 +313,11 @@ void PlasmaStore::return_from_get(GetRequest* get_req) {
322313
}
323314

324315
void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
325-
std::vector<GetRequest*>& get_requests = object_get_requests_[object_id];
316+
auto& get_requests = object_get_requests_[object_id];
326317
size_t index = 0;
327318
size_t num_requests = get_requests.size();
328319
for (size_t i = 0; i < num_requests; ++i) {
329-
GetRequest* get_req = get_requests[index];
320+
auto get_req = get_requests[index];
330321
auto entry = get_object_table_entry(&store_info_, object_id);
331322
ARROW_CHECK(entry != NULL);
332323

@@ -356,7 +347,7 @@ void PlasmaStore::process_get_request(Client* client,
356347
const std::vector<ObjectID>& object_ids,
357348
int64_t timeout_ms) {
358349
// Create a get request for this object.
359-
GetRequest* get_req = new GetRequest(client, object_ids);
350+
auto get_req = new GetRequest(client, object_ids);
360351

361352
for (auto object_id : object_ids) {
362353
// Check if this object is already present locally. If so, record that the
@@ -582,13 +573,12 @@ void PlasmaStore::send_notifications(int client_fd) {
582573
// Loop over the array of pending notifications and send as many of them as
583574
// possible.
584575
for (size_t i = 0; i < it->second.object_notifications.size(); ++i) {
585-
uint8_t* notification =
586-
reinterpret_cast<uint8_t*>(it->second.object_notifications.at(i));
576+
auto& notification = it->second.object_notifications.at(i);
587577
// Decode the length, which is the first bytes of the message.
588-
int64_t size = *(reinterpret_cast<int64_t*>(notification));
578+
int64_t size = *(reinterpret_cast<int64_t*>(notification.get()));
589579

590580
// Attempt to send a notification about this object ID.
591-
ssize_t nbytes = send(client_fd, notification, sizeof(int64_t) + size, 0);
581+
ssize_t nbytes = send(client_fd, notification.get(), sizeof(int64_t) + size, 0);
592582
if (nbytes >= 0) {
593583
ARROW_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size);
594584
} else if (nbytes == -1 &&
@@ -613,9 +603,6 @@ void PlasmaStore::send_notifications(int client_fd) {
613603
}
614604
}
615605
num_processed += 1;
616-
// The corresponding malloc happened in create_object_info_buffer
617-
// within push_notification.
618-
delete[] notification;
619606
}
620607
// Remove the sent notifications from the array.
621608
it->second.object_notifications.erase(
@@ -636,8 +623,8 @@ void PlasmaStore::send_notifications(int client_fd) {
636623

637624
void PlasmaStore::push_notification(ObjectInfoT* object_info) {
638625
for (auto& element : pending_notifications_) {
639-
uint8_t* notification = create_object_info_buffer(object_info);
640-
element.second.object_notifications.push_back(notification);
626+
auto notification = create_object_info_buffer(object_info);
627+
element.second.object_notifications.emplace_back(std::move(notification));
641628
send_notifications(element.first);
642629
// The notification gets freed in send_notifications when the notification
643630
// is sent over the socket.

cpp/src/plasma/store.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ struct GetRequest;
3737
struct NotificationQueue {
3838
/// The object notifications for clients. We notify the client about the
3939
/// objects in the order that the objects were sealed or deleted.
40-
std::deque<uint8_t*> object_notifications;
40+
std::deque<std::unique_ptr<uint8_t[]>> object_notifications;
4141
};
4242

4343
/// Contains all information that is associated with a Plasma store client.

0 commit comments

Comments
 (0)