Skip to content

Add support for Create Iceberg Table statement for Snowflake parser #1664

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions src/ast/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use super::{
CommentDef, Expr, FileFormat, FromTable, HiveDistributionStyle, HiveFormat, HiveIOFormat,
HiveRowFormat, Ident, InsertAliases, MysqlInsertPriority, ObjectName, OnCommit, OnInsert,
OneOrManyWithParens, OrderByExpr, Query, RowAccessPolicy, SelectItem, Setting, SqlOption,
SqliteOnConflict, TableEngine, TableObject, TableWithJoins, Tag, WrappedCollection,
SqliteOnConflict, StorageSerializationPolicy, TableEngine, TableObject, TableWithJoins, Tag,
WrappedCollection,
};

/// CREATE INDEX statement.
Expand Down Expand Up @@ -117,6 +118,7 @@ pub struct CreateTable {
pub if_not_exists: bool,
pub transient: bool,
pub volatile: bool,
pub iceberg: bool,
/// Table name
#[cfg_attr(feature = "visitor", visit(with = "visit_relation"))]
pub name: ObjectName,
Expand Down Expand Up @@ -192,6 +194,21 @@ pub struct CreateTable {
/// Snowflake "WITH TAG" clause
/// <https://docs.snowflake.com/en/sql-reference/sql/create-table>
pub with_tags: Option<Vec<Tag>>,
/// Snowflake "EXTERNAL_VOLUME" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub external_volume: Option<String>,
/// Snowflake "BASE_LOCATION" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub base_location: Option<String>,
/// Snowflake "CATALOG" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub catalog: Option<String>,
/// Snowflake "CATALOG_SYNC" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub catalog_sync: Option<String>,
/// Snowflake "STORAGE_SERIALIZATION_POLICY" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
}

impl Display for CreateTable {
Expand All @@ -205,7 +222,7 @@ impl Display for CreateTable {
// `CREATE TABLE t (a INT) AS SELECT a from t2`
write!(
f,
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}TABLE {if_not_exists}{name}",
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}{iceberg}TABLE {if_not_exists}{name}",
or_replace = if self.or_replace { "OR REPLACE " } else { "" },
external = if self.external { "EXTERNAL " } else { "" },
global = self.global
Expand All @@ -221,6 +238,8 @@ impl Display for CreateTable {
temporary = if self.temporary { "TEMPORARY " } else { "" },
transient = if self.transient { "TRANSIENT " } else { "" },
volatile = if self.volatile { "VOLATILE " } else { "" },
// Only for Snowflake
iceberg = if self.iceberg { "ICEBERG " } else { "" },
name = self.name,
)?;
if let Some(on_cluster) = &self.on_cluster {
Expand Down Expand Up @@ -382,6 +401,31 @@ impl Display for CreateTable {
)?;
}

if let Some(external_volume) = self.external_volume.as_ref() {
write!(f, " EXTERNAL_VOLUME = '{external_volume}'")?;
}

if let Some(catalog) = self.catalog.as_ref() {
write!(f, " CATALOG = '{catalog}'")?;
}

if self.iceberg {
if let Some(base_location) = self.base_location.as_ref() {
write!(f, " BASE_LOCATION = '{base_location}'")?;
}
}

if let Some(catalog_sync) = self.catalog_sync.as_ref() {
write!(f, " CATALOG_SYNC = '{catalog_sync}'")?;
}

if let Some(storage_serialization_policy) = self.storage_serialization_policy.as_ref() {
write!(
f,
" STORAGE_SERIALIZATION_POLICY = {storage_serialization_policy}"
)?;
}

if self.copy_grants {
write!(f, " COPY GRANTS")?;
}
Expand Down
65 changes: 64 additions & 1 deletion src/ast/helpers/stmt_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::super::dml::CreateTable;
use crate::ast::{
ClusteredBy, ColumnDef, CommentDef, Expr, FileFormat, HiveDistributionStyle, HiveFormat, Ident,
ObjectName, OnCommit, OneOrManyWithParens, Query, RowAccessPolicy, SqlOption, Statement,
TableConstraint, TableEngine, Tag, WrappedCollection,
StorageSerializationPolicy, TableConstraint, TableEngine, Tag, WrappedCollection,
};
use crate::parser::ParserError;

Expand Down Expand Up @@ -71,6 +71,7 @@ pub struct CreateTableBuilder {
pub if_not_exists: bool,
pub transient: bool,
pub volatile: bool,
pub iceberg: bool,
pub name: ObjectName,
pub columns: Vec<ColumnDef>,
pub constraints: Vec<TableConstraint>,
Expand Down Expand Up @@ -107,6 +108,11 @@ pub struct CreateTableBuilder {
pub with_aggregation_policy: Option<ObjectName>,
pub with_row_access_policy: Option<RowAccessPolicy>,
pub with_tags: Option<Vec<Tag>>,
pub base_location: Option<String>,
pub external_volume: Option<String>,
pub catalog: Option<String>,
pub catalog_sync: Option<String>,
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
}

impl CreateTableBuilder {
Expand All @@ -119,6 +125,7 @@ impl CreateTableBuilder {
if_not_exists: false,
transient: false,
volatile: false,
iceberg: false,
name,
columns: vec![],
constraints: vec![],
Expand Down Expand Up @@ -155,6 +162,11 @@ impl CreateTableBuilder {
with_aggregation_policy: None,
with_row_access_policy: None,
with_tags: None,
base_location: None,
external_volume: None,
catalog: None,
catalog_sync: None,
storage_serialization_policy: None,
}
}
pub fn or_replace(mut self, or_replace: bool) -> Self {
Expand Down Expand Up @@ -192,6 +204,11 @@ impl CreateTableBuilder {
self
}

pub fn iceberg(mut self, iceberg: bool) -> Self {
self.iceberg = iceberg;
self
}

pub fn columns(mut self, columns: Vec<ColumnDef>) -> Self {
self.columns = columns;
self
Expand Down Expand Up @@ -371,6 +388,34 @@ impl CreateTableBuilder {
self
}

pub fn base_location(mut self, base_location: Option<String>) -> Self {
self.base_location = base_location;
self
}

pub fn external_volume(mut self, external_volume: Option<String>) -> Self {
self.external_volume = external_volume;
self
}

pub fn catalog(mut self, catalog: Option<String>) -> Self {
self.catalog = catalog;
self
}

pub fn catalog_sync(mut self, catalog_sync: Option<String>) -> Self {
self.catalog_sync = catalog_sync;
self
}

pub fn storage_serialization_policy(
mut self,
storage_serialization_policy: Option<StorageSerializationPolicy>,
) -> Self {
self.storage_serialization_policy = storage_serialization_policy;
self
}

pub fn build(self) -> Statement {
Statement::CreateTable(CreateTable {
or_replace: self.or_replace,
Expand All @@ -380,6 +425,7 @@ impl CreateTableBuilder {
if_not_exists: self.if_not_exists,
transient: self.transient,
volatile: self.volatile,
iceberg: self.iceberg,
name: self.name,
columns: self.columns,
constraints: self.constraints,
Expand Down Expand Up @@ -416,6 +462,11 @@ impl CreateTableBuilder {
with_aggregation_policy: self.with_aggregation_policy,
with_row_access_policy: self.with_row_access_policy,
with_tags: self.with_tags,
base_location: self.base_location,
external_volume: self.external_volume,
catalog: self.catalog,
catalog_sync: self.catalog_sync,
storage_serialization_policy: self.storage_serialization_policy,
})
}
}
Expand All @@ -435,6 +486,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
if_not_exists,
transient,
volatile,
iceberg,
name,
columns,
constraints,
Expand Down Expand Up @@ -471,6 +523,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
with_aggregation_policy,
with_row_access_policy,
with_tags,
base_location,
external_volume,
catalog,
catalog_sync,
storage_serialization_policy,
}) => Ok(Self {
or_replace,
temporary,
Expand Down Expand Up @@ -505,6 +562,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
clustered_by,
options,
strict,
iceberg,
copy_grants,
enable_schema_evolution,
change_tracking,
Expand All @@ -515,6 +573,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
with_row_access_policy,
with_tags,
volatile,
base_location,
external_volume,
catalog,
catalog_sync,
storage_serialization_policy,
}),
_ => Err(ParserError::ParserError(format!(
"Expected create table statement, but received: {stmt}"
Expand Down
23 changes: 23 additions & 0 deletions src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8064,6 +8064,29 @@ impl fmt::Display for SessionParamValue {
}
}

/// Snowflake StorageSerializationPolicy for Iceberg Tables
/// ```sql
/// [ STORAGE_SERIALIZATION_POLICY = { COMPATIBLE | OPTIMIZED } ]
/// ```
///
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub enum StorageSerializationPolicy {
Compatible,
Optimized,
}

impl Display for StorageSerializationPolicy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StorageSerializationPolicy::Compatible => write!(f, "COMPATIBLE"),
StorageSerializationPolicy::Optimized => write!(f, "OPTIMIZED"),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 6 additions & 0 deletions src/ast/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ impl Spanned for CreateTable {
if_not_exists: _, // bool
transient: _, // bool
volatile: _, // bool
iceberg: _, // bool, Snowflake specific
name,
columns,
constraints,
Expand Down Expand Up @@ -567,6 +568,11 @@ impl Spanned for CreateTable {
with_aggregation_policy: _, // todo, Snowflake specific
with_row_access_policy: _, // todo, Snowflake specific
with_tags: _, // todo, Snowflake specific
external_volume: _, // todo, Snowflake specific
base_location: _, // todo, Snowflake specific
catalog: _, // todo, Snowflake specific
catalog_sync: _, // todo, Snowflake specific
storage_serialization_policy: _, // todo, Snowflake specific
} = self;

union_spans(
Expand Down
Loading
Loading