
Commit c699435

[SPARK-33545][CORE] Support Fallback Storage during Worker decommission
### What changes were proposed in this pull request?

This PR aims to support storage migration to a fallback storage, such as cloud storage (`S3`), during worker decommission for corner cases where exceptions occur or no live peer is left. Although this PR focuses on cloud storage like `S3`, whose TTL feature keeps Spark's logic simple, alternative fallback storages such as HDFS/NFS (EFS) can be used if the user provides a clean-up mechanism.

### Why are the changes needed?

Currently, storage migration is not possible when there is no available executor. For example, when there is only one executor, it cannot perform storage migration because it has no peer.

### Does this PR introduce _any_ user-facing change?

Yes. This is a new feature.

### How was this patch tested?

Pass the CIs with the newly added test cases.

Closes #30492 from dongjoon-hyun/SPARK-33545.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent f3c2583 commit c699435
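
For context, a minimal sketch (not part of the commit) of how a user might enable the feature described above. The `spark.storage.decommission.fallbackStorage.path` key is the one introduced by this commit; the other decommission-related keys are assumed to come from the pre-existing decommissioning feature and are not part of this diff.

```scala
import org.apache.spark.SparkConf

// Sketch only. Keys other than fallbackStorage.path are assumed from the
// pre-existing decommission feature, not from this diff.
val conf = new SparkConf()
  .setAppName("fallback-storage-example")
  .set("spark.storage.decommission.enabled", "true")                // assumed existing key
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")  // assumed existing key
  // Added by this commit. The value must end with the path separator, and the
  // target (e.g. an S3 bucket) should expire old objects via TTL, since Spark
  // does not clean it up.
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://spark-storage/")
// In practice these would be supplied on a cluster, e.g. via spark-submit --conf.
```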

9 files changed (+517 lines, -4 lines)


core/pom.xml

Lines changed: 41 additions & 0 deletions
@@ -461,6 +461,47 @@
       <scope>test</scope>
     </dependency>

+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <!-- Keep old SDK out of the assembly to avoid conflict with Kinesis module -->
+        <exclusion>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-crypto</artifactId>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 0 deletions
@@ -576,6 +576,7 @@ class SparkContext(config: SparkConf) extends Logging {
    }
    _ui.foreach(_.setAppId(_applicationId))
    _env.blockManager.initialize(_applicationId)
+    FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf)

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 10 additions & 0 deletions
@@ -471,6 +471,16 @@ package object config {
        "cache block replication should be positive.")
      .createWithDefaultString("30s")

+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =
+    ConfigBuilder("spark.storage.decommission.fallbackStorage.path")
+      .doc("The location for fallback storage during block manager decommissioning. " +
+        "For example, `s3a://spark-storage/`. In case of empty, fallback storage is disabled. " +
+        "The storage should be managed by TTL because Spark will not clean it up.")
+      .version("3.1.0")
+      .stringConf
+      .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
+      .createOptional
+
  private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
    ConfigBuilder("spark.storage.replication.topologyFile")
      .version("2.1.0")

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 1 addition & 1 deletion
@@ -91,7 +91,7 @@ private[spark] class IndexShuffleBlockResolver(
   * When the dirs parameter is None then use the disk manager's local directories. Otherwise,
   * read from the specified directories.
   */
-  private def getIndexFile(
+  def getIndexFile(
      shuffleId: Int,
      mapId: Long,
      dirs: Option[Array[String]] = None): File = {

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 16 additions & 2 deletions
@@ -627,7 +627,16 @@ private[spark] class BlockManager(
  override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {
    if (blockId.isShuffle) {
      logDebug(s"Getting local shuffle block ${blockId}")
-      shuffleManager.shuffleBlockResolver.getBlockData(blockId)
+      try {
+        shuffleManager.shuffleBlockResolver.getBlockData(blockId)
+      } catch {
+        case e: IOException =>
+          if (conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+            FallbackStorage.read(conf, blockId)
+          } else {
+            throw e
+          }
+      }
    } else {
      getLocalBytes(blockId) match {
        case Some(blockData) =>
@@ -1580,7 +1589,12 @@ private[spark] class BlockManager(
        lastPeerFetchTimeNs = System.nanoTime()
        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
      }
-      cachedPeers
+      if (cachedPeers.isEmpty &&
+          conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+        Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
+      } else {
+        cachedPeers
+      }
    }
  }

core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala

Lines changed: 3 additions & 0 deletions
@@ -39,6 +39,7 @@ private[storage] class BlockManagerDecommissioner(
    conf: SparkConf,
    bm: BlockManager) extends Logging {

+  private val fallbackStorage = FallbackStorage.getFallbackStorage(conf)
  private val maxReplicationFailuresForDecommission =
    conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)

@@ -114,6 +115,8 @@
              // driver a no longer referenced RDD with shuffle files.
              if (bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo).isEmpty) {
                logWarning(s"Skipping block ${shuffleBlockInfo}, block deleted.")
+              } else if (fallbackStorage.isDefined) {
+                fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
              } else {
                throw e
              }
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

Lines changed: 174 additions & 0 deletions
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.DataInputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.util.Utils
+
+/**
+ * A fallback storage used by storage decommissioners.
+ */
+private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
+  require(conf.contains("spark.app.id"))
+  require(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined)
+
+  private val fallbackPath = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get)
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
+  private val appId = conf.getAppId
+
+  // Visible for testing
+  def copy(
+      shuffleBlockInfo: ShuffleBlockInfo,
+      bm: BlockManager): Unit = {
+    val shuffleId = shuffleBlockInfo.shuffleId
+    val mapId = shuffleBlockInfo.mapId
+
+    bm.migratableResolver match {
+      case r: IndexShuffleBlockResolver =>
+        val indexFile = r.getIndexFile(shuffleId, mapId)
+
+        if (indexFile.exists()) {
+          fallbackFileSystem.copyFromLocalFile(
+            new Path(indexFile.getAbsolutePath),
+            new Path(fallbackPath, s"$appId/$shuffleId/${indexFile.getName}"))
+
+          val dataFile = r.getDataFile(shuffleId, mapId)
+          if (dataFile.exists()) {
+            fallbackFileSystem.copyFromLocalFile(
+              new Path(dataFile.getAbsolutePath),
+              new Path(fallbackPath, s"$appId/$shuffleId/${dataFile.getName}"))
+          }
+
+          // Report block statuses
+          val reduceId = NOOP_REDUCE_ID
+          val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, reduceId)
+          FallbackStorage.reportBlockStatus(bm, indexBlockId, indexFile.length)
+          if (dataFile.exists) {
+            val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, reduceId)
+            FallbackStorage.reportBlockStatus(bm, dataBlockId, dataFile.length)
+          }
+        }
+      case r =>
+        logWarning(s"Unsupported Resolver: ${r.getClass.getName}")
+    }
+  }
+
+  def exists(shuffleId: Int, filename: String): Boolean = {
+    fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$filename"))
+  }
+}
+
+class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) {
+  import scala.concurrent.ExecutionContext.Implicits.global
+  override def address: RpcAddress = null
+  override def name: String = "fallback"
+  override def send(message: Any): Unit = {}
+  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+    Future{true.asInstanceOf[T]}
+  }
+}
+
+object FallbackStorage extends Logging {
+  /** We use one block manager id as a place holder. */
+  val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
+
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
+    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+      Some(new FallbackStorage(conf))
+    } else {
+      None
+    }
+  }
+
+  /** Register the fallback block manager and its RPC endpoint. */
+  def registerBlockManagerIfNeeded(master: BlockManagerMaster, conf: SparkConf): Unit = {
+    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+      master.registerBlockManager(
+        FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0, new NoopRpcEndpointRef(conf))
+    }
+  }
+
+  /** Report block status to block manager master and map output tracker master. */
+  private def reportBlockStatus(blockManager: BlockManager, blockId: BlockId, dataLength: Long) = {
+    assert(blockManager.master != null)
+    blockManager.master.updateBlockInfo(
+      FALLBACK_BLOCK_MANAGER_ID, blockId, StorageLevel.DISK_ONLY, memSize = 0, dataLength)
+  }
+
+  /**
+   * Read a ManagedBuffer.
+   */
+  def read(conf: SparkConf, blockId: BlockId): ManagedBuffer = {
+    logInfo(s"Read $blockId")
+    val fallbackPath = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
+    val appId = conf.getAppId
+
+    val (shuffleId, mapId, startReduceId, endReduceId) = blockId match {
+      case id: ShuffleBlockId =>
+        (id.shuffleId, id.mapId, id.reduceId, id.reduceId + 1)
+      case batchId: ShuffleBlockBatchId =>
+        (batchId.shuffleId, batchId.mapId, batchId.startReduceId, batchId.endReduceId)
+      case _ =>
+        throw new IllegalArgumentException("unexpected shuffle block id format: " + blockId)
+    }
+
+    val name = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
+    val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
+    val start = startReduceId * 8L
+    val end = endReduceId * 8L
+    Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream =>
+      Utils.tryWithResource(new DataInputStream(inputStream)) { index =>
+        index.skip(start)
+        val offset = index.readLong()
+        index.skip(end - (start + 8L))
+        val nextOffset = index.readLong()
+        val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
+        val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
+        val f = fallbackFileSystem.open(dataFile)
+        val size = nextOffset - offset
+        logDebug(s"To byte array $size")
+        val array = new Array[Byte](size.toInt)
+        val startTimeNs = System.nanoTime()
+        f.seek(offset)
+        f.read(array)
+        logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
+        f.close()
+        new NioManagedBuffer(ByteBuffer.wrap(array))
+      }
+    }
+  }
+}
+
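
`FallbackStorage.read` above relies on the shuffle index file layout: a sequence of longs where entry `i` is the byte offset of reduce partition `i` in the data file, so partition `i` spans `[offsets(i), offsets(i + 1))`. A small sketch with hypothetical numbers (not from the patch) illustrates the arithmetic:

```scala
// Hypothetical index contents: offsets for three reduce partitions plus the end offset.
val offsets = Array(0L, 100L, 250L, 400L)

// Reading ShuffleBlockId(shuffleId, mapId, reduceId = 1) gives
// startReduceId = 1 and endReduceId = 2, as in read() above.
val (startReduceId, endReduceId) = (1, 2)
val offset = offsets(startReduceId)     // 100: seek position in the data file
val nextOffset = offsets(endReduceId)   // 250
val size = nextOffset - offset          // 150 bytes to read into the buffer
```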

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 2 additions & 1 deletion
@@ -295,8 +295,9 @@ final class ShuffleBlockFetcherIterator(
    var hostLocalBlockBytes = 0L
    var remoteBlockBytes = 0L

+    val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
    for ((address, blockInfos) <- blocksByAddress) {
-      if (address.executorId == blockManager.blockManagerId.executorId) {
+      if (Seq(blockManager.blockManagerId.executorId, fallback).contains(address.executorId)) {
        checkBlockSizes(blockInfos)
        val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(
          blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch)
