@@ -202,47 +202,55 @@ class ConcatenateImpl {
202
202
203
203
Status Visit (const FixedWidthType& fixed) {
204
204
// Handles numbers, decimal128, fixed_size_binary
205
- return ConcatenateBuffers (Buffers (1 , fixed), pool_).Value (&out_->buffers [1 ]);
205
+ ARROW_ASSIGN_OR_RAISE (auto buffers, Buffers (1 , fixed));
206
+ return ConcatenateBuffers (buffers, pool_).Value (&out_->buffers [1 ]);
206
207
}
207
208
208
209
Status Visit (const BinaryType&) {
209
210
std::vector<Range> value_ranges;
210
- RETURN_NOT_OK (ConcatenateOffsets<int32_t >(Buffers (1 , sizeof (int32_t )), pool_,
211
- &out_->buffers [1 ], &value_ranges));
212
- return ConcatenateBuffers (Buffers (2 , value_ranges), pool_).Value (&out_->buffers [2 ]);
211
+ ARROW_ASSIGN_OR_RAISE (auto index_buffers, Buffers (1 , sizeof (int32_t )));
212
+ RETURN_NOT_OK (ConcatenateOffsets<int32_t >(index_buffers, pool_, &out_->buffers [1 ],
213
+ &value_ranges));
214
+ ARROW_ASSIGN_OR_RAISE (auto value_buffers, Buffers (2 , value_ranges));
215
+ return ConcatenateBuffers (value_buffers, pool_).Value (&out_->buffers [2 ]);
213
216
}
214
217
215
218
Status Visit (const LargeBinaryType&) {
216
219
std::vector<Range> value_ranges;
217
- RETURN_NOT_OK (ConcatenateOffsets<int64_t >(Buffers (1 , sizeof (int64_t )), pool_,
218
- &out_->buffers [1 ], &value_ranges));
219
- return ConcatenateBuffers (Buffers (2 , value_ranges), pool_).Value (&out_->buffers [2 ]);
220
+ ARROW_ASSIGN_OR_RAISE (auto index_buffers, Buffers (1 , sizeof (int64_t )));
221
+ RETURN_NOT_OK (ConcatenateOffsets<int64_t >(index_buffers, pool_, &out_->buffers [1 ],
222
+ &value_ranges));
223
+ ARROW_ASSIGN_OR_RAISE (auto value_buffers, Buffers (2 , value_ranges));
224
+ return ConcatenateBuffers (value_buffers, pool_).Value (&out_->buffers [2 ]);
220
225
}
221
226
222
227
Status Visit (const ListType&) {
223
228
std::vector<Range> value_ranges;
224
- RETURN_NOT_OK (ConcatenateOffsets<int32_t >(Buffers (1 , sizeof (int32_t )), pool_,
225
- &out_->buffers [1 ], &value_ranges));
226
- return ConcatenateImpl (ChildData (0 , value_ranges), pool_)
227
- .Concatenate (&out_->child_data [0 ]);
229
+ ARROW_ASSIGN_OR_RAISE (auto index_buffers, Buffers (1 , sizeof (int32_t )));
230
+ RETURN_NOT_OK (ConcatenateOffsets<int32_t >(index_buffers, pool_, &out_->buffers [1 ],
231
+ &value_ranges));
232
+ ARROW_ASSIGN_OR_RAISE (auto child_data, ChildData (0 , value_ranges));
233
+ return ConcatenateImpl (child_data, pool_).Concatenate (&out_->child_data [0 ]);
228
234
}
229
235
230
236
Status Visit (const LargeListType&) {
231
237
std::vector<Range> value_ranges;
232
- RETURN_NOT_OK (ConcatenateOffsets<int64_t >(Buffers (1 , sizeof (int64_t )), pool_,
233
- &out_->buffers [1 ], &value_ranges));
234
- return ConcatenateImpl (ChildData (0 , value_ranges), pool_)
235
- .Concatenate (&out_->child_data [0 ]);
238
+ ARROW_ASSIGN_OR_RAISE (auto index_buffers, Buffers (1 , sizeof (int64_t )));
239
+ RETURN_NOT_OK (ConcatenateOffsets<int64_t >(index_buffers, pool_, &out_->buffers [1 ],
240
+ &value_ranges));
241
+ ARROW_ASSIGN_OR_RAISE (auto child_data, ChildData (0 , value_ranges));
242
+ return ConcatenateImpl (child_data, pool_).Concatenate (&out_->child_data [0 ]);
236
243
}
237
244
238
245
Status Visit (const FixedSizeListType&) {
239
- return ConcatenateImpl (ChildData (0 ), pool_).Concatenate (&out_->child_data [0 ]);
246
+ ARROW_ASSIGN_OR_RAISE (auto child_data, ChildData (0 ));
247
+ return ConcatenateImpl (child_data, pool_).Concatenate (&out_->child_data [0 ]);
240
248
}
241
249
242
250
Status Visit (const StructType& s) {
243
251
for (int i = 0 ; i < s.num_fields (); ++i) {
244
- RETURN_NOT_OK (
245
- ConcatenateImpl (ChildData (i) , pool_).Concatenate (&out_->child_data [i]));
252
+ ARROW_ASSIGN_OR_RAISE ( auto child_data, ChildData (i));
253
+ RETURN_NOT_OK ( ConcatenateImpl (child_data , pool_).Concatenate (&out_->child_data [i]));
246
254
}
247
255
return Status::OK ();
248
256
}
@@ -263,7 +271,8 @@ class ConcatenateImpl {
263
271
264
272
if (dictionaries_same) {
265
273
out_->dictionary = in_[0 ]->dictionary ;
266
- return ConcatenateBuffers (Buffers (1 , *fixed), pool_).Value (&out_->buffers [1 ]);
274
+ ARROW_ASSIGN_OR_RAISE (auto index_buffers, Buffers (1 , *fixed));
275
+ return ConcatenateBuffers (index_buffers, pool_).Value (&out_->buffers [1 ]);
267
276
} else {
268
277
return Status::NotImplemented (" Concat with dictionary unification NYI" );
269
278
}
@@ -279,17 +288,24 @@ class ConcatenateImpl {
279
288
}
280
289
281
290
private:
291
+ // NOTE: Concatenate() can be called during IPC reads to append delta dictionaries
292
+ // on non-validated input. Therefore, the input-checking SliceBufferSafe and
293
+ // ArrayData::SliceSafe are used below.
294
+
282
295
// Gather the index-th buffer of each input into a vector.
283
296
// Bytes are sliced with that input's offset and length.
284
297
// Note that BufferVector will not contain the buffer of in_[i] if it's
285
298
// nullptr.
286
- BufferVector Buffers (size_t index) {
299
+ Result< BufferVector> Buffers (size_t index) {
287
300
BufferVector buffers;
288
301
buffers.reserve (in_.size ());
289
302
for (const std::shared_ptr<const ArrayData>& array_data : in_) {
290
303
const auto & buffer = array_data->buffers [index ];
291
304
if (buffer != nullptr ) {
292
- buffers.push_back (SliceBuffer (buffer, array_data->offset , array_data->length ));
305
+ ARROW_ASSIGN_OR_RAISE (
306
+ auto sliced_buffer,
307
+ SliceBufferSafe (buffer, array_data->offset , array_data->length ));
308
+ buffers.push_back (std::move (sliced_buffer));
293
309
}
294
310
}
295
311
return buffers;
@@ -299,14 +315,17 @@ class ConcatenateImpl {
299
315
// Bytes are sliced with the explicitly passed ranges.
300
316
// Note that BufferVector will not contain the buffer of in_[i] if it's
301
317
// nullptr.
302
- BufferVector Buffers (size_t index, const std::vector<Range>& ranges) {
318
+ Result< BufferVector> Buffers (size_t index, const std::vector<Range>& ranges) {
303
319
DCHECK_EQ (in_.size (), ranges.size ());
304
320
BufferVector buffers;
305
321
buffers.reserve (in_.size ());
306
322
for (size_t i = 0 ; i < in_.size (); ++i) {
307
323
const auto & buffer = in_[i]->buffers [index ];
308
324
if (buffer != nullptr ) {
309
- buffers.push_back (SliceBuffer (buffer, ranges[i].offset , ranges[i].length ));
325
+ ARROW_ASSIGN_OR_RAISE (
326
+ auto sliced_buffer,
327
+ SliceBufferSafe (buffer, ranges[i].offset , ranges[i].length ));
328
+ buffers.push_back (std::move (sliced_buffer));
310
329
} else {
311
330
DCHECK_EQ (ranges[i].length , 0 );
312
331
}
@@ -319,14 +338,16 @@ class ConcatenateImpl {
319
338
// those elements are sliced with that input's offset and length.
320
339
// Note that BufferVector will not contain the buffer of in_[i] if it's
321
340
// nullptr.
322
- BufferVector Buffers (size_t index, int byte_width) {
341
+ Result< BufferVector> Buffers (size_t index, int byte_width) {
323
342
BufferVector buffers;
324
343
buffers.reserve (in_.size ());
325
344
for (const std::shared_ptr<const ArrayData>& array_data : in_) {
326
345
const auto & buffer = array_data->buffers [index ];
327
346
if (buffer != nullptr ) {
328
- buffers.push_back (SliceBuffer (buffer, array_data->offset * byte_width,
329
- array_data->length * byte_width));
347
+ ARROW_ASSIGN_OR_RAISE (auto sliced_buffer,
348
+ SliceBufferSafe (buffer, array_data->offset * byte_width,
349
+ array_data->length * byte_width));
350
+ buffers.push_back (std::move (sliced_buffer));
330
351
}
331
352
}
332
353
return buffers;
@@ -337,7 +358,7 @@ class ConcatenateImpl {
337
358
// those elements are sliced with that input's offset and length.
338
359
// Note that BufferVector will not contain the buffer of in_[i] if it's
339
360
// nullptr.
340
- BufferVector Buffers (size_t index, const FixedWidthType& fixed) {
361
+ Result< BufferVector> Buffers (size_t index, const FixedWidthType& fixed) {
341
362
DCHECK_EQ (fixed.bit_width () % 8 , 0 );
342
363
return Buffers (index , fixed.bit_width () / 8 );
343
364
}
@@ -355,23 +376,24 @@ class ConcatenateImpl {
355
376
356
377
// Gather the index-th child_data of each input into a vector.
357
378
// Elements are sliced with that input's offset and length.
358
- std::vector<std::shared_ptr<const ArrayData>> ChildData (size_t index) {
379
+ Result< std::vector<std::shared_ptr<const ArrayData> >> ChildData (size_t index) {
359
380
std::vector<std::shared_ptr<const ArrayData>> child_data (in_.size ());
360
381
for (size_t i = 0 ; i < in_.size (); ++i) {
361
- child_data[i] = in_[i]->child_data [index ]->Slice (in_[i]->offset , in_[i]->length );
382
+ ARROW_ASSIGN_OR_RAISE (child_data[i], in_[i]->child_data [index ]->SliceSafe (
383
+ in_[i]->offset , in_[i]->length ));
362
384
}
363
385
return child_data;
364
386
}
365
387
366
388
// Gather the index-th child_data of each input into a vector.
367
389
// Elements are sliced with the explicitly passed ranges.
368
- std::vector<std::shared_ptr<const ArrayData>> ChildData (
390
+ Result< std::vector<std::shared_ptr<const ArrayData> >> ChildData (
369
391
size_t index, const std::vector<Range>& ranges) {
370
392
DCHECK_EQ (in_.size (), ranges.size ());
371
393
std::vector<std::shared_ptr<const ArrayData>> child_data (in_.size ());
372
394
for (size_t i = 0 ; i < in_.size (); ++i) {
373
- child_data[i] =
374
- in_[i]-> child_data [ index ]-> Slice ( ranges[i].offset , ranges[i].length );
395
+ ARROW_ASSIGN_OR_RAISE ( child_data[i], in_[i]-> child_data [ index ]-> SliceSafe (
396
+ ranges[i].offset , ranges[i].length ) );
375
397
}
376
398
return child_data;
377
399
}
0 commit comments