Commit 70a098c

[SPARK-42398][SQL] Refine default column value DS v2 interface
### What changes were proposed in this pull request?

The current default value DS V2 API is a bit inconsistent. The `createTable` API only takes `StructType`, so implementations must know the special metadata key of the default value to access it. The `TableChange` API has the default value as an individual field.

This PR adds a new `Column` interface, which holds both the current default (as a SQL string) and the exist default (as a v2 literal). The `createTable` API now takes `Column`. This avoids the need for a special metadata key and is more extensible when we add more special columns, such as generated columns. It is also type-safe and ensures the exist default is a literal. The implementation is free to decide how to encode and store default values.

Note: backward compatibility is taken care of.

### Why are the changes needed?

Better DS v2 API for default values.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #40049 from cloud-fan/table2.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 5fc44da commit 70a098c
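
For a sense of what the new surface looks like, here is a minimal sketch (not from the commit; the catalog instance, namespace, and table names are made up) of creating a table through the new `Column`-based `createTable`, instead of encoding the default value in a `StructField` metadata key:

```scala
import java.util.Collections

import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform}
import org.apache.spark.sql.types.IntegerType

object CreateTableSketch {
  def createWithDefault(catalog: TableCatalog): Unit = {
    // "40 + 2" is the current default (re-evaluated for each write command);
    // LiteralValue(42, IntegerType) is the exist default used to back-fill existing rows.
    val cols = Array(
      Column.create("id", IntegerType, false),
      Column.create("col", IntegerType, true, null,
        new ColumnDefaultValue("40 + 2", LiteralValue(42, IntegerType)), null))

    catalog.createTable(
      Identifier.of(Array("ns"), "t"),
      cols, // new: Column[] instead of StructType
      Array.empty[Transform],
      Collections.emptyMap[String, String]())
  }
}
```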

43 files changed, +670 -229 lines


connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala

Lines changed: 5 additions & 3 deletions
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.{caseSensitiveResolution, Analyzer
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -67,16 +68,17 @@ class ProtoToParsedPlanTestSuite extends SparkFunSuite with SharedSparkSession {

   protected val inputFilePath: Path = baseResourcePath.resolve("queries")
   protected val goldenFilePath: Path = baseResourcePath.resolve("explain-results")
+  private val emptyProps: util.Map[String, String] = util.Collections.emptyMap()

   private val analyzer = {
     val inMemoryCatalog = new InMemoryCatalog
     inMemoryCatalog.initialize("primary", CaseInsensitiveStringMap.empty())
-    inMemoryCatalog.createNamespace(Array("tempdb"), util.Collections.emptyMap())
+    inMemoryCatalog.createNamespace(Array("tempdb"), emptyProps)
     inMemoryCatalog.createTable(
       Identifier.of(Array("tempdb"), "myTable"),
       new StructType().add("id", "long"),
-      Array.empty,
-      util.Collections.emptyMap())
+      Array.empty[Transform],
+      emptyProps)

     val catalogManager = new CatalogManager(
       inMemoryCatalog,

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala

Lines changed: 2 additions & 1 deletion
@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Level
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
 import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
@@ -118,7 +119,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
     // Drop non empty namespace without cascade
     catalog.createNamespace(Array("foo"), commentMap.asJava)
     assert(catalog.namespaceExists(Array("foo")) === true)
-    catalog.createTable(ident1, schema, Array.empty, emptyProps)
+    catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps)
     if (supportsDropSchemaRestrict) {
       intercept[NonEmptyNamespaceException] {
         catalog.dropNamespace(Array("foo"), cascade = false)

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.internal.connector.ColumnImpl;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * An interface representing a column of a {@link Table}. It defines basic properties of a column,
+ * such as name and data type, as well as some advanced ones like default column value.
+ * <p>
+ * Data Sources do not need to implement it. They should consume it in APIs like
+ * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and report it in
+ * {@link Table#columns()} by calling the static {@code create} functions of this interface to
+ * create it.
+ */
+@Evolving
+public interface Column {
+
+  static Column create(String name, DataType dataType) {
+    return create(name, dataType, true);
+  }
+
+  static Column create(String name, DataType dataType, boolean nullable) {
+    return create(name, dataType, nullable, null, null, null);
+  }
+
+  static Column create(
+      String name,
+      DataType dataType,
+      boolean nullable,
+      String comment,
+      ColumnDefaultValue defaultValue,
+      String metadataInJSON) {
+    return new ColumnImpl(name, dataType, nullable, comment, defaultValue, metadataInJSON);
+  }
+
+  /**
+   * Returns the name of this table column.
+   */
+  String name();
+
+  /**
+   * Returns the data type of this table column.
+   */
+  DataType dataType();
+
+  /**
+   * Returns true if this column may produce null values.
+   */
+  boolean nullable();
+
+  /**
+   * Returns the comment of this table column. Null means no comment.
+   */
+  @Nullable
+  String comment();
+
+  /**
+   * Returns the default value of this table column. Null means no default value.
+   */
+  @Nullable
+  ColumnDefaultValue defaultValue();
+
+  /**
+   * Returns the column metadata in JSON format.
+   */
+  @Nullable
+  String metadataInJSON();
+}
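
As a quick illustration of the factory methods above (values are hypothetical, not from the commit), a catalog client could build columns like this:

```scala
import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue}
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.types.{IntegerType, StringType}

object ColumnFactorySketch {
  // Name and type only; the column is nullable by default.
  val name: Column = Column.create("name", StringType)

  // Explicit nullability.
  val id: Column = Column.create("id", IntegerType, false)

  // Fully specified: comment, default value, and extra metadata as a JSON string.
  val status: Column = Column.create(
    "status", IntegerType, true, "status code",
    new ColumnDefaultValue("40 + 2", LiteralValue(42, IntegerType)),
    """{"source":"example"}""")
}
```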

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnDefaultValue.java

Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Objects;
+import javax.annotation.Nonnull;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Literal;
+
+/**
+ * A class representing the default value of a column. It contains both the SQL string and literal
+ * value of the user-specified default value expression. The SQL string should be re-evaluated for
+ * each table writing command, which may produce different values if the default value expression is
+ * something like {@code CURRENT_DATE()}. The literal value is used to back-fill existing data if
+ * new columns with default value are added. Note: the back-fill can be lazy. The data sources can
+ * remember the column default value and let the reader fill the column value when reading existing
+ * data that do not have these new columns.
+ */
+@Evolving
+public class ColumnDefaultValue {
+  private String sql;
+  private Literal<?> value;
+
+  public ColumnDefaultValue(String sql, Literal<?> value) {
+    this.sql = sql;
+    this.value = value;
+  }
+
+  /**
+   * Returns the SQL string (Spark SQL dialect) of the default value expression. This is the
+   * original string contents of the SQL expression specified at the time the column was created in
+   * a CREATE TABLE, REPLACE TABLE, or ADD COLUMN command. For example, for
+   * "CREATE TABLE t (col INT DEFAULT 40 + 2)", this returns the string literal "40 + 2" (without
+   * quotation marks).
+   */
+  @Nonnull
+  public String getSql() {
+    return sql;
+  }
+
+  /**
+   * Returns the default value literal. This is the literal value corresponding to
+   * {@link #getSql()}. For the example in the doc of {@link #getSql()}, this returns a literal
+   * integer with a value of 42.
+   */
+  @Nonnull
+  public Literal<?> getValue() {
+    return value;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof ColumnDefaultValue)) return false;
+    ColumnDefaultValue that = (ColumnDefaultValue) o;
+    return sql.equals(that.sql) && value.equals(that.value);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(sql, value);
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnDefaultValue{sql='" + sql + "\', value=" + value + '}';
+  }
+}
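
A hedged sketch of the javadoc example above, assuming the default literal is represented with `LiteralValue` from the v2 expression API:

```scala
import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.types.IntegerType

object DefaultValueSketch {
  // For: CREATE TABLE t (col INT DEFAULT 40 + 2)
  val default = new ColumnDefaultValue("40 + 2", LiteralValue(42, IntegerType))

  val sql = default.getSql()      // "40 + 2": re-evaluated for each write command
  val value = default.getValue()  // LiteralValue(42, IntegerType): back-fills existing rows
}
```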

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java

Lines changed: 57 additions & 10 deletions
@@ -54,6 +54,19 @@
 @Evolving
 public interface StagingTableCatalog extends TableCatalog {

+  /**
+   * Stage the creation of a table, preparing it to be committed into the metastore.
+   * <p>
+   * This is deprecated. Please override
+   * {@link #stageCreate(Identifier, Column[], Transform[], Map)} instead.
+   */
+  @Deprecated
+  StagedTable stageCreate(
+      Identifier ident,
+      StructType schema,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
+
   /**
    * Stage the creation of a table, preparing it to be committed into the metastore.
    * <p>
@@ -64,19 +77,34 @@ public interface StagingTableCatalog extends TableCatalog {
    * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
    *
    * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
+   * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
    * @return metadata for the new table
    * @throws TableAlreadyExistsException If a table or view already exists for the identifier
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
    */
-  StagedTable stageCreate(
+  default StagedTable stageCreate(
+      Identifier ident,
+      Column[] columns,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
+    return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  }
+
+  /**
+   * Stage the replacement of a table, preparing it to be committed into the metastore when the
+   * returned table's {@link StagedTable#commitStagedChanges()} is called.
+   * <p>
+   * This is deprecated, please override
+   * {@link #stageReplace(Identifier, StructType, Transform[], Map)} instead.
+   */
+  StagedTable stageReplace(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
+      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;

   /**
    * Stage the replacement of a table, preparing it to be committed into the metastore when the
@@ -97,19 +125,35 @@ StagedTable stageCreate(
    * operation.
    *
    * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
+   * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
    * @return metadata for the new table
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
    * @throws NoSuchTableException If the table does not exist
    */
-  StagedTable stageReplace(
+  default StagedTable stageReplace(
+      Identifier ident,
+      Column[] columns,
+      Transform[] partitions,
+      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException {
+    return stageReplace(
+        ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  }
+
+  /**
+   * Stage the creation or replacement of a table, preparing it to be committed into the metastore
+   * when the returned table's {@link StagedTable#commitStagedChanges()} is called.
+   * <p>
+   * This is deprecated, please override
+   * {@link #stageCreateOrReplace(Identifier, Column[], Transform[], Map)} instead.
+   */
+  StagedTable stageCreateOrReplace(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;
+      Map<String, String> properties) throws NoSuchNamespaceException;

   /**
    * Stage the creation or replacement of a table, preparing it to be committed into the metastore
@@ -129,16 +173,19 @@ StagedTable stageReplace(
    * the staged changes are committed but the table doesn't exist at commit time.
    *
    * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
+   * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
    * @return metadata for the new table
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
    */
-  StagedTable stageCreateOrReplace(
+  default StagedTable stageCreateOrReplace(
       Identifier ident,
-      StructType schema,
+      Column[] columns,
       Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException;
+      Map<String, String> properties) throws NoSuchNamespaceException {
+    return stageCreateOrReplace(
+        ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  }
 }
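
Since the new `Column[]` overloads are default methods that fall back to the `StructType` variants via `CatalogV2Util`, existing implementations keep working unchanged. A staging catalog that wants the richer column information can override the new signature; the sketch below is illustrative only (the class and the `stageWithColumns` helper are hypothetical):

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Column, Identifier, StagedTable, StagingTableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

abstract class ExampleStagingCatalog extends StagingTableCatalog {

  // New-style entry point: receives Column[] with typed default values.
  override def stageCreate(
      ident: Identifier,
      columns: Array[Column],
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable =
    stageWithColumns(ident, columns, partitions, properties)

  // Old-style overload kept only for source compatibility; callers go through the new one.
  override def stageCreate(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable =
    throw new UnsupportedOperationException("Use the Column[] overload")

  // Hypothetical helper that persists the columns (including defaults) in the staging metadata.
  protected def stageWithColumns(
      ident: Identifier,
      columns: Array[Column],
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable
}
```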

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java

Lines changed: 11 additions & 0 deletions
@@ -51,9 +51,20 @@ public interface Table {
   /**
    * Returns the schema of this table. If the table is not readable and doesn't have a schema, an
    * empty schema can be returned here.
+   * <p>
+   * This is deprecated. Please override {@link #columns} instead.
    */
+  @Deprecated
   StructType schema();

+  /**
+   * Returns the columns of this table. If the table is not readable and doesn't have a schema, an
+   * empty array can be returned here.
+   */
+  default Column[] columns() {
+    return CatalogV2Util.structTypeToV2Columns(schema());
+  }
+
   /**
    * Returns the physical partitioning of this table.
    */
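
On the `Table` side, an implementation can now report defaults and other column properties directly by overriding `columns()`; a minimal, hypothetical sketch (names are illustrative, not from the commit):

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Column, Table, TableCapability}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

class ExampleTable extends Table {
  override def name(): String = "example"

  // Preferred API after this change: report v2 columns directly.
  override def columns(): Array[Column] = Array(
    Column.create("id", LongType, false),
    Column.create("data", StringType))

  // Deprecated but still abstract, so it must be implemented; kept in sync with columns().
  override def schema(): StructType =
    new StructType().add("id", LongType, nullable = false).add("data", StringType)

  override def capabilities(): util.Set[TableCapability] =
    util.Collections.emptySet[TableCapability]()
}
```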
