
Commit f3c2583

xkrogen authored and Mridul Muralidharan committed
[SPARK-33185][YARN][FOLLOW-ON] Leverage RM's RPC API instead of REST to fetch driver log links in yarn.Client
### What changes were proposed in this pull request?

This is a follow-on to PR #30096, which initially added support for printing direct links to the driver stdout/stderr logs from the application report output in `yarn.Client` using the `spark.yarn.includeDriverLogsLink` configuration. That PR used the ResourceManager's REST APIs to fetch the information needed to construct the links. This PR removes the dependency on the REST API, since the new logic is the only place in `yarn.Client` that used it, and instead leverages the RPC API via `YarnClient`, which brings the code in line with the rest of `yarn.Client`.

### Why are the changes needed?

While the old logic worked fine when running a Spark application in a "standard" environment with full access to Kerberos credentials, it can fail when run in an environment with restricted Kerberos credentials. In our case, this environment is [Azkaban](https://azkaban.github.io/), but it likely affects other job-scheduling systems as well. In such an environment, the application has delegation tokens which enable it to communicate with services such as YARN, but the RM REST API is not typically covered by such delegation tokens (note that although YARN does support accessing the RM REST API via a delegation token, as documented [here](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Delegation_Tokens_API), this is a new feature in alpha phase, and most deployments are likely not retrieving this token today). Besides this enhancement, leveraging the `YarnClient` APIs greatly simplifies the processing logic, for example by removing all JSON parsing.

### Does this PR introduce _any_ user-facing change?

Very minimal user-facing changes on top of PR #30096; it basically expands the set of environments in which that feature will operate correctly.

### How was this patch tested?

In addition to redoing the `spark-submit` testing described in PR #30096, I also tested this logic in a restricted-credentials environment (Azkaban). It succeeds where the previous logic would fail with a 401 error.

Closes #30450 from xkrogen/xkrogen-SPARK-33185-driverlogs-followon.

Authored-by: Erik Krogen <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
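For context, the driver-log-links feature that this commit refines is opt-in. Below is a minimal sketch of turning it on from the submitting application; the configuration key is the one named in the commit message, while the app name is only illustrative:

```scala
import org.apache.spark.SparkConf

// Opt in to driver stdout/stderr log links in yarn.Client's application report output.
// The key matches the spark.yarn.includeDriverLogsLink configuration referenced above;
// the application name is a placeholder.
val conf = new SparkConf()
  .setAppName("driver-log-link-demo")
  .set("spark.yarn.includeDriverLogsLink", "true")
```

The same setting can also be supplied on the `spark-submit` command line via `--conf spark.yarn.includeDriverLogsLink=true`.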
1 parent 030b313 · commit f3c2583

File tree

3 files changed: +54, -91 lines

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 23 additions & 44 deletions
@@ -29,12 +29,8 @@ import scala.collection.immutable.{Map => IMap}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.util.control.NonFatal
 
-import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.Objects
 import com.google.common.io.Files
-import javax.ws.rs.client.ClientBuilder
-import javax.ws.rs.core.MediaType
-import javax.ws.rs.core.Response.Status.Family
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
@@ -51,7 +47,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 import org.apache.hadoop.yarn.util.Records
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.api.python.PythonUtils
@@ -1089,9 +1084,9 @@ private[spark] class Client(
         // If DEBUG is enabled, log report details every iteration
         // Otherwise, log them every time the application changes state
         if (log.isDebugEnabled) {
-          logDebug(formatReportDetails(report, getDriverLogsLink(report.getApplicationId)))
+          logDebug(formatReportDetails(report, getDriverLogsLink(report)))
         } else if (lastState != state) {
-          logInfo(formatReportDetails(report, getDriverLogsLink(report.getApplicationId)))
+          logInfo(formatReportDetails(report, getDriverLogsLink(report)))
         }
       }
 
@@ -1192,33 +1187,31 @@ private[spark] class Client(
   }
 
   /**
-   * Fetch links to the logs of the driver for the given application ID. This requires hitting the
-   * RM REST API. Returns an empty map if the links could not be fetched. If this feature is
-   * disabled via [[CLIENT_INCLUDE_DRIVER_LOGS_LINK]], an empty map is returned immediately.
+   * Fetch links to the logs of the driver for the given application report. This requires
+   * querying the ResourceManager via RPC. Returns an empty map if the links could not be fetched.
+   * If this feature is disabled via [[CLIENT_INCLUDE_DRIVER_LOGS_LINK]], or if the application
+   * report indicates that the driver container isn't currently running, an empty map is
+   * returned immediately.
    */
-  private def getDriverLogsLink(appId: ApplicationId): IMap[String, String] = {
-    if (!sparkConf.get(CLIENT_INCLUDE_DRIVER_LOGS_LINK)) {
-      return IMap()
+  private def getDriverLogsLink(appReport: ApplicationReport): IMap[String, String] = {
+    if (!sparkConf.get(CLIENT_INCLUDE_DRIVER_LOGS_LINK)
+        || appReport.getYarnApplicationState != YarnApplicationState.RUNNING) {
+      return IMap.empty
     }
     try {
-      val baseRmUrl = WebAppUtils.getRMWebAppURLWithScheme(hadoopConf)
-      val response = ClientBuilder.newClient()
-        .target(baseRmUrl)
-        .path("ws").path("v1").path("cluster").path("apps")
-        .path(appId.toString).path("appattempts")
-        .request(MediaType.APPLICATION_JSON)
-        .get()
-      response.getStatusInfo.getFamily match {
-        case Family.SUCCESSFUL => parseAppAttemptsJsonResponse(response.readEntity(classOf[String]))
-        case _ =>
-          logWarning(s"Unable to fetch app attempts info from $baseRmUrl, got "
-            + s"status code ${response.getStatus}: ${response.getStatusInfo.getReasonPhrase}")
-          IMap()
-      }
+      Option(appReport.getCurrentApplicationAttemptId)
+        .flatMap(attemptId => Option(yarnClient.getApplicationAttemptReport(attemptId)))
+        .flatMap(attemptReport => Option(attemptReport.getAMContainerId))
+        .flatMap(amContainerId => Option(yarnClient.getContainerReport(amContainerId)))
+        .flatMap(containerReport => Option(containerReport.getLogUrl))
+        .map(YarnContainerInfoHelper.getLogUrlsFromBaseUrl)
+        .getOrElse(IMap.empty)
     } catch {
       case e: Exception =>
-        logWarning(s"Unable to get driver log links for $appId", e)
-        IMap()
+        logWarning(s"Unable to get driver log links for $appId: $e")
+        // Include the full stack trace only at DEBUG level to reduce verbosity
+        logDebug(s"Unable to get driver log links for $appId", e)
+        IMap.empty
     }
   }
 
@@ -1236,7 +1229,7 @@ private[spark] class Client(
       val report = getApplicationReport(appId)
       val state = report.getYarnApplicationState
      logInfo(s"Application report for $appId (state: $state)")
-      logInfo(formatReportDetails(report, getDriverLogsLink(report.getApplicationId)))
+      logInfo(formatReportDetails(report, getDriverLogsLink(report)))
       if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
         throw new SparkException(s"Application $appId finished with status: $state")
       }
@@ -1627,20 +1620,6 @@ private object Client extends Logging {
     writer.flush()
     out.closeEntry()
   }
-
-  private[yarn] def parseAppAttemptsJsonResponse(jsonString: String): IMap[String, String] = {
-    val objectMapper = new ObjectMapper()
-    // If JSON response is malformed somewhere along the way, MissingNode will be returned,
-    // which allows for safe continuation of chaining. The `elements()` call will be empty,
-    // and None will get returned.
-    objectMapper.readTree(jsonString)
-      .path("appAttempts").path("appAttempt")
-      .elements().asScala.toList.takeRight(1).headOption
-      .map(_.path("logsLink").asText(""))
-      .filterNot(_ == "")
-      .map(baseUrl => YarnContainerInfoHelper.getLogUrlsFromBaseUrl(baseUrl))
-      .getOrElse(IMap())
-  }
 }
 
 private[spark] class YarnClusterApplication extends SparkApplication {
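For readers less familiar with the YARN client API, the chain of RPC calls that the new `getDriverLogsLink` relies on can also be exercised with a standalone `YarnClient`. A rough sketch follows; the configuration and application ID values are placeholders, not anything taken from this patch:

```scala
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object DriverLogLinkLookup {
  def main(args: Array[String]): Unit = {
    // Rough sketch of the RPC chain used above: application report ->
    // current attempt report -> AM container report -> container log URL.
    val yarnConf = new YarnConfiguration()
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(yarnConf)
    yarnClient.start()

    // Placeholder cluster timestamp and application ID.
    val appId = ApplicationId.newInstance(1500000000000L, 1234567)
    val appReport = yarnClient.getApplicationReport(appId)
    val logUrl = Option(appReport.getCurrentApplicationAttemptId)
      .flatMap(attemptId => Option(yarnClient.getApplicationAttemptReport(attemptId)))
      .flatMap(attemptReport => Option(attemptReport.getAMContainerId))
      .flatMap(containerId => Option(yarnClient.getContainerReport(containerId)))
      .flatMap(containerReport => Option(containerReport.getLogUrl))

    logUrl.foreach(url => println(s"AM container logs: $url"))
    yarnClient.stop()
  }
}
```

All of these lookups travel over the same RPC channel (and therefore the same delegation tokens) as the rest of `yarn.Client`, which is why the restricted-credentials environment described in the commit message no longer hits a 401.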

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala

Lines changed: 0 additions & 47 deletions
@@ -583,53 +583,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
-  test("SPARK-33185 Parse YARN AppAttempts valid JSON response") {
-    val appIdSuffix = "1500000000000_1234567"
-    val containerId = s"container_e1_${appIdSuffix}_01_000001"
-    val nodeHost = "node.example.com"
-    val jsonString =
-      s"""
-         |{"appAttempts": {
-         |  "appAttempt": [ {
-         |    "id":1,
-         |    "startTime":1600000000000,
-         |    "finishedTime":1600000100000,
-         |    "containerId":"$containerId",
-         |    "nodeHttpAddress":"$nodeHost:8042",
-         |    "nodeId":"node.example.com:8041",
-         |    "logsLink":"http://$nodeHost:8042/node/containerlogs/$containerId/username",
-         |    "blacklistedNodes":"",
-         |    "nodesBlacklistedBySystem":"",
-         |    "appAttemptId":"appattempt_${appIdSuffix}_000001"
-         |  }]
-         |}}
-         |""".stripMargin
-    val logLinkMap = Client.parseAppAttemptsJsonResponse(jsonString)
-    assert(logLinkMap.keySet === Set("stdout", "stderr"))
-    assert(logLinkMap("stdout") ===
-      s"http://$nodeHost:8042/node/containerlogs/$containerId/username/stdout?start=-4096")
-    assert(logLinkMap("stderr") ===
-      s"http://$nodeHost:8042/node/containerlogs/$containerId/username/stderr?start=-4096")
-  }
-
-  test("SPARK-33185 Parse YARN AppAttempts invalid JSON response") {
-    // No "appAttempt" present
-    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts": { } }""") === Map())
-
-    // "appAttempt" is empty
-    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts": { "appAttempt": [ ] } }""")
-      === Map())
-
-    // logsLink is missing
-    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts":{"appAttempt":[{"id":1}]}}""")
-      === Map())
-
-    // logsLink is present but empty
-    assert(
-      Client.parseAppAttemptsJsonResponse("""{"appAttempts":{"appAttempt":[{"logsLink":""}]}}""")
-      === Map())
-  }
-
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 31 additions & 0 deletions
@@ -230,6 +230,37 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     }
   }
 
+  test("running Spark in yarn-cluster mode displays driver log links") {
+    val log4jConf = new File(tempDir, "log4j.properties")
+    val logOutFile = new File(tempDir, "logs")
+    Files.write(
+      s"""log4j.rootCategory=DEBUG,file
+         |log4j.appender.file=org.apache.log4j.FileAppender
+         |log4j.appender.file.file=$logOutFile
+         |log4j.appender.file.layout=org.apache.log4j.PatternLayout
+         |""".stripMargin,
+      log4jConf, StandardCharsets.UTF_8)
+    // Since this test is trying to extract log output from the SparkSubmit process itself,
+    // standard options to the Spark process don't take effect. Leverage the java-opts file which
+    // will get picked up for the SparkSubmit process.
+    val confDir = new File(tempDir, "conf")
+    confDir.mkdir()
+    val javaOptsFile = new File(confDir, "java-opts")
+    Files.write(s"-Dlog4j.configuration=file://$log4jConf\n", javaOptsFile, StandardCharsets.UTF_8)
+
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(clientMode = false,
+      mainClassName(YarnClusterDriver.getClass),
+      appArgs = Seq(result.getAbsolutePath),
+      extraEnv = Map("SPARK_CONF_DIR" -> confDir.getAbsolutePath),
+      extraConf = Map(CLIENT_INCLUDE_DRIVER_LOGS_LINK.key -> true.toString))
+    checkResult(finalState, result)
+    val logOutput = Files.toString(logOutFile, StandardCharsets.UTF_8)
+    val logFilePattern = raw"""(?s).+\sDriver Logs \(<NAME>\): https?://.+/<NAME>(\?\S+)?\s.+"""
+    logOutput should fullyMatch regex logFilePattern.replace("<NAME>", "stdout")
+    logOutput should fullyMatch regex logFilePattern.replace("<NAME>", "stderr")
+  }
+
   test("timeout to get SparkContext in cluster mode triggers failure") {
     val timeout = 2000
     val finalState = runSpark(false, mainClassName(SparkContextTimeoutApp.getClass),
