28
28
29
29
using arrow::Status;
30
30
31
- /* Number of times we try connecting to a socket. */
32
- #define NUM_CONNECT_ATTEMPTS 50
33
- #define CONNECT_TIMEOUT_MS 100
31
+ // / Number of times we try connecting to a socket.
32
+ constexpr int64_t kNumConnectAttempts = 50 ;
33
+ // / Time to wait between connection attempts.
34
+ constexpr int64_t kConnectTimeoutMs = 100 ;
34
35
35
36
namespace plasma {
36
37
@@ -39,8 +40,8 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
39
40
size_t bytesleft = length;
40
41
size_t offset = 0 ;
41
42
while (bytesleft > 0 ) {
42
- /* While we haven't written the whole message, write to the file descriptor,
43
- * advance the cursor, and decrease the amount left to write. */
43
+ // While we haven't written the whole message, write to the file descriptor,
44
+ // advance the cursor, and decrease the amount left to write.
44
45
nbytes = write (fd, cursor + offset, bytesleft);
45
46
if (nbytes < 0 ) {
46
47
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
@@ -68,7 +69,7 @@ Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
68
69
69
70
Status ReadBytes (int fd, uint8_t * cursor, size_t length) {
70
71
ssize_t nbytes = 0 ;
71
- /* Termination condition: EOF or read 'length' bytes total. */
72
+ // Termination condition: EOF or read 'length' bytes total.
72
73
size_t bytesleft = length;
73
74
size_t offset = 0 ;
74
75
while (bytesleft > 0 ) {
@@ -117,7 +118,7 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
117
118
ARROW_LOG (ERROR) << " socket() failed for pathname " << pathname;
118
119
return -1 ;
119
120
}
120
- /* Tell the system to allow the port to be reused. */
121
+ // Tell the system to allow the port to be reused.
121
122
int on = 1 ;
122
123
if (setsockopt (socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast <char *>(&on),
123
124
sizeof (on)) < 0 ) {
@@ -152,23 +153,23 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
152
153
153
154
Status ConnectIpcSocketRetry (const std::string& pathname, int num_retries,
154
155
int64_t timeout, int * fd) {
155
- /* Pick the default values if the user did not specify. */
156
+ // Pick the default values if the user did not specify.
156
157
if (num_retries < 0 ) {
157
- num_retries = NUM_CONNECT_ATTEMPTS ;
158
+ num_retries = kNumConnectAttempts ;
158
159
}
159
160
if (timeout < 0 ) {
160
- timeout = CONNECT_TIMEOUT_MS ;
161
+ timeout = kConnectTimeoutMs ;
161
162
}
162
163
*fd = connect_ipc_sock (pathname);
163
164
while (*fd < 0 && num_retries > 0 ) {
164
165
ARROW_LOG (ERROR) << " Connection to IPC socket failed for pathname " << pathname
165
166
<< " , retrying " << num_retries << " more times" ;
166
- /* Sleep for timeout milliseconds. */
167
+ // Sleep for timeout milliseconds.
167
168
usleep (static_cast <int >(timeout * 1000 ));
168
169
*fd = connect_ipc_sock (pathname);
169
170
--num_retries;
170
171
}
171
- /* If we could not connect to the socket, exit. */
172
+ // If we could not connect to the socket, exit.
172
173
if (*fd == -1 ) {
173
174
std::stringstream ss;
174
175
ss << " Could not connect to socket " << pathname;
@@ -217,15 +218,15 @@ std::unique_ptr<uint8_t[]> read_message_async(int sock) {
217
218
int64_t size;
218
219
Status s = ReadBytes (sock, reinterpret_cast <uint8_t *>(&size), sizeof (int64_t ));
219
220
if (!s.ok ()) {
220
- /* The other side has closed the socket. */
221
+ // The other side has closed the socket.
221
222
ARROW_LOG (DEBUG) << " Socket has been closed, or some other error has occurred." ;
222
223
close (sock);
223
224
return NULL ;
224
225
}
225
226
auto message = std::unique_ptr<uint8_t []>(new uint8_t [size]);
226
227
s = ReadBytes (sock, message.get (), size);
227
228
if (!s.ok ()) {
228
- /* The other side has closed the socket. */
229
+ // The other side has closed the socket.
229
230
ARROW_LOG (DEBUG) << " Socket has been closed, or some other error has occurred." ;
230
231
close (sock);
231
232
return NULL ;
0 commit comments