
Commit 92262cc

refactor: introduce ReadPolicy to parquet reader. (#13020)
* Support table statistics for the parquet-rs table engine.
* Remove `is_inner_project`.
* Introduce read policies and split the parquet readers.
* Support the predicate and top-k policies.
* Add test cases and fix bugs.
* Fix a typo.
* Address code review comments.
* Simplify the code and fix a bug.

Co-authored-by: sundyli <[email protected]>
1 parent 26e6a40 commit 92262cc
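The heart of the refactor is the read-policy split: instead of one monolithic `ParquetRSReader`, a builder now produces either a full reader or a row-group reader, and each pruned row group is drained through a policy that decides what to prefetch (predicate columns, top-k columns, or nothing) before decoding the rest. The trait itself does not appear in this page's hunks, so the following is only a minimal sketch of the idea; the trait name, method, and types are assumptions, not the commit's API.

```rust
// Hypothetical sketch only: the commit page does not show the `ReadPolicy`
// trait itself, so every name and signature here is an assumption inferred
// from the exported types (NoPrefetchPolicy, TopkOnlyPolicy,
// PredicateAndTopkPolicy, ParquetRSFullReader, ParquetRSRowGroupReader).

/// Placeholder for whatever block/batch type the real reader yields.
struct Block {
    num_rows: usize,
}

/// One strategy for draining a pruned row group. Policies differ in what
/// they prefetch before decoding the remaining columns: nothing, the top-k
/// column only, or the predicate (and top-k) columns.
trait ReadPolicy {
    /// Pull the next block, or `None` when the row group is exhausted.
    fn read_block(&mut self) -> Option<Block>;
}

/// The trivial policy: no prefetch, no filtering, decode everything.
struct NoPrefetch {
    remaining: usize,
    batch_size: usize,
}

impl ReadPolicy for NoPrefetch {
    fn read_block(&mut self) -> Option<Block> {
        if self.remaining == 0 {
            return None;
        }
        let n = self.remaining.min(self.batch_size);
        self.remaining -= n;
        Some(Block { num_rows: n })
    }
}

fn main() {
    // Drain one 2500-row "row group" in 1024-row blocks.
    let mut policy: Box<dyn ReadPolicy> = Box::new(NoPrefetch {
        remaining: 2500,
        batch_size: 1024,
    });
    while let Some(b) = policy.read_block() {
        println!("block of {} rows", b.num_rows);
    }
}
```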

31 files changed: +1946 -727 lines changed

src/query/catalog/src/plan/pushdown.rs

Lines changed: 9 additions & 4 deletions
@@ -132,11 +132,16 @@ impl PushDownInfo {
         }
 
         let leaf_fields = schema.leaf_fields();
-        let column_id = leaf_fields
+        let (leaf_id, f) = leaf_fields
             .iter()
-            .find(|&p| p == field)
-            .unwrap()
-            .column_id();
+            .enumerate()
+            .find(|&(_, p)| p == field)
+            .unwrap();
+        // Databend column id is not equal to parquet leaf id when there is nested type.
+        if f.column_id as usize != leaf_id {
+            return None;
+        }
+        let column_id = f.column_id;
 
         let top_k = TopK {
             limit: self.limit.unwrap(),
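The new guard matters because Databend assigns column ids over the whole schema tree while parquet numbers only the leaf columns, so the two sequences can drift apart once nested types appear, and top-k pushdown then degrades gracefully instead of reading the wrong column. Below is a self-contained illustration of the drift; `LeafField` and `top_k_column_id` are hypothetical stand-ins, not Databend's API, and the id-assignment shown is one plausible scheme (the real one is Databend-internal).

```rust
// Consider a table `t(a INT, b TUPLE(c INT, d INT))` under an id scheme
// where the tuple container consumes an id of its own: Databend hands out
// ids for `a`, `b`, `b.c`, `b.d`, but parquet materializes only the leaves
// `a`, `b.c`, `b.d`, so leaf positions and column ids disagree.
struct LeafField {
    name: &'static str,
    column_id: u32,
}

fn top_k_column_id(leaf_fields: &[LeafField], target: &str) -> Option<u32> {
    let (leaf_id, f) = leaf_fields
        .iter()
        .enumerate()
        .find(|&(_, p)| p.name == target)?;
    // The same guard as the patched `PushDownInfo`: bail out when the two
    // numbering schemes have drifted apart.
    if f.column_id as usize != leaf_id {
        return None;
    }
    Some(f.column_id)
}

fn main() {
    let leaves = [
        LeafField { name: "a", column_id: 0 },
        LeafField { name: "b.c", column_id: 2 }, // shifted: the tuple `b` consumed id 1
        LeafField { name: "b.d", column_id: 3 },
    ];
    assert_eq!(top_k_column_id(&leaves, "a"), Some(0));
    assert_eq!(top_k_column_id(&leaves, "b.c"), None); // top-k pushdown is skipped
}
```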

src/query/catalog/src/table.rs

Lines changed: 1 addition & 1 deletion
@@ -405,7 +405,7 @@ pub enum NavigationPoint {
     TimePoint(DateTime<Utc>),
 }
 
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, Default)]
 pub struct TableStatistics {
     pub num_rows: Option<u64>,
     pub data_size: Option<u64>,

src/query/sql/src/planner/binder/table.rs

Lines changed: 2 additions & 7 deletions
@@ -972,13 +972,8 @@ impl Binder {
             }
         }
 
-        let mut stat = table.table().table_statistics()?;
-        if let Some(rows) = statistics_provider.num_rows() {
-            // For external storage (parquet)
-            if let Some(stat) = &mut stat {
-                stat.num_rows = Some(rows);
-            }
-        };
+        let stat = table.table().table_statistics()?;
+
         Ok((
             SExpr::create_leaf(Arc::new(
                 Scan {

With the parquet table engines now recording row counts in their own table metadata (see the parquet2 `create.rs` change below), the binder can take `table_statistics()` at face value instead of patching `num_rows` from the statistics provider.

src/query/storages/iceberg/src/table.rs

Lines changed: 10 additions & 6 deletions
@@ -43,7 +43,7 @@ use common_storage::DataOperator;
 use common_storages_parquet::ParquetFilesPart;
 use common_storages_parquet::ParquetPart;
 use common_storages_parquet::ParquetRSPruner;
-use common_storages_parquet::ParquetRSReader;
+use common_storages_parquet::ParquetRSReaderBuilder;
 use icelake::catalog::Catalog;
 use opendal::Operator;
 use storages_common_pruner::RangePrunerCreator;
@@ -189,12 +189,16 @@ impl IcebergTable {
             options,
         )?;
 
-        let builder =
-            ParquetRSReader::builder(ctx.clone(), self.op.operator(), table_schema, &arrow_schema)?
-                .with_push_downs(plan.push_downs.as_ref())
-                .with_pruner(Some(pruner));
+        let mut builder = ParquetRSReaderBuilder::create(
+            ctx.clone(),
+            self.op.operator(),
+            table_schema,
+            &arrow_schema,
+        )?
+        .with_push_downs(plan.push_downs.as_ref())
+        .with_pruner(Some(pruner));
 
-        let praquet_reader = Arc::new(builder.build()?);
+        let praquet_reader = Arc::new(builder.build_full_reader()?);
 
         // TODO: we need to support top_k.
         let output_schema = Arc::new(DataSchema::from(plan.schema()));

src/query/storages/iceberg/src/table_source.rs

Lines changed: 3 additions & 3 deletions
@@ -28,7 +28,7 @@ use common_pipeline_core::processors::processor::Event;
 use common_pipeline_core::processors::processor::ProcessorPtr;
 use common_pipeline_core::processors::Processor;
 use common_storages_parquet::ParquetPart;
-use common_storages_parquet::ParquetRSReader;
+use common_storages_parquet::ParquetRSFullReader;
 use opendal::Reader;
 use parquet::arrow::async_reader::ParquetRecordBatchStream;
 
@@ -45,7 +45,7 @@ pub struct IcebergTableSource {
 
     // Used to read parquet.
     output_schema: DataSchemaRef,
-    parquet_reader: Arc<ParquetRSReader>,
+    parquet_reader: Arc<ParquetRSFullReader>,
     stream: Option<ParquetRecordBatchStream<Reader>>,
 }
 
@@ -54,7 +54,7 @@ impl IcebergTableSource {
         ctx: Arc<dyn TableContext>,
         output: Arc<OutputPort>,
         output_schema: DataSchemaRef,
-        parquet_reader: Arc<ParquetRSReader>,
+        parquet_reader: Arc<ParquetRSFullReader>,
     ) -> Result<ProcessorPtr> {
        let scan_progress = ctx.get_scan_progress();
        Ok(ProcessorPtr::create(Box::new(IcebergTableSource {

src/query/storages/parquet/src/lib.rs

Lines changed: 5 additions & 1 deletion
@@ -16,6 +16,8 @@
 #![feature(try_blocks)]
 #![feature(impl_trait_in_assoc_type)]
 #![feature(let_chains)]
+#![feature(core_intrinsics)]
+#![feature(int_roundings)]
 
 mod parquet2;
 mod parquet_part;
@@ -25,7 +27,9 @@ mod utils;
 pub use parquet2::Parquet2Table;
 pub use parquet_part::ParquetFilesPart;
 pub use parquet_part::ParquetPart;
+pub use parquet_rs::ParquetRSFullReader;
 pub use parquet_rs::ParquetRSPruner;
-pub use parquet_rs::ParquetRSReader;
+pub use parquet_rs::ParquetRSReaderBuilder;
 pub use parquet_rs::ParquetRSRowGroupPart;
+pub use parquet_rs::ParquetRSRowGroupReader;
 pub use parquet_rs::ParquetRSTable;

src/query/storages/parquet/src/parquet2/parquet_table/create.rs

Lines changed: 5 additions & 1 deletion
@@ -63,10 +63,14 @@ impl Parquet2Table {
             &first_file,
         )
         .await?;
+
+        let num_rows = file_metas.iter().map(|m| m.num_rows as u64).sum();
         let column_statistics_provider =
             create_parquet2_statistics_provider(file_metas, &arrow_schema)?;
 
-        let table_info = create_parquet_table_info(arrow_schema.clone(), &stage_info);
+        let mut table_info = create_parquet_table_info(arrow_schema.clone(), &stage_info);
+        table_info.meta.statistics.number_of_rows = num_rows;
+
         Ok(Arc::new(Parquet2Table {
             table_info,
             arrow_schema,

src/query/storages/parquet/src/parquet_rs/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -19,7 +19,9 @@ mod pruning;
 mod source;
 mod statistics;
 
-pub use parquet_reader::ParquetRSReader;
+pub use parquet_reader::ParquetRSFullReader;
+pub use parquet_reader::ParquetRSReaderBuilder;
+pub use parquet_reader::ParquetRSRowGroupReader;
 pub use parquet_table::ParquetRSTable;
 pub use partition::ParquetRSRowGroupPart;
 pub use pruning::ParquetRSPruner;

src/query/storages/parquet/src/parquet_rs/parquet_reader/mod.rs

Lines changed: 5 additions & 1 deletion
@@ -13,9 +13,13 @@
 // limitations under the License.
 
 mod predicate;
+mod read_policy;
 mod reader;
 mod row_group;
 mod topk;
+mod utils;
 
-pub use reader::ParquetRSReader;
+pub use read_policy::*;
+pub use reader::ParquetRSFullReader;
 pub use reader::ParquetRSReaderBuilder;
+pub use reader::ParquetRSRowGroupReader;

src/query/storages/parquet/src/parquet_rs/parquet_reader/predicate.rs

Lines changed: 66 additions & 35 deletions
@@ -12,56 +12,46 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use arrow_array::BooleanArray;
 use arrow_array::RecordBatch;
-use arrow_schema::FieldRef;
+use common_arrow::arrow::bitmap::Bitmap;
+use common_catalog::plan::PrewhereInfo;
+use common_catalog::plan::Projection;
 use common_exception::Result;
 use common_expression::types::DataType;
 use common_expression::DataBlock;
 use common_expression::Evaluator;
 use common_expression::Expr;
-use common_expression::FieldIndex;
 use common_expression::FunctionContext;
+use common_expression::TableSchema;
 use common_functions::BUILTIN_FUNCTIONS;
+use parquet::arrow::parquet_to_arrow_field_levels;
 use parquet::arrow::FieldLevels;
 use parquet::arrow::ProjectionMask;
+use parquet::schema::types::SchemaDescriptor;
 
-use super::reader::transform_record_batch;
+use super::utils::bitmap_to_boolean_array;
+use super::utils::transform_record_batch;
+use super::utils::FieldPaths;
+use crate::parquet_rs::parquet_reader::utils::compute_output_field_paths;
 
 pub struct ParquetPredicate {
     func_ctx: FunctionContext,
 
     /// Columns used for eval predicate.
     projection: ProjectionMask,
-    /// Projected field levels.
     field_levels: FieldLevels,
 
     /// Predicate filter expression.
     filter: Expr,
-    field_paths: Vec<(FieldRef, Vec<FieldIndex>)>,
+    field_paths: Option<FieldPaths>,
 
-    inner_projected: bool,
+    schema: TableSchema,
 }
 
 impl ParquetPredicate {
-    pub fn new(
-        func_ctx: FunctionContext,
-        projection: ProjectionMask,
-        field_levels: FieldLevels,
-        filter: Expr,
-        field_paths: Vec<(FieldRef, Vec<FieldIndex>)>,
-        inner_projected: bool,
-    ) -> Self {
-        Self {
-            func_ctx,
-            projection,
-            field_levels,
-            filter,
-            field_paths,
-            inner_projected,
-        }
-    }
-
     pub fn projection(&self) -> &ProjectionMask {
         &self.projection
     }
@@ -70,18 +60,59 @@ impl ParquetPredicate {
         &self.field_levels
     }
 
-    pub fn evaluate(&self, batch: &RecordBatch) -> Result<BooleanArray> {
-        let block = if self.inner_projected {
-            transform_record_batch(batch, &self.field_paths)?
-        } else {
-            let (block, _) = DataBlock::from_record_batch(batch)?;
-            block
-        };
-        let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);
+    pub fn field_paths(&self) -> &Option<FieldPaths> {
+        &self.field_paths
+    }
+
+    pub fn evaluate_block(&self, block: &DataBlock) -> Result<Bitmap> {
+        let evaluator = Evaluator::new(block, &self.func_ctx, &BUILTIN_FUNCTIONS);
         let res = evaluator
             .run(&self.filter)?
-            .convert_to_full_column(&DataType::Boolean, batch.num_rows())
-            .into_arrow_rs()?;
-        Ok(BooleanArray::from(res.to_data()))
+            .convert_to_full_column(&DataType::Boolean, block.num_rows())
+            .as_boolean()
+            .cloned()
+            .unwrap();
+        Ok(res)
     }
+
+    pub fn evaluate(&self, batch: &RecordBatch) -> Result<BooleanArray> {
+        let block = transform_record_batch(batch, &self.field_paths)?;
+        let res = self.evaluate_block(&block)?;
+        Ok(bitmap_to_boolean_array(res))
+    }
+
+    pub fn schema(&self) -> &TableSchema {
+        &self.schema
+    }
+}
+
+/// Build [`PrewhereInfo`] into [`ParquetPredicate`] and get the leaf column ids.
+pub fn build_predicate(
+    func_ctx: FunctionContext,
+    prewhere: &PrewhereInfo,
+    table_schema: &TableSchema,
+    schema_desc: &SchemaDescriptor,
+) -> Result<(Arc<ParquetPredicate>, Vec<usize>)> {
+    let inner_projection = matches!(prewhere.output_columns, Projection::InnerColumns(_));
+    let schema = prewhere.prewhere_columns.project_schema(table_schema);
+    let filter = prewhere
+        .filter
+        .as_expr(&BUILTIN_FUNCTIONS)
+        .project_column_ref(|name| schema.index_of(name).unwrap());
+    let (projection, leaves) = prewhere.prewhere_columns.to_arrow_projection(schema_desc);
+    let field_paths =
+        compute_output_field_paths(schema_desc, &projection, &schema, inner_projection)?;
+    let field_levels = parquet_to_arrow_field_levels(schema_desc, projection.clone(), None)?;
+
+    Ok((
+        Arc::new(ParquetPredicate {
+            func_ctx,
+            projection,
+            filter,
+            field_levels,
+            field_paths,
+            schema,
+        }),
+        leaves,
+    ))
 }
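`build_predicate` packages the filter's `ProjectionMask` and `FieldLevels` so the reader can decode only the predicate columns, evaluate the filter, and skip rows before the full projection is read. That is the same shape as parquet-rs's public row-filter API. Below is a minimal sketch of that mechanism using only the public `parquet` and `arrow` crates; the file path and the closure body (a stand-in for Databend's `Evaluator`) are assumptions for illustration.

```rust
use arrow_array::{Array, BooleanArray, Int64Array, RecordBatch};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = std::fs::File::open("/tmp/example.parquet")?; // hypothetical input
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Decode only the first leaf column for the filter, mirroring how
    // `ParquetPredicate::projection()` restricts the predicate's columns.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch: RecordBatch| {
        // Stand-in for `evaluate`: keep rows where column 0 > 10.
        let col = batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
        Ok(BooleanArray::from_iter(col.iter().map(|v| v.map(|v| v > 10))))
    });

    // Rows failing the predicate are skipped before the output projection is
    // decoded, which is the same late-materialization idea as the read policies.
    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    for batch in reader {
        println!("{} rows passed the filter", batch?.num_rows());
    }
    Ok(())
}
```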
src/query/storages/parquet/src/parquet_rs/parquet_reader/read_policy/mod.rs (new file)

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod no_prefetch;
+mod predicate_and_topk;
+mod topk_only;
+mod utils;
+
+pub mod policy;
+pub use no_prefetch::NoPrefetchPolicy;
+pub use no_prefetch::NoPretchPolicyBuilder;
+pub use predicate_and_topk::PredicateAndTopkPolicy;
+pub use predicate_and_topk::PredicateAndTopkPolicyBuilder;
+pub use topk_only::TopkOnlyPolicy;
+pub use topk_only::TopkOnlyPolicyBuilder;
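Three policies with builders imply a selection step keyed on which pushdowns are present. The dispatch itself is not shown in this page's hunks, so the decision table below is only a plausible sketch inferred from the policy names.

```rust
// Hypothetical dispatch: an assumption based on the policy names alone.
enum Policy {
    NoPrefetch,       // no predicate, no top-k: just decode the projection
    TopkOnly,         // prefetch the top-k column and prune by the heap threshold
    PredicateAndTopk, // prefetch predicate (and top-k) columns, filter first
}

fn choose_policy(has_predicate: bool, has_topk: bool) -> Policy {
    match (has_predicate, has_topk) {
        (true, _) => Policy::PredicateAndTopk,
        (false, true) => Policy::TopkOnly,
        (false, false) => Policy::NoPrefetch,
    }
}

fn main() {
    assert!(matches!(choose_policy(true, true), Policy::PredicateAndTopk));
    assert!(matches!(choose_policy(false, true), Policy::TopkOnly));
    assert!(matches!(choose_policy(false, false), Policy::NoPrefetch));
}
```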
