Skip to content

Commit 26e6a40

Browse files
authored
feat: copy into location supports compression. (#13067)
1 parent 278b85c commit 26e6a40

File tree

9 files changed

+123
-7
lines changed

9 files changed

+123
-7
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/compress/src/encode.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ impl Encode for CompressCodec {
134134
}
135135

136136
impl CompressCodec {
137-
#[allow(unused)]
138137
pub fn compress_all(&mut self, to_compress: &[u8]) -> common_exception::Result<Vec<u8>> {
139138
let mut compress_bufs = vec![];
140139
let mut input = PartialBuffer::new(to_compress);

src/common/compress/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ pub use decode::DecompressCodec;
2525
pub use decode::DecompressDecoder;
2626
pub use decode::DecompressReader;
2727
pub use decode::DecompressState;
28+
pub use encode::CompressCodec;

src/query/storages/stage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ test = false
1414
[dependencies]
1515
common-base = { path = "../../../common/base" }
1616
common-catalog = { path = "../../catalog" }
17+
common-compress = { path = "../../../common/compress" }
1718
common-exception = { path = "../../../common/exception" }
1819
common-expression = { path = "../../expression" }
1920
common-formats = { path = "../../formats" }

src/query/storages/stage/src/parquet_file/sink_processor.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,13 @@ impl Processor for ParquetFileSink {
113113
#[async_backtrace::framed]
114114
async fn async_process(&mut self) -> Result<()> {
115115
assert!(!self.output_data.is_empty());
116-
let path = unload_path(&self.table_info, &self.uuid, self.group_id, self.batch_id);
116+
let path = unload_path(
117+
&self.table_info,
118+
&self.uuid,
119+
self.group_id,
120+
self.batch_id,
121+
None,
122+
);
117123
let data = mem::take(&mut self.output_data);
118124
self.data_accessor.write(&path, data).await?;
119125
self.batch_id += 1;

src/query/storages/stage/src/row_based_file/pipeline.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use common_catalog::table_context::TableContext;
1919
use common_exception::Result;
2020
use common_formats::FileFormatOptionsExt;
2121
use common_pipeline_core::Pipeline;
22+
use common_pipeline_sources::input_formats::InputContext;
2223
use opendal::Operator;
2324

2425
use crate::row_based_file::limit_file_size_processor::LimitFileSizeProcessor;
@@ -44,6 +45,11 @@ pub(crate) fn append_data_to_row_based_files(
4445
table_info.schema(),
4546
table_info.stage_info.file_format_params.clone(),
4647
)?;
48+
let compression = table_info
49+
.stage_info
50+
.file_format_params
51+
.clone()
52+
.compression();
4753
let prefix = output_format.serialize_prefix()?;
4854

4955
pipeline.add_transform(|input, output| {
@@ -62,6 +68,9 @@ pub(crate) fn append_data_to_row_based_files(
6268
if max_file_size != usize::MAX {
6369
pipeline.try_resize(max_threads)?;
6470
}
71+
72+
let compression = InputContext::get_compression_alg_copy(compression, "")?;
73+
6574
pipeline.add_sink(|input| {
6675
let gid = group_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
6776
RowBasedFileSink::try_create(
@@ -71,6 +80,7 @@ pub(crate) fn append_data_to_row_based_files(
7180
prefix.clone(),
7281
uuid.clone(),
7382
gid,
83+
compression,
7484
)
7585
})?;
7686
Ok(())

src/query/storages/stage/src/row_based_file/sink_processor.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use std::sync::Arc;
1818

1919
use async_trait::async_trait;
2020
use common_catalog::plan::StageTableInfo;
21+
use common_compress::CompressAlgorithm;
22+
use common_compress::CompressCodec;
2123
use common_exception::Result;
2224
use common_expression::BlockMetaInfoDowncast;
2325
use common_expression::DataBlock;
@@ -45,6 +47,8 @@ pub struct RowBasedFileSink {
4547
uuid: String,
4648
group_id: usize,
4749
batch_id: usize,
50+
51+
compression: Option<CompressAlgorithm>,
4852
}
4953

5054
impl RowBasedFileSink {
@@ -55,6 +59,7 @@ impl RowBasedFileSink {
5559
prefix: Vec<u8>,
5660
uuid: String,
5761
group_id: usize,
62+
compression: Option<CompressAlgorithm>,
5863
) -> Result<ProcessorPtr> {
5964
Ok(ProcessorPtr::create(Box::new(RowBasedFileSink {
6065
table_info,
@@ -66,6 +71,7 @@ impl RowBasedFileSink {
6671
group_id,
6772
batch_id: 0,
6873
output_data: vec![],
74+
compression,
6975
})))
7076
}
7177
}
@@ -110,13 +116,22 @@ impl Processor for RowBasedFileSink {
110116
for b in buffers.buffers {
111117
output.extend_from_slice(b.as_slice());
112118
}
119+
if let Some(compression) = self.compression {
120+
output = CompressCodec::from(compression).compress_all(&output)?;
121+
}
113122
self.output_data = output;
114123
Ok(())
115124
}
116125

117126
#[async_backtrace::framed]
118127
async fn async_process(&mut self) -> Result<()> {
119-
let path = unload_path(&self.table_info, &self.uuid, self.group_id, self.batch_id);
128+
let path = unload_path(
129+
&self.table_info,
130+
&self.uuid,
131+
self.group_id,
132+
self.batch_id,
133+
self.compression,
134+
);
120135
let data = mem::take(&mut self.output_data);
121136
self.data_accessor.write(&path, data).await?;
122137
self.batch_id += 1;

src/query/storages/stage/src/stage_table.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use common_catalog::plan::StageTableInfo;
3131
use common_catalog::table::AppendMode;
3232
use common_catalog::table::Table;
3333
use common_catalog::table_context::TableContext;
34+
use common_compress::CompressAlgorithm;
3435
use common_exception::ErrorCode;
3536
use common_exception::Result;
3637
use common_expression::BlockThresholds;
@@ -296,24 +297,29 @@ pub fn unload_path(
296297
uuid: &str,
297298
group_id: usize,
298299
batch_id: usize,
300+
compression: Option<CompressAlgorithm>,
299301
) -> String {
300302
let format_name = format!(
301303
"{:?}",
302304
stage_table_info.stage_info.file_format_params.get_type()
303305
)
304306
.to_ascii_lowercase();
305307

308+
let suffix: &str = &compression
309+
.map(|c| format!(".{}", c.extension()))
310+
.unwrap_or_default();
311+
306312
let path = &stage_table_info.files_info.path;
307313

308314
if path.ends_with("data_") {
309315
format!(
310-
"{}{}_{:0>4}_{:0>8}.{}",
311-
path, uuid, group_id, batch_id, format_name
316+
"{}{}_{:0>4}_{:0>8}.{}{}",
317+
path, uuid, group_id, batch_id, format_name, suffix
312318
)
313319
} else {
314320
format!(
315-
"{}/data_{}_{:0>4}_{:0>8}.{}",
316-
path, uuid, group_id, batch_id, format_name
321+
"{}/data_{}_{:0>4}_{:0>8}.{}{}",
322+
path, uuid, group_id, batch_id, format_name, suffix
317323
)
318324
}
319325
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# need to run with '-p 0'
2+
3+
statement ok
4+
drop stage if exists unload;
5+
6+
statement ok
7+
create stage unload;
8+
9+
statement ok
10+
remove @unload;
11+
12+
statement ok
13+
drop table if exists ii;
14+
15+
statement ok
16+
create table ii (a int, b int);
17+
18+
statement ok
19+
insert into ii values (1, 2), (3, 4), (5, 6);
20+
21+
statement ok
22+
create file format if not exists csv_gzip type=csv compression=gzip;
23+
24+
# test csv
25+
statement ok
26+
copy into @unload from ii file_format=(type=csv);
27+

28+
29+
query
30+
select right(name, 4), size from list_stage(location=>'@unload');
31+
----
32+
.csv 12
33+
34+
query
35+
select $1, $2 from @unload(file_format=>'csv');
36+
----
37+
1 2
38+
3 4
39+
5 6
40+
41+
# test csv_gzip
42+
statement ok
43+
remove @unload;
44+
45+
statement ok
46+
copy into @unload from ii file_format=(format_name='csv_gzip');
47+
48+
query
49+
select right(name, 7), size from list_stage(location=>'@unload');
50+
----
51+
.csv.gz 32
52+
53+
query
54+
select $1, $2 from @unload(file_format => 'csv_gzip');
55+
----
56+
1 2
57+
3 4
58+
5 6
59+
60+
# test tsv
61+
statement ok
62+
remove @unload;
63+
64+
statement ok
65+
copy into @unload from ii file_format=(format_name='tsv');
66+
67+
query
68+
select right(name, 4), size from list_stage(location=>'@unload');
69+
----
70+
.tsv 12
71+
72+
query
73+
select $1, $2 from @unload(file_format => 'tsv');
74+
----
75+
1 2
76+
3 4
77+
5 6

0 commit comments

Comments
 (0)