Commit 3972455

anishshri-db authored and ragnarok56 committed
[SPARK-44818] Fix race for pending task kill issued before taskThread is initialized
### What changes were proposed in this pull request?

Fix race for pending task kill issued before taskThread is initialized.

### Why are the changes needed?

There is a race for tasks that are interrupted through stage cancellation and that may already have been added to the TaskSet, but don't yet have `taskThread` initialized.

To handle stage cancellation, we try to kill the ongoing task attempts:

```
logInfo("Cancelling stage " + stageId)
// Kill all running tasks for the stage.
killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled: " + reason)
// Cancel all attempts for the stage.
```

However, there is a chance that `taskThread` is not initialized yet, in which case we only set `reasonIfKilled`:

```
def kill(interruptThread: Boolean, reason: String): Unit = {
  require(reason != null)
  _reasonIfKilled = reason
  if (context != null) {
    context.markInterrupted(reason)
  }
  if (interruptThread && taskThread != null) {
    taskThread.interrupt()  // <--- never hit
  }
}
```

Then, within the task execution thread itself, we call kill again because `reasonIfKilled` is set. This time, however, we explicitly pass `interruptThread` as false, since we don't know the status of the previous call:

```
taskThread = Thread.currentThread()
if (_reasonIfKilled != null) {
  kill(interruptThread = false, _reasonIfKilled)  // <-- only context will be set
}
```

The TaskReaper has also finished its previous and only attempt at task interruption, since it does not retry in this case. As a result, the task is never interrupted, not even once, and it can block on I/O or wait calls that might not finish within the reaper timeout, leading to the JVM being killed.

```
taskRunner.kill(interruptThread = interruptThread, reason = reason)
```

The change fixes this by checking for the presence of `reasonIfKilled` on the context and throwing a `TaskKilledException` before we execute `runTask`. This prevents execution of the actual task, frees up the slot, and avoids later issues with the reaper.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests

```
[info] JobCancellationSuite:
...
[info] Run completed in 35 seconds, 781 milliseconds.
[info] Total number of tests run: 13
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes apache#42504 from anishshri-db/task/SPARK-44818.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Josh Rosen <[email protected]>
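
The lost interrupt described above can be reproduced in a single thread with a stripped-down model. This is only a sketch: `SketchTask`, `start`, `interruptsDelivered`, and `LostInterruptDemo` are hypothetical names for illustration and are not part of Spark; only the shape of `kill` follows the snippet in the commit message.

```scala
// Hypothetical, simplified model of the race; not Spark's actual Task class.
final class SketchTask {
  @volatile private var reasonIfKilled: Option[String] = None
  @volatile private var taskThread: Thread = null
  // Counts how many times kill() really interrupted a thread (illustration only).
  @volatile var interruptsDelivered: Int = 0

  def kill(interruptThread: Boolean, reason: String): Unit = {
    require(reason != null)
    reasonIfKilled = Some(reason)
    // Skipped whenever the task thread has not registered itself yet.
    if (interruptThread && taskThread != null) {
      taskThread.interrupt()
      interruptsDelivered += 1
    }
  }

  // Models the start of the task's run: register the thread, then re-issue
  // the pending kill without interrupting, as in the commit message.
  def start(): Unit = {
    taskThread = Thread.currentThread()
    reasonIfKilled.foreach(r => kill(interruptThread = false, r))
  }
}

object LostInterruptDemo {
  // Returns (interrupts delivered, whether the current thread is flagged as interrupted).
  def run(): (Int, Boolean) = {
    val task = new SketchTask
    // Stage cancellation arrives before the task thread is registered.
    task.kill(interruptThread = true, reason = "Stage cancelled")
    task.start()
    (task.interruptsDelivered, Thread.currentThread().isInterrupted)
  }
}
```

In this model the kill reason is recorded, but no interrupt is ever delivered, so a task body that blocks afterwards would have nothing to wake it up.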
1 parent 438ff01 commit 3972455

1 file changed, with 5 additions and 0 deletions
core/src/main/scala/org/apache/spark/TaskContext.scala

@@ -158,6 +158,11 @@ abstract class TaskContext extends Serializable {
   /** Runs a task with this context, ensuring failure and completion listeners get triggered. */
   private[spark] def runTaskWithListeners[T](task: Task[T]): T = {
     try {
+      // SPARK-44818 - Its possible that taskThread has not been initialized when kill is initially
+      // called with interruptThread=true. We do set the reason and eventually will set it on the
+      // context too within run(). If that's the case, kill the thread before it starts executing
+      // the actual task.
+      killTaskIfInterrupted()
       task.runTask(this)
     } catch {
       case e: Throwable =>
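
The effect of the added pre-check can be sketched in isolation. The commit message says the check throws a `TaskKilledException` when a kill reason is present; the sketch below assumes exactly that behaviour and uses hypothetical stand-ins (`SketchContext`, `SketchKilledException`, `runWithPreCheck`, `PreCheckDemo`) rather than Spark's real classes.

```scala
// Hypothetical stand-in for a task-killed exception; not Spark's class.
final class SketchKilledException(val reason: String) extends RuntimeException(reason)

// Hypothetical stand-in for a task context that records a kill reason.
final class SketchContext {
  @volatile private var reasonIfKilled: Option[String] = None

  def markInterrupted(reason: String): Unit = {
    reasonIfKilled = Some(reason)
  }

  // Assumed behaviour: throw if a kill reason has already been recorded.
  def killTaskIfInterrupted(): Unit =
    reasonIfKilled.foreach(r => throw new SketchKilledException(r))

  // Check for a pending kill first, then evaluate the task body.
  def runWithPreCheck[T](body: => T): T = {
    killTaskIfInterrupted()
    body
  }
}

object PreCheckDemo {
  // Returns (whether the body ran, the reason carried by the exception if one was thrown).
  def run(): (Boolean, Option[String]) = {
    val ctx = new SketchContext
    ctx.markInterrupted("Stage cancelled") // kill recorded before the task body starts
    var bodyRan = false
    val killedWith: Option[String] =
      try {
        ctx.runWithPreCheck { bodyRan = true }
        None
      } catch {
        case e: SketchKilledException => Some(e.reason)
      }
    (bodyRan, killedWith)
  }
}
```

Because the check runs before the body is evaluated, a kill that was recorded early stops the task without relying on a thread interrupt ever being delivered.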
