Skip to content

Commit b916c79

Browse files
zhijunfupcmoritz
authored andcommitted
ARROW-2539: [Plasma] Use unique_ptr instead of raw pointer
use unique_ptr to replace raw pointer, so that allocated memory can be freed automatically Author: Zhijun Fu <[email protected]> Closes #1993 from zhijunfu/improve-code and squashes the following commits: 3c69ada <Zhijun Fu> fix format check 6bfebc2 <Zhijun Fu> fix lint b5b2fac <Zhijun Fu> fix build on travis-ci d4d64b0 <Zhijun Fu> Merge branch 'master' of https://github.com/zhijunfu/arrow into improve-code 84b7e37 <Zhijun Fu> Use unique_ptr instead of raw pointer
1 parent 73f0d8e commit b916c79

File tree

7 files changed

+24
-37
lines changed

7 files changed

+24
-37
lines changed

cpp/src/plasma/client.cc

+2-3
Original file line numberDiff line numberDiff line change
@@ -874,11 +874,11 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
874874

875875
Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
876876
int64_t* data_size, int64_t* metadata_size) {
877-
uint8_t* notification = read_message_async(fd);
877+
auto notification = read_message_async(fd);
878878
if (notification == NULL) {
879879
return Status::IOError("Failed to read object notification from Plasma socket");
880880
}
881-
auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
881+
auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification.get());
882882
ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
883883
memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
884884
if (object_info->is_deletion()) {
@@ -888,7 +888,6 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
888888
*data_size = object_info->data_size();
889889
*metadata_size = object_info->metadata_size();
890890
}
891-
delete[] notification;
892891
return Status::OK();
893892
}
894893

cpp/src/plasma/io.cc

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "plasma/io.h"
1919

2020
#include <cstdint>
21+
#include <memory>
2122
#include <sstream>
2223

2324
#include "arrow/status.h"
@@ -210,7 +211,7 @@ int AcceptClient(int socket_fd) {
210211
return client_fd;
211212
}
212213

213-
uint8_t* read_message_async(int sock) {
214+
std::unique_ptr<uint8_t[]> read_message_async(int sock) {
214215
int64_t size;
215216
Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t));
216217
if (!s.ok()) {
@@ -219,10 +220,9 @@ uint8_t* read_message_async(int sock) {
219220
close(sock);
220221
return NULL;
221222
}
222-
uint8_t* message = reinterpret_cast<uint8_t*>(malloc(size));
223-
s = ReadBytes(sock, message, size);
223+
auto message = std::unique_ptr<uint8_t[]>(new uint8_t[size]);
224+
s = ReadBytes(sock, message.get(), size);
224225
if (!s.ok()) {
225-
free(message);
226226
/* The other side has closed the socket. */
227227
ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
228228
close(sock);

cpp/src/plasma/io.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <sys/un.h>
2424
#include <unistd.h>
2525

26+
#include <memory>
2627
#include <string>
2728
#include <vector>
2829

@@ -56,7 +57,7 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
5657

5758
int AcceptClient(int socket_fd);
5859

59-
uint8_t* read_message_async(int sock);
60+
std::unique_ptr<uint8_t[]> read_message_async(int sock);
6061

6162
} // namespace plasma
6263

cpp/src/plasma/plasma.cc

+5-4
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@ int warn_if_sigpipe(int status, int client_sock) {
5151
* @return The object info buffer. It is the caller's responsibility to free
5252
* this buffer with "delete" after it has been used.
5353
*/
54-
uint8_t* create_object_info_buffer(ObjectInfoT* object_info) {
54+
std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info) {
5555
flatbuffers::FlatBufferBuilder fbb;
5656
auto message = CreateObjectInfo(fbb, object_info);
5757
fbb.Finish(message);
58-
uint8_t* notification = new uint8_t[sizeof(int64_t) + fbb.GetSize()];
59-
*(reinterpret_cast<int64_t*>(notification)) = fbb.GetSize();
60-
memcpy(notification + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize());
58+
auto notification =
59+
std::unique_ptr<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb.GetSize()]);
60+
*(reinterpret_cast<int64_t*>(notification.get())) = fbb.GetSize();
61+
memcpy(notification.get() + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize());
6162
return notification;
6263
}
6364

cpp/src/plasma/plasma.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ ObjectTableEntry* get_object_table_entry(PlasmaStoreInfo* store_info,
179179
/// @return The errno set.
180180
int warn_if_sigpipe(int status, int client_sock);
181181

182-
uint8_t* create_object_info_buffer(ObjectInfoT* object_info);
182+
std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info);
183183

184184
} // namespace plasma
185185

cpp/src/plasma/store.cc

+9-23
Original file line numberDiff line numberDiff line change
@@ -118,17 +118,7 @@ PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string dir
118118
}
119119

120120
// TODO(pcm): Get rid of this destructor by using RAII to clean up data.
121-
PlasmaStore::~PlasmaStore() {
122-
for (const auto& element : pending_notifications_) {
123-
auto object_notifications = element.second.object_notifications;
124-
for (size_t i = 0; i < object_notifications.size(); ++i) {
125-
uint8_t* notification = reinterpret_cast<uint8_t*>(object_notifications.at(i));
126-
uint8_t* data = notification;
127-
// TODO(pcm): Get rid of this delete.
128-
delete[] data;
129-
}
130-
}
131-
}
121+
PlasmaStore::~PlasmaStore() {}
132122

133123
const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info_; }
134124

@@ -322,11 +312,11 @@ void PlasmaStore::return_from_get(GetRequest* get_req) {
322312
}
323313

324314
void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
325-
std::vector<GetRequest*>& get_requests = object_get_requests_[object_id];
315+
auto& get_requests = object_get_requests_[object_id];
326316
size_t index = 0;
327317
size_t num_requests = get_requests.size();
328318
for (size_t i = 0; i < num_requests; ++i) {
329-
GetRequest* get_req = get_requests[index];
319+
auto get_req = get_requests[index];
330320
auto entry = get_object_table_entry(&store_info_, object_id);
331321
ARROW_CHECK(entry != NULL);
332322

@@ -356,7 +346,7 @@ void PlasmaStore::process_get_request(Client* client,
356346
const std::vector<ObjectID>& object_ids,
357347
int64_t timeout_ms) {
358348
// Create a get request for this object.
359-
GetRequest* get_req = new GetRequest(client, object_ids);
349+
auto get_req = new GetRequest(client, object_ids);
360350

361351
for (auto object_id : object_ids) {
362352
// Check if this object is already present locally. If so, record that the
@@ -582,13 +572,12 @@ void PlasmaStore::send_notifications(int client_fd) {
582572
// Loop over the array of pending notifications and send as many of them as
583573
// possible.
584574
for (size_t i = 0; i < it->second.object_notifications.size(); ++i) {
585-
uint8_t* notification =
586-
reinterpret_cast<uint8_t*>(it->second.object_notifications.at(i));
575+
auto& notification = it->second.object_notifications.at(i);
587576
// Decode the length, which is the first bytes of the message.
588-
int64_t size = *(reinterpret_cast<int64_t*>(notification));
577+
int64_t size = *(reinterpret_cast<int64_t*>(notification.get()));
589578

590579
// Attempt to send a notification about this object ID.
591-
ssize_t nbytes = send(client_fd, notification, sizeof(int64_t) + size, 0);
580+
ssize_t nbytes = send(client_fd, notification.get(), sizeof(int64_t) + size, 0);
592581
if (nbytes >= 0) {
593582
ARROW_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size);
594583
} else if (nbytes == -1 &&
@@ -613,9 +602,6 @@ void PlasmaStore::send_notifications(int client_fd) {
613602
}
614603
}
615604
num_processed += 1;
616-
// The corresponding malloc happened in create_object_info_buffer
617-
// within push_notification.
618-
delete[] notification;
619605
}
620606
// Remove the sent notifications from the array.
621607
it->second.object_notifications.erase(
@@ -636,8 +622,8 @@ void PlasmaStore::send_notifications(int client_fd) {
636622

637623
void PlasmaStore::push_notification(ObjectInfoT* object_info) {
638624
for (auto& element : pending_notifications_) {
639-
uint8_t* notification = create_object_info_buffer(object_info);
640-
element.second.object_notifications.push_back(notification);
625+
auto notification = create_object_info_buffer(object_info);
626+
element.second.object_notifications.emplace_back(std::move(notification));
641627
send_notifications(element.first);
642628
// The notification gets freed in send_notifications when the notification
643629
// is sent over the socket.

cpp/src/plasma/store.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ struct GetRequest;
3737
struct NotificationQueue {
3838
/// The object notifications for clients. We notify the client about the
3939
/// objects in the order that the objects were sealed or deleted.
40-
std::deque<uint8_t*> object_notifications;
40+
std::deque<std::unique_ptr<uint8_t[]>> object_notifications;
4141
};
4242

4343
/// Contains all information that is associated with a Plasma store client.

0 commit comments

Comments
 (0)