
Commit 69b0e3e

MaxGekk authored and ragnarok56 committed
Revert "[SPARK-44801][SQL][UI] Capture analyzing failed queries in Listener and UI"
### What changes were proposed in this pull request?

In the PR, I propose to revert the commit: apache@1f92995

### Why are the changes needed?

To fix the test suite `StringFunctionsSuite`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

By running the affected test suite:

```
$ build/sbt "test:testOnly *.StringFunctionsSuite"
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#42589 from MaxGekk/revert-yaooqinn-SPARK-44801.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

1 parent 3972455 · commit 69b0e3e

File tree: 4 files changed, +13 −69 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

+1 −11

```diff
@@ -63,17 +63,7 @@ class QueryExecution(
   // TODO: Move the planner an optimizer into here from SessionState.
   protected def planner = sparkSession.sessionState.planner
 
-  def assertAnalyzed(): Unit = {
-    try {
-      analyzed
-    } catch {
-      case e: AnalysisException =>
-        // Because we do eager analysis for Dataframe, there will be no execution created after
-        // AnalysisException occurs. So we need to explicitly create a new execution to post
-        // start/end events to notify the listener and UI components.
-        SQLExecution.withNewExecutionId(this, Some("analyze"))(throw e)
-    }
-  }
+  def assertAnalyzed(): Unit = analyzed
 
   def assertSupported(): Unit = {
     if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
```
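For context on this hunk: the removed `assertAnalyzed` existed so that an `AnalysisException` raised by eager DataFrame analysis would still produce execution start/end events for listeners and the SQL UI. Below is a minimal, self-contained sketch of that pattern, not the Spark implementation; `EventOnFailureSketch`, `Listener`, and `AnalysisError` are illustrative stand-ins.

```scala
object EventOnFailureSketch {
  // Stand-in for AnalysisException.
  final case class AnalysisError(msg: String) extends Exception(msg)

  // Stand-in for the listener bus / UI consumer.
  trait Listener {
    def onStart(id: Long): Unit
    def onEnd(id: Long, error: Option[Throwable]): Unit
  }

  private val nextId = new java.util.concurrent.atomic.AtomicLong(0)

  /** Run `body` inside a fresh "execution", always posting start/end events. */
  def withNewExecutionId[T](listener: Listener)(body: => T): T = {
    val id = nextId.incrementAndGet()
    listener.onStart(id)
    try {
      val result = body
      listener.onEnd(id, None)
      result
    } catch {
      case e: Throwable =>
        listener.onEnd(id, Some(e))
        throw e
    }
  }

  /** Mirrors the removed `assertAnalyzed`: re-throw inside a new execution. */
  def assertAnalyzed(listener: Listener)(analyze: => Unit): Unit = {
    try analyze
    catch {
      case e: AnalysisError =>
        // Analysis failed before any execution existed, so create one just
        // to emit the start/end events, then re-throw inside it.
        withNewExecutionId(listener)(throw e)
    }
  }
}
```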

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

+2 −14

```diff
@@ -20,11 +20,8 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => JFuture}
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.util.control.NonFatal
-
 import org.apache.spark.{ErrorMessageFormat, SparkThrowable, SparkThrowableHelper}
 import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL}
-import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.SparkSession
@@ -33,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.SQL_EVENT_TRUNCATE_LENGTH
 import org.apache.spark.util.Utils
 
-object SQLExecution extends Logging {
+object SQLExecution {
 
   val EXECUTION_ID_KEY = "spark.sql.execution.id"
   val EXECUTION_ROOT_ID_KEY = "spark.sql.execution.root.id"
@@ -119,15 +116,6 @@ object SQLExecution extends Logging {
     var ex: Option[Throwable] = None
     val startTime = System.nanoTime()
     try {
-      val planInfo = try {
-        SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
-      } catch {
-        case NonFatal(e) =>
-          logDebug("Failed to generate SparkPlanInfo", e)
-          // If the queryExecution already failed before this, we are not able to generate the
-          // the plan info, so we use and empty graphviz node to make the UI happy
-          SparkPlanInfo.EMPTY
-      }
       sc.listenerBus.post(SparkListenerSQLExecutionStart(
         executionId = executionId,
         rootExecutionId = Some(rootExecutionId),
@@ -136,7 +124,7 @@ object SQLExecution extends Logging {
         physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
         // `queryExecution.executedPlan` triggers query planning. If it fails, the exception
         // will be caught and reported in the `SparkListenerSQLExecutionEnd`
-        sparkPlanInfo = planInfo,
+        sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
        time = System.currentTimeMillis(),
         modifiedConfigs = redactedConfigs,
         jobTags = sc.getJobTags()
```
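The first removed block here guarded `SparkPlanInfo.fromSparkPlan` with `NonFatal`, substituting an empty placeholder when plan generation had already failed so that the start event could still be posted. A dependency-free sketch of that fallback pattern follows; `PlanInfo`, `Empty`, and `safeFromPlan` are hypothetical stand-ins for the Spark types, not the Spark API:

```scala
import scala.util.control.NonFatal

// Simplified stand-in for SparkPlanInfo.
final case class PlanInfo(nodeName: String, children: Seq[PlanInfo])

object PlanInfo {
  // Placeholder used when no real plan description can be produced.
  val Empty: PlanInfo = PlanInfo("", Nil)

  // Guard a plan-description builder: if it throws a non-fatal error
  // (e.g. planning already failed), fall back to the empty placeholder
  // instead of propagating the failure to the event-posting path.
  def safeFromPlan(build: => PlanInfo): PlanInfo =
    try build
    catch {
      case NonFatal(_) => PlanInfo.Empty
    }
}
```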

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

−2

```diff
@@ -76,6 +76,4 @@ private[execution] object SparkPlanInfo {
       metadata,
       metrics)
   }
-
-  final lazy val EMPTY: SparkPlanInfo = new SparkPlanInfo("", "", Nil, Map.empty, Nil)
 }
```
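The `EMPTY` sentinel removed above was the placeholder consumed by the `NonFatal` fallback in `SQLExecution` (sketched after the previous file); with that fallback reverted, the sentinel has no remaining callers.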

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/UISeleniumSuite.scala

+10 −42

```diff
@@ -32,31 +32,10 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser {
 
   private var spark: SparkSession = _
 
-  private def creatSparkSessionWithUI: SparkSession = SparkSession.builder()
-    .master("local[1,1]")
-    .appName("sql ui test")
-    .config("spark.ui.enabled", "true")
-    .config("spark.ui.port", "0")
-    .getOrCreate()
-
   implicit val webDriver: HtmlUnitDriver = new HtmlUnitDriver {
     getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
   }
 
-  private def findErrorMessageOnSQLUI(): List[String] = {
-    val webUrl = spark.sparkContext.uiWebUrl
-    assert(webUrl.isDefined, "please turn on spark.ui.enabled")
-    go to s"${webUrl.get}/SQL"
-    findAll(cssSelector("""#failed-table td .stacktrace-details""")).map(_.text).toList
-  }
-
-  private def findExecutionIDOnSQLUI(): Int = {
-    val webUrl = spark.sparkContext.uiWebUrl
-    assert(webUrl.isDefined, "please turn on spark.ui.enabled")
-    go to s"${webUrl.get}/SQL"
-    findAll(cssSelector("""#failed-table td""")).map(_.text).toList.head.toInt
-  }
-
   override def afterAll(): Unit = {
     try {
       webDriver.quit()
@@ -75,32 +54,21 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser {
   }
 
   test("SPARK-44737: Should not display json format errors on SQL page for non-SparkThrowables") {
-    spark = creatSparkSessionWithUI
+    spark = SparkSession.builder()
+      .master("local[1,1]")
+      .appName("sql ui test")
+      .config("spark.ui.enabled", "true")
+      .config("spark.ui.port", "0")
+      .getOrCreate()
 
     intercept[Exception](spark.sql("SET mapreduce.job.reduces = 0").isEmpty)
     eventually(timeout(10.seconds), interval(100.milliseconds)) {
-      val sd = findErrorMessageOnSQLUI()
+      val webUrl = spark.sparkContext.uiWebUrl
+      assert(webUrl.isDefined, "please turn on spark.ui.enabled")
+      go to s"${webUrl.get}/SQL"
+      val sd = findAll(cssSelector("""#failed-table td .stacktrace-details""")).map(_.text).toList
       assert(sd.size === 1, "SET mapreduce.job.reduces = 0 shall fail")
       assert(sd.head.startsWith("java.lang.IllegalArgumentException:"))
     }
   }
-
-  test("SPARK-44801: Analyzer failure shall show the query in failed table") {
-    spark = creatSparkSessionWithUI
-
-    intercept[Exception](spark.sql("SELECT * FROM I_AM_A_INVISIBLE_TABLE").isEmpty)
-    eventually(timeout(10.seconds), interval(100.milliseconds)) {
-      val sd = findErrorMessageOnSQLUI()
-      assert(sd.size === 1, "Analyze fail shall show the query in failed table")
-      assert(sd.head.startsWith("[TABLE_OR_VIEW_NOT_FOUND]"))
-
-      val id = findExecutionIDOnSQLUI()
-      // check query detail page
-      go to s"${spark.sparkContext.uiWebUrl.get}/SQL/execution/?id=$id"
-      val planDot = findAll(cssSelector(""".dot-file""")).map(_.text).toList
-      assert(planDot.head.startsWith("digraph G {"))
-      val planDetails = findAll(cssSelector("""#physical-plan-details""")).map(_.text).toList
-      assert(planDetails.head.contains("TABLE_OR_VIEW_NOT_FOUND"))
-    }
-  }
 }
```
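The surviving test polls the SQL page with ScalaTest's `eventually(timeout, interval)` until the failed query shows up. For readers unfamiliar with that helper, here is a dependency-free sketch of the same retry mechanism, assuming only the Scala standard library (`Polling` is a hypothetical utility, not the ScalaTest API):

```scala
import scala.concurrent.duration._
import scala.util.control.NonFatal

object Polling {
  /** Retry `block` until it succeeds or `timeout` expires, pausing `interval` between attempts. */
  def eventually[T](timeout: FiniteDuration, interval: FiniteDuration)(block: => T): T = {
    val deadline = timeout.fromNow
    @annotation.tailrec
    def loop(): T = {
      // Capture either the result or the non-fatal failure of this attempt.
      val attempt = try Right(block) catch { case NonFatal(e) => Left(e) }
      attempt match {
        case Right(value) => value
        case Left(_) if deadline.hasTimeLeft() =>
          Thread.sleep(interval.toMillis)
          loop()
        case Left(e) => throw e // out of time: surface the last failure
      }
    }
    loop()
  }
}
```

A call like `Polling.eventually(10.seconds, 100.millis) { assert(pageHasRow) }` retries the assertion every 100 ms for up to 10 seconds, which is how the test above tolerates the UI's asynchronous event handling.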
