
Commit 1f92995

[SPARK-44801][SQL][UI] Capture analyzing failed queries in Listener and UI
### What changes were proposed in this pull request?

This PR wraps the catch block of `QueryExecution.assertAnalyzed` with a new execution id, reusing `SQLExecution.withNewExecutionId` to produce execution events for the listener and UI.

### Why are the changes needed?

The listener and UI are currently unable to track queries that fail during analysis.

### Does this PR introduce _any_ user-facing change?

Yes, UI improvements.

### How was this patch tested?

New tests, verified locally. Prior to this change, nothing was shown in the UI for such queries.

#### List

![image](https://github.com/apache/spark/assets/8326978/aac1fbac-e339-4781-9c11-c92a35f56633)

#### Details

![image](https://github.com/apache/spark/assets/8326978/667c2038-f8e2-4b5b-9176-f214c9e3aa0a)

Closes #42481 from yaooqinn/SPARK-44801.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
1 parent 04024fd commit 1f92995
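
For readers who want to observe the new behavior, here is a minimal driver-side sketch (assuming a Spark build containing this patch; `SparkListenerSQLExecutionStart`/`SparkListenerSQLExecutionEnd` are developer-API event classes in `org.apache.spark.sql.execution.ui`, and the table name is made up). With this change, both events fire even when analysis fails:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

object FailedAnalysisEventsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-44801 demo") // hypothetical app name
      .getOrCreate()

    // Print SQL execution lifecycle events as they arrive on the listener bus.
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case s: SparkListenerSQLExecutionStart => println(s"SQL start: id=${s.executionId}")
        case e: SparkListenerSQLExecutionEnd => println(s"SQL end: id=${e.executionId}")
        case _ => // ignore everything else
      }
    })

    // Analysis fails: the table does not exist. Before this patch no execution
    // events were posted for such a query; with it, start and end both fire.
    try spark.sql("SELECT * FROM no_such_table").collect()
    catch { case e: Exception => println(s"caught ${e.getClass.getSimpleName}") }

    Thread.sleep(1000) // the listener bus is asynchronous; give it a moment
    spark.stop()
  }
}
```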

4 files changed (+69, −13 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

+11 −1
```diff
@@ -63,7 +63,17 @@ class QueryExecution(
   // TODO: Move the planner an optimizer into here from SessionState.
   protected def planner = sparkSession.sessionState.planner

-  def assertAnalyzed(): Unit = analyzed
+  def assertAnalyzed(): Unit = {
+    try {
+      analyzed
+    } catch {
+      case e: AnalysisException =>
+        // Because we do eager analysis for DataFrame, there will be no execution created after
+        // AnalysisException occurs. So we need to explicitly create a new execution to post
+        // start/end events to notify the listener and UI components.
+        SQLExecution.withNewExecutionId(this, Some("analyze"))(throw e)
+    }
+  }

   def assertSupported(): Unit = {
     if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
```
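
Note how the new body passes `throw e` as the thunk: `withNewExecutionId` posts the start event, evaluates the thunk, records the failure in the end event, and rethrows. A generic sketch of that wrap-and-rethrow shape (illustrative names, not Spark's actual internals):

```scala
// Illustrative sketch of the wrap-and-rethrow shape used above; the names
// here are hypothetical, not Spark's internals.
def withInstrumentation[T](postStart: () => Unit, postEnd: Option[Throwable] => Unit)(body: => T): T = {
  postStart() // observers learn the execution exists, even if body fails immediately
  try {
    val result = body
    postEnd(None)
    result
  } catch {
    case t: Throwable =>
      postEnd(Some(t)) // observers see the failure before the caller does
      throw t
  }
}

// A body that only rethrows still yields both a start and an end event:
// withInstrumentation(start, end) { throw analysisException }
```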

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

+14 −2
```diff
@@ -20,8 +20,11 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => JFuture}
 import java.util.concurrent.atomic.AtomicLong

+import scala.util.control.NonFatal
+
 import org.apache.spark.{ErrorMessageFormat, SparkThrowable, SparkThrowableHelper}
 import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL}
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.SparkSession
@@ -30,7 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.SQL_EVENT_TRUNCATE_LENGTH
 import org.apache.spark.util.Utils

-object SQLExecution {
+object SQLExecution extends Logging {

   val EXECUTION_ID_KEY = "spark.sql.execution.id"
   val EXECUTION_ROOT_ID_KEY = "spark.sql.execution.root.id"
@@ -116,6 +119,15 @@ object SQLExecution {
     var ex: Option[Throwable] = None
     val startTime = System.nanoTime()
     try {
+      val planInfo = try {
+        SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
+      } catch {
+        case NonFatal(e) =>
+          logDebug("Failed to generate SparkPlanInfo", e)
+          // If the queryExecution already failed before this, we are not able to generate
+          // the plan info, so we use an empty graphviz node to make the UI happy
+          SparkPlanInfo.EMPTY
+      }
       sc.listenerBus.post(SparkListenerSQLExecutionStart(
         executionId = executionId,
         rootExecutionId = Some(rootExecutionId),
@@ -124,7 +136,7 @@ object SQLExecution {
         physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
         // `queryExecution.executedPlan` triggers query planning. If it fails, the exception
         // will be caught and reported in the `SparkListenerSQLExecutionEnd`
-        sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
+        sparkPlanInfo = planInfo,
         time = System.currentTimeMillis(),
         modifiedConfigs = redactedConfigs,
         jobTags = sc.getJobTags()
```
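
With the events posted, the failed execution also becomes visible through the driver's SQL status store. A hedged sketch follows: `sharedState.statusStore` and `executionsList()` are internal/developer-facing accessors, and the exact fields on the returned execution data (such as `errorMessage`) should be treated as version-dependent assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: list recorded SQL executions after a failed analysis.
// `sharedState.statusStore` is an internal accessor; field names on the
// returned execution data are assumptions for recent Spark versions.
object InspectFailedExecutions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("status store sketch") // hypothetical app name
      .getOrCreate()

    try spark.sql("SELECT * FROM missing_table").collect()
    catch { case _: Exception => () } // the analysis failure is expected

    Thread.sleep(1000) // the listener bus is asynchronous; give it a moment
    spark.sharedState.statusStore.executionsList().foreach { exec =>
      println(s"execution ${exec.executionId} failed: ${exec.errorMessage}")
    }
    spark.stop()
  }
}
```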

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

+2 −0
```diff
@@ -76,4 +76,6 @@ private[execution] object SparkPlanInfo {
       metadata,
       metrics)
   }
+
+  final lazy val EMPTY: SparkPlanInfo = new SparkPlanInfo("", "", Nil, Map.empty, Nil)
 }
```
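
`EMPTY` is a null object: one shared, immutable placeholder that the UI's graph builder can render (as an empty graphviz digraph) without special-casing a missing plan. The same shape in miniature, with hypothetical names:

```scala
// Null-object pattern in miniature; PlanNode is hypothetical, not Spark's API.
final case class PlanNode(nodeName: String, children: Seq[PlanNode])

object PlanNode {
  // One shared placeholder: callers render it like any real node, so no
  // null/Option check is needed when a plan cannot be produced.
  final val Empty: PlanNode = PlanNode("", Nil)
}
```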

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/UISeleniumSuite.scala

+42 −10
```diff
@@ -32,10 +32,31 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser {

   private var spark: SparkSession = _

+  private def createSparkSessionWithUI: SparkSession = SparkSession.builder()
+    .master("local[1,1]")
+    .appName("sql ui test")
+    .config("spark.ui.enabled", "true")
+    .config("spark.ui.port", "0")
+    .getOrCreate()
+
   implicit val webDriver: HtmlUnitDriver = new HtmlUnitDriver {
     getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
   }

+  private def findErrorMessageOnSQLUI(): List[String] = {
+    val webUrl = spark.sparkContext.uiWebUrl
+    assert(webUrl.isDefined, "please turn on spark.ui.enabled")
+    go to s"${webUrl.get}/SQL"
+    findAll(cssSelector("""#failed-table td .stacktrace-details""")).map(_.text).toList
+  }
+
+  private def findExecutionIDOnSQLUI(): Int = {
+    val webUrl = spark.sparkContext.uiWebUrl
+    assert(webUrl.isDefined, "please turn on spark.ui.enabled")
+    go to s"${webUrl.get}/SQL"
+    findAll(cssSelector("""#failed-table td""")).map(_.text).toList.head.toInt
+  }
+
   override def afterAll(): Unit = {
     try {
       webDriver.quit()
@@ -54,21 +75,32 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser {
   }

   test("SPARK-44737: Should not display json format errors on SQL page for non-SparkThrowables") {
-    spark = SparkSession.builder()
-      .master("local[1,1]")
-      .appName("sql ui test")
-      .config("spark.ui.enabled", "true")
-      .config("spark.ui.port", "0")
-      .getOrCreate()
+    spark = createSparkSessionWithUI

     intercept[Exception](spark.sql("SET mapreduce.job.reduces = 0").isEmpty)
     eventually(timeout(10.seconds), interval(100.milliseconds)) {
-      val webUrl = spark.sparkContext.uiWebUrl
-      assert(webUrl.isDefined, "please turn on spark.ui.enabled")
-      go to s"${webUrl.get}/SQL"
-      val sd = findAll(cssSelector("""#failed-table td .stacktrace-details""")).map(_.text).toList
+      val sd = findErrorMessageOnSQLUI()
       assert(sd.size === 1, "SET mapreduce.job.reduces = 0 shall fail")
       assert(sd.head.startsWith("java.lang.IllegalArgumentException:"))
     }
   }
+
+  test("SPARK-44801: Analyzer failure shall show the query in failed table") {
+    spark = createSparkSessionWithUI
+
+    intercept[Exception](spark.sql("SELECT * FROM I_AM_A_INVISIBLE_TABLE").isEmpty)
+    eventually(timeout(10.seconds), interval(100.milliseconds)) {
+      val sd = findErrorMessageOnSQLUI()
+      assert(sd.size === 1, "Analysis failure shall show the query in the failed table")
+      assert(sd.head.startsWith("[TABLE_OR_VIEW_NOT_FOUND]"))
+
+      val id = findExecutionIDOnSQLUI()
+      // check query detail page
+      go to s"${spark.sparkContext.uiWebUrl.get}/SQL/execution/?id=$id"
+      val planDot = findAll(cssSelector(""".dot-file""")).map(_.text).toList
+      assert(planDot.head.startsWith("digraph G {"))
+      val planDetails = findAll(cssSelector("""#physical-plan-details""")).map(_.text).toList
+      assert(planDetails.head.contains("TABLE_OR_VIEW_NOT_FOUND"))
+    }
+  }
 }
```
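
Both tests poll rather than assert once, because the UI store is fed asynchronously by the listener bus. A minimal sketch of the ScalaTest pattern they rely on (assuming `org.scalatest.concurrent.Eventually` and `SpanSugar` on the classpath; `uiReflectsFailure` is a hypothetical stand-in for the page checks above):

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch {
  // Hypothetical stand-in for asynchronous UI state fed by the listener bus.
  @volatile var uiReflectsFailure = false

  def main(args: Array[String]): Unit = {
    // Flip the flag on another thread, as the listener bus eventually would.
    new Thread(() => { Thread.sleep(500); uiReflectsFailure = true }).start()

    // Re-run the block every 100 ms until its assertion passes; fail after 10 s.
    eventually(timeout(10.seconds), interval(100.milliseconds)) {
      assert(uiReflectsFailure, "failed execution not visible on the SQL page yet")
    }
  }
}
```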
