@@ -124,21 +124,24 @@ const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info
124
124
125
125
// If this client is not already using the object, add the client to the
126
126
// object's list of clients, otherwise do nothing.
127
- void PlasmaStore::add_client_to_object_clients (ObjectTableEntry* entry, Client* client) {
127
+ void PlasmaStore::add_to_client_object_ids (ObjectTableEntry* entry, Client* client) {
128
128
// Check if this client is already using the object.
129
- if (entry-> clients .find (client ) != entry-> clients .end ()) {
129
+ if (client-> object_ids .find (entry-> object_id ) != client-> object_ids .end ()) {
130
130
return ;
131
131
}
132
132
// If there are no other clients using this object, notify the eviction policy
133
133
// that the object is being used.
134
- if (entry->clients . size () == 0 ) {
134
+ if (entry->ref_count == 0 ) {
135
135
// Tell the eviction policy that this object is being used.
136
136
std::vector<ObjectID> objects_to_evict;
137
137
eviction_policy_.begin_object_access (entry->object_id , &objects_to_evict);
138
138
delete_objects (objects_to_evict);
139
139
}
140
- // Add the client pointer to the list of clients using this object.
141
- entry->clients .insert (client);
140
+ // Increase reference count.
141
+ entry->ref_count ++;
142
+
143
+ // Add object id to the list of object ids that this client is using.
144
+ client->object_ids .insert (entry->object_id );
142
145
}
143
146
144
147
// Create a new object buffer in the hash table.
@@ -225,11 +228,11 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
225
228
result->metadata_size = metadata_size;
226
229
result->device_num = device_num;
227
230
// Notify the eviction policy that this object was created. This must be done
228
- // immediately before the call to add_client_to_object_clients so that the
231
+ // immediately before the call to add_to_client_object_ids so that the
229
232
// eviction policy does not have an opportunity to evict the object.
230
233
eviction_policy_.object_created (object_id);
231
234
// Record that this client is using this object.
232
- add_client_to_object_clients (store_info_.objects [object_id].get (), client);
235
+ add_to_client_object_ids (store_info_.objects [object_id].get (), client);
233
236
return PlasmaError_OK;
234
237
}
235
238
@@ -324,7 +327,7 @@ void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
324
327
get_req->num_satisfied += 1 ;
325
328
// Record the fact that this client will be using this object and will
326
329
// be responsible for releasing this object.
327
- add_client_to_object_clients (entry, get_req->client );
330
+ add_to_client_object_ids (entry, get_req->client );
328
331
329
332
// If this get request is done, reply to the client.
330
333
if (get_req->num_satisfied == get_req->num_objects_to_wait_for ) {
@@ -358,7 +361,7 @@ void PlasmaStore::process_get_request(Client* client,
358
361
get_req->num_satisfied += 1 ;
359
362
// If necessary, record that this client is using this object. In the case
360
363
// where entry == NULL, this will be called from seal_object.
361
- add_client_to_object_clients (entry, client);
364
+ add_to_client_object_ids (entry, client);
362
365
} else {
363
366
// Add a placeholder plasma object to the get request to indicate that the
364
367
// object is not present. This will be parsed by the client. We set the
@@ -383,14 +386,16 @@ void PlasmaStore::process_get_request(Client* client,
383
386
}
384
387
}
385
388
386
- int PlasmaStore::remove_client_from_object_clients (ObjectTableEntry* entry,
387
- Client* client) {
388
- auto it = entry->clients .find (client);
389
- if (it != entry->clients .end ()) {
390
- entry->clients .erase (it);
389
+ int PlasmaStore::remove_from_client_object_ids (ObjectTableEntry* entry, Client* client) {
390
+ auto it = client->object_ids .find (entry->object_id );
391
+ if (it != client->object_ids .end ()) {
392
+ client->object_ids .erase (it);
393
+ // Decrease reference count.
394
+ entry->ref_count --;
395
+
391
396
// If no more clients are using this object, notify the eviction policy
392
397
// that the object is no longer being used.
393
- if (entry->clients . size () == 0 ) {
398
+ if (entry->ref_count == 0 ) {
394
399
// Tell the eviction policy that this object is no longer being used.
395
400
std::vector<ObjectID> objects_to_evict;
396
401
eviction_policy_.end_object_access (entry->object_id , &objects_to_evict);
@@ -408,7 +413,7 @@ void PlasmaStore::release_object(const ObjectID& object_id, Client* client) {
408
413
auto entry = get_object_table_entry (&store_info_, object_id);
409
414
ARROW_CHECK (entry != NULL );
410
415
// Remove the client from the object's array of clients.
411
- ARROW_CHECK (remove_client_from_object_clients (entry, client) == 1 );
416
+ ARROW_CHECK (remove_from_client_object_ids (entry, client) == 1 );
412
417
}
413
418
414
419
// Check if an object is present.
@@ -439,8 +444,8 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
439
444
ARROW_CHECK (entry != NULL ) << " To abort an object it must be in the object table." ;
440
445
ARROW_CHECK (entry->state != PLASMA_SEALED)
441
446
<< " To abort an object it must not have been sealed." ;
442
- auto it = entry-> clients .find (client );
443
- if (it == entry-> clients .end ()) {
447
+ auto it = client-> object_ids .find (object_id );
448
+ if (it == client-> object_ids .end ()) {
444
449
// If the client requesting the abort is not the creator, do not
445
450
// perform the abort.
446
451
return 0 ;
@@ -466,7 +471,7 @@ int PlasmaStore::delete_object(ObjectID& object_id) {
466
471
return PlasmaError_ObjectNotSealed;
467
472
}
468
473
469
- if (entry->clients . size () != 0 ) {
474
+ if (entry->ref_count != 0 ) {
470
475
// To delete an object, there must be no clients currently using it.
471
476
return PlasmaError_ObjectInUse;
472
477
}
@@ -493,7 +498,7 @@ void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
493
498
ARROW_CHECK (entry != NULL ) << " To delete an object it must be in the object table." ;
494
499
ARROW_CHECK (entry->state == PLASMA_SEALED)
495
500
<< " To delete an object it must have been sealed." ;
496
- ARROW_CHECK (entry->clients . size () == 0 )
501
+ ARROW_CHECK (entry->ref_count == 0 )
497
502
<< " To delete an object, there must be no clients currently using it." ;
498
503
store_info_.objects .erase (object_id);
499
504
// Inform all subscribers that the object has been deleted.
@@ -529,23 +534,27 @@ void PlasmaStore::disconnect_client(int client_fd) {
529
534
// Close the socket.
530
535
close (client_fd);
531
536
ARROW_LOG (INFO) << " Disconnecting client on fd " << client_fd;
532
- // If this client was using any objects, remove it from the appropriate
533
- // lists.
534
- // TODO(swang): Avoid iteration through the object table.
537
+ // Release all the objects that the client was using.
535
538
auto client = it->second .get ();
536
- std::vector<ObjectID> unsealed_objects;
537
- for (const auto & entry : store_info_.objects ) {
538
- if (entry.second ->state == PLASMA_SEALED) {
539
- remove_client_from_object_clients (entry.second .get (), client);
539
+ std::vector<ObjectTableEntry*> sealed_objects;
540
+ for (const auto & object_id : client->object_ids ) {
541
+ auto it = store_info_.objects .find (object_id);
542
+ if (it == store_info_.objects .end ()) {
543
+ continue ;
544
+ }
545
+
546
+ if (it->second ->state == PLASMA_SEALED) {
547
+ // Add sealed objects to a temporary list of object IDs. Do not perform
548
+ // the remove here, since it potentially modifies the object_ids table.
549
+ sealed_objects.push_back (it->second .get ());
540
550
} else {
541
- // Add unsealed objects to a temporary list of object IDs. Do not perform
542
- // the abort here, since it potentially modifies the object table.
543
- unsealed_objects.push_back (entry.first );
551
+ // Abort unsealed object.
552
+ abort_object (it->first , client);
544
553
}
545
554
}
546
- // If the client was creating any objects, abort them.
547
- for (const auto & entry : unsealed_objects ) {
548
- abort_object (entry, client);
555
+
556
+ for (const auto & entry : sealed_objects ) {
557
+ remove_from_client_object_ids (entry, client);
549
558
}
550
559
551
560
// Note, the store may still attempt to send a message to the disconnected
0 commit comments