Skip to content

Commit 832be22

Browse files
committed
Add support for Create Iceberg Table statement for Snowflake parser (apache#1664)
1 parent e5bc3df commit 832be22

9 files changed

+274
-7
lines changed

src/ast/dml.rs

+44-1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ pub struct CreateTable {
117117
pub if_not_exists: bool,
118118
pub transient: bool,
119119
pub volatile: bool,
120+
pub iceberg: bool,
120121
/// Table name
121122
#[cfg_attr(feature = "visitor", visit(with = "visit_relation"))]
122123
pub name: ObjectName,
@@ -192,6 +193,21 @@ pub struct CreateTable {
192193
/// Snowflake "WITH TAG" clause
193194
/// <https://docs.snowflake.com/en/sql-reference/sql/create-table>
194195
pub with_tags: Option<Vec<Tag>>,
196+
/// Snowflake "EXTERNAL_VOLUME" clause for Iceberg tables
197+
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
198+
pub external_volume: Option<String>,
199+
/// Snowflake "BASE_LOCATION" clause for Iceberg tables
200+
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
201+
pub base_location: Option<String>,
202+
/// Snowflake "CATALOG" clause for Iceberg tables
203+
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
204+
pub catalog: Option<String>,
205+
/// Snowflake "CATALOG_SYNC" clause for Iceberg tables
206+
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
207+
pub catalog_sync: Option<String>,
208+
/// Snowflake "STORAGE_SERIALIZATION_POLICY" clause for Iceberg tables
209+
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
210+
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
195211
}
196212

197213
impl Display for CreateTable {
@@ -205,7 +221,7 @@ impl Display for CreateTable {
205221
// `CREATE TABLE t (a INT) AS SELECT a from t2`
206222
write!(
207223
f,
208-
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}TABLE {if_not_exists}{name}",
224+
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}{iceberg}TABLE {if_not_exists}{name}",
209225
or_replace = if self.or_replace { "OR REPLACE " } else { "" },
210226
external = if self.external { "EXTERNAL " } else { "" },
211227
global = self.global
@@ -221,6 +237,8 @@ impl Display for CreateTable {
221237
temporary = if self.temporary { "TEMPORARY " } else { "" },
222238
transient = if self.transient { "TRANSIENT " } else { "" },
223239
volatile = if self.volatile { "VOLATILE " } else { "" },
240+
// Only for Snowflake
241+
iceberg = if self.iceberg { "ICEBERG " } else { "" },
224242
name = self.name,
225243
)?;
226244
if let Some(on_cluster) = &self.on_cluster {
@@ -382,6 +400,31 @@ impl Display for CreateTable {
382400
)?;
383401
}
384402

403+
if let Some(external_volume) = self.external_volume.as_ref() {
404+
write!(f, " EXTERNAL_VOLUME = '{external_volume}'")?;
405+
}
406+
407+
if let Some(catalog) = self.catalog.as_ref() {
408+
write!(f, " CATALOG = '{catalog}'")?;
409+
}
410+
411+
if self.iceberg {
412+
if let Some(base_location) = self.base_location.as_ref() {
413+
write!(f, " BASE_LOCATION = '{base_location}'")?;
414+
}
415+
}
416+
417+
if let Some(catalog_sync) = self.catalog_sync.as_ref() {
418+
write!(f, " CATALOG_SYNC = '{catalog_sync}'")?;
419+
}
420+
421+
if let Some(storage_serialization_policy) = self.storage_serialization_policy.as_ref() {
422+
write!(
423+
f,
424+
" STORAGE_SERIALIZATION_POLICY = {storage_serialization_policy}"
425+
)?;
426+
}
427+
385428
if self.copy_grants {
386429
write!(f, " COPY GRANTS")?;
387430
}

src/ast/helpers/stmt_create_table.rs

+64-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use super::super::dml::CreateTable;
2828
use crate::ast::{
2929
ClusteredBy, ColumnDef, CommentDef, Expr, FileFormat, HiveDistributionStyle, HiveFormat, Ident,
3030
ObjectName, OnCommit, OneOrManyWithParens, Query, RowAccessPolicy, SqlOption, Statement,
31-
TableConstraint, TableEngine, Tag, WrappedCollection,
31+
StorageSerializationPolicy, TableConstraint, TableEngine, Tag, WrappedCollection,
3232
};
3333
use crate::parser::ParserError;
3434

@@ -71,6 +71,7 @@ pub struct CreateTableBuilder {
7171
pub if_not_exists: bool,
7272
pub transient: bool,
7373
pub volatile: bool,
74+
pub iceberg: bool,
7475
pub name: ObjectName,
7576
pub columns: Vec<ColumnDef>,
7677
pub constraints: Vec<TableConstraint>,
@@ -107,6 +108,11 @@ pub struct CreateTableBuilder {
107108
pub with_aggregation_policy: Option<ObjectName>,
108109
pub with_row_access_policy: Option<RowAccessPolicy>,
109110
pub with_tags: Option<Vec<Tag>>,
111+
pub base_location: Option<String>,
112+
pub external_volume: Option<String>,
113+
pub catalog: Option<String>,
114+
pub catalog_sync: Option<String>,
115+
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
110116
}
111117

112118
impl CreateTableBuilder {
@@ -119,6 +125,7 @@ impl CreateTableBuilder {
119125
if_not_exists: false,
120126
transient: false,
121127
volatile: false,
128+
iceberg: false,
122129
name,
123130
columns: vec![],
124131
constraints: vec![],
@@ -155,6 +162,11 @@ impl CreateTableBuilder {
155162
with_aggregation_policy: None,
156163
with_row_access_policy: None,
157164
with_tags: None,
165+
base_location: None,
166+
external_volume: None,
167+
catalog: None,
168+
catalog_sync: None,
169+
storage_serialization_policy: None,
158170
}
159171
}
160172
pub fn or_replace(mut self, or_replace: bool) -> Self {
@@ -192,6 +204,11 @@ impl CreateTableBuilder {
192204
self
193205
}
194206

207+
pub fn iceberg(mut self, iceberg: bool) -> Self {
208+
self.iceberg = iceberg;
209+
self
210+
}
211+
195212
pub fn columns(mut self, columns: Vec<ColumnDef>) -> Self {
196213
self.columns = columns;
197214
self
@@ -371,6 +388,34 @@ impl CreateTableBuilder {
371388
self
372389
}
373390

391+
pub fn base_location(mut self, base_location: Option<String>) -> Self {
392+
self.base_location = base_location;
393+
self
394+
}
395+
396+
pub fn external_volume(mut self, external_volume: Option<String>) -> Self {
397+
self.external_volume = external_volume;
398+
self
399+
}
400+
401+
pub fn catalog(mut self, catalog: Option<String>) -> Self {
402+
self.catalog = catalog;
403+
self
404+
}
405+
406+
pub fn catalog_sync(mut self, catalog_sync: Option<String>) -> Self {
407+
self.catalog_sync = catalog_sync;
408+
self
409+
}
410+
411+
pub fn storage_serialization_policy(
412+
mut self,
413+
storage_serialization_policy: Option<StorageSerializationPolicy>,
414+
) -> Self {
415+
self.storage_serialization_policy = storage_serialization_policy;
416+
self
417+
}
418+
374419
pub fn build(self) -> Statement {
375420
Statement::CreateTable(CreateTable {
376421
or_replace: self.or_replace,
@@ -380,6 +425,7 @@ impl CreateTableBuilder {
380425
if_not_exists: self.if_not_exists,
381426
transient: self.transient,
382427
volatile: self.volatile,
428+
iceberg: self.iceberg,
383429
name: self.name,
384430
columns: self.columns,
385431
constraints: self.constraints,
@@ -416,6 +462,11 @@ impl CreateTableBuilder {
416462
with_aggregation_policy: self.with_aggregation_policy,
417463
with_row_access_policy: self.with_row_access_policy,
418464
with_tags: self.with_tags,
465+
base_location: self.base_location,
466+
external_volume: self.external_volume,
467+
catalog: self.catalog,
468+
catalog_sync: self.catalog_sync,
469+
storage_serialization_policy: self.storage_serialization_policy,
419470
})
420471
}
421472
}
@@ -435,6 +486,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
435486
if_not_exists,
436487
transient,
437488
volatile,
489+
iceberg,
438490
name,
439491
columns,
440492
constraints,
@@ -471,6 +523,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
471523
with_aggregation_policy,
472524
with_row_access_policy,
473525
with_tags,
526+
base_location,
527+
external_volume,
528+
catalog,
529+
catalog_sync,
530+
storage_serialization_policy,
474531
}) => Ok(Self {
475532
or_replace,
476533
temporary,
@@ -505,6 +562,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
505562
clustered_by,
506563
options,
507564
strict,
565+
iceberg,
508566
copy_grants,
509567
enable_schema_evolution,
510568
change_tracking,
@@ -515,6 +573,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
515573
with_row_access_policy,
516574
with_tags,
517575
volatile,
576+
base_location,
577+
external_volume,
578+
catalog,
579+
catalog_sync,
580+
storage_serialization_policy,
518581
}),
519582
_ => Err(ParserError::ParserError(format!(
520583
"Expected create table statement, but received: {stmt}"

src/ast/spans.rs

+6
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,7 @@ impl Spanned for CreateTable {
532532
if_not_exists: _, // bool
533533
transient: _, // bool
534534
volatile: _, // bool
535+
iceberg: _, // bool, Snowflake specific
535536
name,
536537
columns,
537538
constraints,
@@ -568,6 +569,11 @@ impl Spanned for CreateTable {
568569
with_aggregation_policy: _, // todo, Snowflake specific
569570
with_row_access_policy: _, // todo, Snowflake specific
570571
with_tags: _, // todo, Snowflake specific
572+
external_volume: _, // todo, Snowflake specific
573+
base_location: _, // todo, Snowflake specific
574+
catalog: _, // todo, Snowflake specific
575+
catalog_sync: _, // todo, Snowflake specific
576+
storage_serialization_policy: _, // todo, Snowflake specific
571577
} = self;
572578

573579
union_spans(

src/dialect/snowflake.rs

+53-4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use alloc::string::String;
3737
use alloc::vec::Vec;
3838
#[cfg(not(feature = "std"))]
3939
use alloc::{format, vec};
40+
use sqlparser::ast::StorageSerializationPolicy;
4041

4142
use super::keywords::RESERVED_FOR_IDENTIFIER;
4243

@@ -130,16 +131,19 @@ impl Dialect for SnowflakeDialect {
130131
let mut temporary = false;
131132
let mut volatile = false;
132133
let mut transient = false;
134+
let mut iceberg = false;
133135

134136
match parser.parse_one_of_keywords(&[
135137
Keyword::TEMP,
136138
Keyword::TEMPORARY,
137139
Keyword::VOLATILE,
138140
Keyword::TRANSIENT,
141+
Keyword::ICEBERG,
139142
]) {
140143
Some(Keyword::TEMP | Keyword::TEMPORARY) => temporary = true,
141144
Some(Keyword::VOLATILE) => volatile = true,
142145
Some(Keyword::TRANSIENT) => transient = true,
146+
Some(Keyword::ICEBERG) => iceberg = true,
143147
_ => {}
144148
}
145149

@@ -148,7 +152,7 @@ impl Dialect for SnowflakeDialect {
148152
return Some(parse_create_stage(or_replace, temporary, parser));
149153
} else if parser.parse_keyword(Keyword::TABLE) {
150154
return Some(parse_create_table(
151-
or_replace, global, temporary, volatile, transient, parser,
155+
or_replace, global, temporary, volatile, transient, iceberg, parser,
152156
));
153157
} else {
154158
// need to go back with the cursor
@@ -255,7 +259,7 @@ impl Dialect for SnowflakeDialect {
255259
fn is_select_item_alias(&self, explicit: bool, kw: &Keyword, parser: &mut Parser) -> bool {
256260
explicit
257261
|| match kw {
258-
// The following keywords can be considered an alias as long as
262+
// The following keywords can be considered an alias as long as
259263
// they are not followed by other tokens that may change their meaning
260264
// e.g. `SELECT * EXCEPT (col1) FROM tbl`
261265
Keyword::EXCEPT
@@ -277,8 +281,8 @@ impl Dialect for SnowflakeDialect {
277281
false
278282
}
279283

280-
// Reserved keywords by the Snowflake dialect, which seem to be less strictive
281-
// than what is listed in `keywords::RESERVED_FOR_COLUMN_ALIAS`. The following
284+
// Reserved keywords by the Snowflake dialect, which seem to be less restrictive
285+
// than what is listed in `keywords::RESERVED_FOR_COLUMN_ALIAS`. The following
282286
// keywords were tested with this statement: `SELECT 1 <KW>`.
283287
Keyword::FROM
284288
| Keyword::GROUP
@@ -325,12 +329,14 @@ fn parse_file_staging_command(kw: Keyword, parser: &mut Parser) -> Result<Statem
325329

326330
/// Parse snowflake create table statement.
327331
/// <https://docs.snowflake.com/en/sql-reference/sql/create-table>
332+
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
328333
pub fn parse_create_table(
329334
or_replace: bool,
330335
global: Option<bool>,
331336
temporary: bool,
332337
volatile: bool,
333338
transient: bool,
339+
iceberg: bool,
334340
parser: &mut Parser,
335341
) -> Result<Statement, ParserError> {
336342
let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
@@ -342,6 +348,7 @@ pub fn parse_create_table(
342348
.temporary(temporary)
343349
.transient(transient)
344350
.volatile(volatile)
351+
.iceberg(iceberg)
345352
.global(global)
346353
.hive_formats(Some(Default::default()));
347354

@@ -468,6 +475,28 @@ pub fn parse_create_table(
468475
let on_commit = Some(parser.parse_create_table_on_commit()?);
469476
builder = builder.on_commit(on_commit);
470477
}
478+
Keyword::EXTERNAL_VOLUME => {
479+
parser.expect_token(&Token::Eq)?;
480+
builder.external_volume = Some(parser.parse_literal_string()?);
481+
}
482+
Keyword::CATALOG => {
483+
parser.expect_token(&Token::Eq)?;
484+
builder.catalog = Some(parser.parse_literal_string()?);
485+
}
486+
Keyword::BASE_LOCATION => {
487+
parser.expect_token(&Token::Eq)?;
488+
builder.base_location = Some(parser.parse_literal_string()?);
489+
}
490+
Keyword::CATALOG_SYNC => {
491+
parser.expect_token(&Token::Eq)?;
492+
builder.catalog_sync = Some(parser.parse_literal_string()?);
493+
}
494+
Keyword::STORAGE_SERIALIZATION_POLICY => {
495+
parser.expect_token(&Token::Eq)?;
496+
497+
builder.storage_serialization_policy =
498+
Some(parse_storage_serialization_policy(parser)?);
499+
}
471500
_ => {
472501
return parser.expected("end of statement", next_token);
473502
}
@@ -502,9 +531,29 @@ pub fn parse_create_table(
502531
}
503532
}
504533

534+
if iceberg && builder.base_location.is_none() {
535+
return Err(ParserError::ParserError(
536+
"BASE_LOCATION is required for ICEBERG tables".to_string(),
537+
));
538+
}
539+
505540
Ok(builder.build())
506541
}
507542

543+
pub fn parse_storage_serialization_policy(
544+
parser: &mut Parser,
545+
) -> Result<StorageSerializationPolicy, ParserError> {
546+
let next_token = parser.next_token();
547+
match &next_token.token {
548+
Token::Word(w) => match w.keyword {
549+
Keyword::COMPATIBLE => Ok(StorageSerializationPolicy::Compatible),
550+
Keyword::OPTIMIZED => Ok(StorageSerializationPolicy::Optimized),
551+
_ => parser.expected("storage_serialization_policy", next_token),
552+
},
553+
_ => parser.expected("storage_serialization_policy", next_token),
554+
}
555+
}
556+
508557
pub fn parse_create_stage(
509558
or_replace: bool,
510559
temporary: bool,

0 commit comments

Comments
 (0)