Skip to content

Commit 2b5f56c

Browse files
authored
GH-45201: [C++][Parquet] Improve performance of generating size statistics (#45202)
### Rationale for this change We found out in #45085 that there is a non-trivial overhead when writing size statistics is enabled. ### What changes are included in this PR? Dramatically reduce overhead by speeding up def/rep levels histogram updates. Performance results on the author's machine: ``` ------------------------------------------------------------------------------------------------------------------------------------------------ Benchmark Time CPU Iterations UserCounters... ------------------------------------------------------------------------------------------------------------------------------------------------ BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::Int64Type> 8103053 ns 8098569 ns 86 bytes_per_second=1003.26Mi/s items_per_second=129.477M/s output_size=537.472k page_index_size=33 BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type> 8153499 ns 8148492 ns 86 bytes_per_second=997.117Mi/s items_per_second=128.683M/s output_size=537.488k page_index_size=33 BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type> 8212560 ns 8207754 ns 83 bytes_per_second=989.918Mi/s items_per_second=127.754M/s output_size=537.502k page_index_size=47 BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::StringType> 10405020 ns 10400775 ns 67 bytes_per_second=444.142Mi/s items_per_second=100.817M/s output_size=848.305k page_index_size=34 BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType> 10464784 ns 10460778 ns 66 bytes_per_second=441.594Mi/s items_per_second=100.239M/s output_size=848.325k page_index_size=34 BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType> 10469832 ns 10465739 ns 67 bytes_per_second=441.385Mi/s items_per_second=100.191M/s output_size=848.344k page_index_size=48 BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::Int64Type> 13004962 ns 12992678 ns 52 bytes_per_second=657.101Mi/s items_per_second=80.7052M/s output_size=617.464k page_index_size=34 BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type> 13718352 ns 13705599 ns 50 bytes_per_second=622.921Mi/s items_per_second=76.5071M/s output_size=617.486k page_index_size=34 BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type> 13845553 ns 13832138 ns 52 bytes_per_second=617.222Mi/s items_per_second=75.8072M/s output_size=617.506k page_index_size=54 BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::StringType> 15715263 ns 15702707 ns 44 bytes_per_second=320.449Mi/s items_per_second=66.7768M/s output_size=927.326k page_index_size=35 BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType> 16507328 ns 16493800 ns 43 bytes_per_second=305.079Mi/s items_per_second=63.5739M/s output_size=927.352k page_index_size=35 BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType> 16575359 ns 16561311 ns 42 bytes_per_second=303.836Mi/s items_per_second=63.3148M/s output_size=927.377k page_index_size=55 ``` Performance results without this PR: ``` ------------------------------------------------------------------------------------------------------------------------------------------------ Benchmark Time CPU Iterations UserCounters... ------------------------------------------------------------------------------------------------------------------------------------------------ BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::Int64Type> 8042576 ns 8037678 ns 87 bytes_per_second=1010.86Mi/s items_per_second=130.458M/s output_size=537.472k page_index_size=33 BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type> 9576627 ns 9571279 ns 73 bytes_per_second=848.894Mi/s items_per_second=109.554M/s output_size=537.488k page_index_size=33 BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type> 9570204 ns 9563595 ns 73 bytes_per_second=849.576Mi/s items_per_second=109.642M/s output_size=537.502k page_index_size=47 BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::StringType> 10165397 ns 10160868 ns 69 bytes_per_second=454.628Mi/s items_per_second=103.197M/s output_size=848.305k page_index_size=34 BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType> 11662568 ns 11657396 ns 60 bytes_per_second=396.265Mi/s items_per_second=89.9494M/s output_size=848.325k page_index_size=34 BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType> 11657135 ns 11653063 ns 60 bytes_per_second=396.412Mi/s items_per_second=89.9829M/s output_size=848.344k page_index_size=48 BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::Int64Type> 13182006 ns 13168704 ns 51 bytes_per_second=648.318Mi/s items_per_second=79.6264M/s output_size=617.464k page_index_size=34 BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type> 16438205 ns 16421762 ns 43 bytes_per_second=519.89Mi/s items_per_second=63.8528M/s output_size=617.486k page_index_size=34 BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type> 16424615 ns 16409032 ns 42 bytes_per_second=520.293Mi/s items_per_second=63.9024M/s output_size=617.506k page_index_size=54 BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::StringType> 15387808 ns 15373086 ns 46 bytes_per_second=327.32Mi/s items_per_second=68.2086M/s output_size=927.326k page_index_size=35 BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType> 18319628 ns 18302938 ns 37 bytes_per_second=274.924Mi/s items_per_second=57.29M/s output_size=927.352k page_index_size=35 BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType> 18346665 ns 18329336 ns 37 bytes_per_second=274.528Mi/s items_per_second=57.2075M/s output_size=927.377k page_index_size=55 ``` ### Are these changes tested? Tested by existing tests, validated by existing benchmarks. ### Are there any user-facing changes? No. * GitHub Issue: #45201 Authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]>
1 parent 3695d3d commit 2b5f56c

File tree

4 files changed

+249
-52
lines changed

4 files changed

+249
-52
lines changed

cpp/src/parquet/column_writer.cc

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,42 +1468,43 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
14681468
// which case we call back to the dense write path)
14691469
std::shared_ptr<::arrow::Array> preserved_dictionary_;
14701470

1471-
int64_t WriteLevels(int64_t num_values, const int16_t* def_levels,
1471+
int64_t WriteLevels(int64_t num_levels, const int16_t* def_levels,
14721472
const int16_t* rep_levels) {
1473+
// Update histograms now, to maximize cache efficiency.
1474+
UpdateLevelHistogram(num_levels, def_levels, rep_levels);
1475+
14731476
int64_t values_to_write = 0;
14741477
// If the field is required and non-repeated, there are no definition levels
14751478
if (descr_->max_definition_level() > 0) {
1476-
for (int64_t i = 0; i < num_values; ++i) {
1479+
for (int64_t i = 0; i < num_levels; ++i) {
14771480
if (def_levels[i] == descr_->max_definition_level()) {
14781481
++values_to_write;
14791482
}
14801483
}
14811484

1482-
WriteDefinitionLevels(num_values, def_levels);
1485+
WriteDefinitionLevels(num_levels, def_levels);
14831486
} else {
14841487
// Required field, write all values
1485-
values_to_write = num_values;
1488+
values_to_write = num_levels;
14861489
}
14871490

14881491
// Not present for non-repeated fields
14891492
if (descr_->max_repetition_level() > 0) {
14901493
// A row could include more than one value
14911494
// Count the occasions where we start a new row
1492-
for (int64_t i = 0; i < num_values; ++i) {
1495+
for (int64_t i = 0; i < num_levels; ++i) {
14931496
if (rep_levels[i] == 0) {
14941497
rows_written_++;
14951498
num_buffered_rows_++;
14961499
}
14971500
}
14981501

1499-
WriteRepetitionLevels(num_values, rep_levels);
1502+
WriteRepetitionLevels(num_levels, rep_levels);
15001503
} else {
15011504
// Each value is exactly one row
1502-
rows_written_ += num_values;
1503-
num_buffered_rows_ += num_values;
1505+
rows_written_ += num_levels;
1506+
num_buffered_rows_ += num_levels;
15041507
}
1505-
1506-
UpdateLevelHistogram(num_values, def_levels, rep_levels);
15071508
return values_to_write;
15081509
}
15091510

@@ -1575,6 +1576,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
15751576

15761577
void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
15771578
const int16_t* rep_levels) {
1579+
// Update histograms now, to maximize cache efficiency.
1580+
UpdateLevelHistogram(num_levels, def_levels, rep_levels);
1581+
15781582
// If the field is required and non-repeated, there are no definition levels
15791583
if (descr_->max_definition_level() > 0) {
15801584
WriteDefinitionLevels(num_levels, def_levels);
@@ -1595,8 +1599,6 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
15951599
rows_written_ += num_levels;
15961600
num_buffered_rows_ += num_levels;
15971601
}
1598-
1599-
UpdateLevelHistogram(num_levels, def_levels, rep_levels);
16001602
}
16011603

16021604
void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
@@ -1606,26 +1608,17 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
16061608
}
16071609

16081610
auto add_levels = [](std::vector<int64_t>& level_histogram,
1609-
::arrow::util::span<const int16_t> levels) {
1610-
for (int16_t level : levels) {
1611-
ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
1612-
++level_histogram[level];
1613-
}
1611+
::arrow::util::span<const int16_t> levels, int16_t max_level) {
1612+
ARROW_DCHECK_EQ(static_cast<size_t>(max_level) + 1, level_histogram.size());
1613+
::parquet::UpdateLevelHistogram(levels, level_histogram);
16141614
};
16151615

1616-
if (descr_->max_definition_level() > 0) {
1617-
add_levels(page_size_statistics_->definition_level_histogram,
1618-
{def_levels, static_cast<size_t>(num_levels)});
1619-
} else {
1620-
page_size_statistics_->definition_level_histogram[0] += num_levels;
1621-
}
1622-
1623-
if (descr_->max_repetition_level() > 0) {
1624-
add_levels(page_size_statistics_->repetition_level_histogram,
1625-
{rep_levels, static_cast<size_t>(num_levels)});
1626-
} else {
1627-
page_size_statistics_->repetition_level_histogram[0] += num_levels;
1628-
}
1616+
add_levels(page_size_statistics_->definition_level_histogram,
1617+
{def_levels, static_cast<size_t>(num_levels)},
1618+
descr_->max_definition_level());
1619+
add_levels(page_size_statistics_->repetition_level_histogram,
1620+
{rep_levels, static_cast<size_t>(num_levels)},
1621+
descr_->max_repetition_level());
16291622
}
16301623

16311624
// Update the unencoded data bytes for ByteArray only per the specification.

cpp/src/parquet/size_statistics.cc

Lines changed: 100 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,27 @@
1818
#include "parquet/size_statistics.h"
1919

2020
#include <algorithm>
21+
#include <numeric>
22+
#include <ostream>
23+
#include <string_view>
2124

2225
#include "arrow/util/logging.h"
2326
#include "parquet/exception.h"
2427
#include "parquet/schema.h"
2528

2629
namespace parquet {
2730

31+
namespace {
32+
33+
void MergeLevelHistogram(::arrow::util::span<int64_t> histogram,
34+
::arrow::util::span<const int64_t> other) {
35+
ARROW_DCHECK_EQ(histogram.size(), other.size());
36+
std::transform(histogram.begin(), histogram.end(), other.begin(), histogram.begin(),
37+
std::plus<>());
38+
}
39+
40+
} // namespace
41+
2842
void SizeStatistics::Merge(const SizeStatistics& other) {
2943
if (repetition_level_histogram.size() != other.repetition_level_histogram.size()) {
3044
throw ParquetException("Repetition level histogram size mismatch");
@@ -36,12 +50,8 @@ void SizeStatistics::Merge(const SizeStatistics& other) {
3650
other.unencoded_byte_array_data_bytes.has_value()) {
3751
throw ParquetException("Unencoded byte array data bytes are not consistent");
3852
}
39-
std::transform(repetition_level_histogram.begin(), repetition_level_histogram.end(),
40-
other.repetition_level_histogram.begin(),
41-
repetition_level_histogram.begin(), std::plus<>());
42-
std::transform(definition_level_histogram.begin(), definition_level_histogram.end(),
43-
other.definition_level_histogram.begin(),
44-
definition_level_histogram.begin(), std::plus<>());
53+
MergeLevelHistogram(repetition_level_histogram, other.repetition_level_histogram);
54+
MergeLevelHistogram(definition_level_histogram, other.definition_level_histogram);
4555
if (unencoded_byte_array_data_bytes.has_value()) {
4656
unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes.value() +
4757
other.unencoded_byte_array_data_bytes.value();
@@ -91,4 +101,88 @@ std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* des
91101
return size_stats;
92102
}
93103

104+
std::ostream& operator<<(std::ostream& os, const SizeStatistics& size_stats) {
105+
constexpr std::string_view kComma = ", ";
106+
os << "SizeStatistics{";
107+
std::string_view sep = "";
108+
if (size_stats.unencoded_byte_array_data_bytes.has_value()) {
109+
os << "unencoded_byte_array_data_bytes="
110+
<< *size_stats.unencoded_byte_array_data_bytes;
111+
sep = kComma;
112+
}
113+
auto print_histogram = [&](std::string_view name,
114+
const std::vector<int64_t>& histogram) {
115+
if (!histogram.empty()) {
116+
os << sep << name << "={";
117+
sep = kComma;
118+
std::string_view value_sep = "";
119+
for (int64_t v : histogram) {
120+
os << value_sep << v;
121+
value_sep = kComma;
122+
}
123+
os << "}";
124+
}
125+
};
126+
print_histogram("repetition_level_histogram", size_stats.repetition_level_histogram);
127+
print_histogram("definition_level_histogram", size_stats.definition_level_histogram);
128+
os << "}";
129+
return os;
130+
}
131+
132+
void UpdateLevelHistogram(::arrow::util::span<const int16_t> levels,
133+
::arrow::util::span<int64_t> histogram) {
134+
const int64_t num_levels = static_cast<int64_t>(levels.size());
135+
DCHECK_GE(histogram.size(), 1);
136+
const int16_t max_level = static_cast<int16_t>(histogram.size() - 1);
137+
if (max_level == 0) {
138+
histogram[0] += num_levels;
139+
return;
140+
}
141+
142+
#ifndef NDEBUG
143+
for (auto level : levels) {
144+
ARROW_DCHECK_LE(level, max_level);
145+
}
146+
#endif
147+
148+
if (max_level == 1) {
149+
// Specialize the common case for non-repeated non-nested columns.
150+
// Summing the levels gives us the number of 1s, and the number of 0s follows.
151+
// We do repeated sums in the int16_t space, which the compiler is likely
152+
// to vectorize efficiently.
153+
constexpr int64_t kChunkSize = 1 << 14; // to avoid int16_t overflows
154+
int64_t hist1 = 0;
155+
auto it = levels.begin();
156+
while (it != levels.end()) {
157+
const auto chunk_size = std::min<int64_t>(levels.end() - it, kChunkSize);
158+
hist1 += std::accumulate(levels.begin(), levels.begin() + chunk_size, int16_t{0});
159+
it += chunk_size;
160+
}
161+
histogram[0] += num_levels - hist1;
162+
histogram[1] += hist1;
163+
return;
164+
}
165+
166+
// The generic implementation issues a series of histogram load-stores.
167+
// However, it limits store-to-load dependencies by interleaving partial histogram
168+
// updates.
169+
constexpr int kUnroll = 4;
170+
std::array<std::vector<int64_t>, kUnroll> partial_hist;
171+
for (auto& hist : partial_hist) {
172+
hist.assign(histogram.size(), 0);
173+
}
174+
int64_t i = 0;
175+
for (; i <= num_levels - kUnroll; i += kUnroll) {
176+
for (int j = 0; j < kUnroll; ++j) {
177+
++partial_hist[j][levels[i + j]];
178+
}
179+
}
180+
for (; i < num_levels; ++i) {
181+
++partial_hist[0][levels[i]];
182+
}
183+
for (const auto& hist : partial_hist) {
184+
MergeLevelHistogram(histogram, hist);
185+
}
186+
}
187+
94188
} // namespace parquet

cpp/src/parquet/size_statistics.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717

1818
#pragma once
1919

20+
#include <cstdint>
21+
#include <iosfwd>
2022
#include <optional>
2123
#include <vector>
2224

25+
#include "arrow/util/span.h"
2326
#include "parquet/platform.h"
2427
#include "parquet/type_fwd.h"
2528

@@ -89,4 +92,11 @@ struct PARQUET_EXPORT SizeStatistics {
8992
static std::unique_ptr<SizeStatistics> Make(const ColumnDescriptor* descr);
9093
};
9194

95+
PARQUET_EXPORT
96+
std::ostream& operator<<(std::ostream&, const SizeStatistics&);
97+
98+
PARQUET_EXPORT
99+
void UpdateLevelHistogram(::arrow::util::span<const int16_t> levels,
100+
::arrow::util::span<int64_t> histogram);
101+
92102
} // namespace parquet

0 commit comments

Comments
 (0)