Skip to content

Commit 402d430

Browse files
authored
Merge pull request #9 from MaterializeInc/create_sources_add_like
CREATE SOURCES should support filter
2 parents c17eb41 + 76c15a1 commit 402d430

File tree

4 files changed

+50
-3
lines changed

4 files changed

+50
-3
lines changed

src/ast/mod.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ pub enum Statement {
411411
},
412412
/// `CREATE SOURCES`
413413
CreateSources {
414+
like: Option<String>,
414415
url: String,
415416
schema_registry: String,
416417
with_options: Vec<SqlOption>,
@@ -601,13 +602,18 @@ impl fmt::Display for Statement {
601602
Ok(())
602603
}
603604
Statement::CreateSources {
605+
like,
604606
url,
605607
schema_registry,
606608
with_options,
607609
} => {
610+
write!(f, "CREATE SOURCES ")?;
611+
if let Some(like) = like {
612+
write!(f, "LIKE '{}' ", value::escape_single_quote_string(like),)?;
613+
}
608614
write!(
609615
f,
610-
"CREATE SOURCES FROM {} USING SCHEMA REGISTRY {}",
616+
"FROM {} USING SCHEMA REGISTRY {}",
611617
Value::SingleQuotedString(url.clone()).to_string(),
612618
Value::SingleQuotedString(schema_registry.clone()).to_string(),
613619
)?;

src/ast/visit.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -318,11 +318,12 @@ pub trait Visit<'ast> {
318318

319319
fn visit_create_sources(
320320
&mut self,
321+
like: Option<&'ast String>,
321322
url: &'ast String,
322323
schema_registry: &'ast String,
323324
with_options: &'ast Vec<SqlOption>,
324325
) {
325-
visit_create_sources(self, url, schema_registry, with_options)
326+
visit_create_sources(self, like, url, schema_registry, with_options)
326327
}
327328

328329
fn visit_source_schema(&mut self, source_schema: &'ast SourceSchema) {
@@ -541,10 +542,11 @@ pub fn visit_statement<'ast, V: Visit<'ast> + ?Sized>(visitor: &mut V, statement
541542
with_options,
542543
} => visitor.visit_create_source(name, url, schema, with_options),
543544
Statement::CreateSources {
545+
like,
544546
url,
545547
schema_registry,
546548
with_options,
547-
} => visitor.visit_create_sources(url, schema_registry, with_options),
549+
} => visitor.visit_create_sources(like.as_ref(), url, schema_registry, with_options),
548550
Statement::CreateSink {
549551
name,
550552
from,
@@ -1157,10 +1159,14 @@ pub fn visit_create_source<'ast, V: Visit<'ast> + ?Sized>(
11571159

11581160
pub fn visit_create_sources<'ast, V: Visit<'ast> + ?Sized>(
11591161
visitor: &mut V,
1162+
like: Option<&'ast String>,
11601163
url: &'ast String,
11611164
schema_registry: &'ast String,
11621165
with_options: &'ast Vec<SqlOption>,
11631166
) {
1167+
if let Some(like) = like {
1168+
visitor.visit_literal_string(like);
1169+
}
11641170
visitor.visit_literal_string(url);
11651171
visitor.visit_literal_string(schema_registry);
11661172
for option in with_options {

src/parser.rs

+11
Original file line numberDiff line numberDiff line change
@@ -904,18 +904,29 @@ impl Parser {
904904
}
905905

906906
pub fn parse_create_sources(&mut self) -> Result<Statement, ParserError> {
907+
// Need to get the LIKE if it exists, otherwise keep moving.
908+
let like = self.parse_like_filter()?;
907909
self.expect_keyword("FROM")?;
908910
let url = self.parse_literal_string()?;
909911
self.expect_keywords(&["USING", "SCHEMA", "REGISTRY"])?;
910912
let schema_registry = self.parse_literal_string()?;
911913
let with_options = self.parse_with_options()?;
912914
Ok(Statement::CreateSources {
915+
like,
913916
url,
914917
schema_registry,
915918
with_options,
916919
})
917920
}
918921

922+
fn parse_like_filter(&mut self) -> Result<Option<String>, ParserError> {
923+
if self.parse_keyword("LIKE") {
924+
Ok(Some(self.parse_literal_string()?))
925+
} else {
926+
Ok(None)
927+
}
928+
}
929+
919930
pub fn parse_create_sink(&mut self) -> Result<Statement, ParserError> {
920931
let name = self.parse_object_name()?;
921932
self.expect_keyword("FROM")?;

tests/sqlparser_common.rs

+24
Original file line numberDiff line numberDiff line change
@@ -2202,10 +2202,34 @@ fn parse_create_sources() {
22022202
let sql = "CREATE SOURCES FROM 'kafka://whatever' USING SCHEMA REGISTRY 'http://foo.bar:8081'";
22032203
match verified_stmt(sql) {
22042204
Statement::CreateSources {
2205+
like,
22052206
url,
22062207
schema_registry,
22072208
with_options,
22082209
} => {
2210+
assert!(like.is_none());
2211+
assert_eq!("kafka://whatever", url);
2212+
assert_eq!("http://foo.bar:8081", schema_registry);
2213+
assert!(with_options.is_empty());
2214+
}
2215+
_ => assert!(false),
2216+
}
2217+
}
2218+
2219+
#[test]
2220+
fn parse_create_sources_with_like_regex() {
2221+
let sql = "CREATE SOURCES LIKE '%foo%' FROM 'kafka://whatever' USING SCHEMA REGISTRY 'http://foo.bar:8081'";
2222+
match verified_stmt(sql) {
2223+
Statement::CreateSources {
2224+
like,
2225+
url,
2226+
schema_registry,
2227+
with_options,
2228+
} => {
2229+
match like {
2230+
Some(value) => assert_eq!("%foo%", value),
2231+
None => unimplemented!(),
2232+
}
22092233
assert_eq!("kafka://whatever", url);
22102234
assert_eq!("http://foo.bar:8081", schema_registry);
22112235
assert!(with_options.is_empty());

0 commit comments

Comments
 (0)