
Commit 5e31f4d

bozhang2820 authored and MaxGekk committed
[SPARK-38477][CORE] Use error class in org.apache.spark.shuffle
### What changes were proposed in this pull request?

This PR changes the exceptions created in the package org.apache.spark.shuffle to use error classes. It also adds an error class INTERNAL_ERROR_SHUFFLE and uses it for the internal errors in the package.

### Why are the changes needed?

This moves the exceptions created in the package org.apache.spark.shuffle to error classes.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #41575 from bozhang2820/spark-38477.

Authored-by: Bo Zhang <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
1 parent fdeb8d8 commit 5e31f4d

File tree

6 files changed: +52 −21 lines changed
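For context on the pattern this diff applies throughout: `SparkException.internalError` with a `category` argument tags the exception with an `INTERNAL_ERROR_<category>` error class, presumably by suffixing the category name onto the `INTERNAL_ERROR_` prefix. A minimal sketch of the call side, assuming a classpath that contains this change (the message string is illustrative only):

```scala
import org.apache.spark.SparkException

// Hypothetical message; the two-argument overload used throughout this diff.
val e: SparkException = SparkException.internalError(
  "Unexpected shuffle block transfer", category = "SHUFFLE")

// The category resolves to the error class defined in error-classes.json.
assert(e.getErrorClass == "INTERNAL_ERROR_SHUFFLE")
```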

common/utils/src/main/resources/error/error-classes.json (+11 −5)

```diff
@@ -729,6 +729,11 @@
     ],
     "sqlState" : "42K04"
   },
+  "FAILED_RENAME_TEMP_FILE" : {
+    "message" : [
+      "Failed to rename temp file <srcPath> to <dstPath> as FileSystem.rename returned false."
+    ]
+  },
   "FIELDS_ALREADY_EXISTS" : {
     "message" : [
       "Cannot <op> column, because <fieldNames> already exists in <struct>."
@@ -1024,6 +1029,12 @@
     ],
     "sqlState" : "XX000"
   },
+  "INTERNAL_ERROR_SHUFFLE" : {
+    "message" : [
+      "<message>"
+    ],
+    "sqlState" : "XX000"
+  },
   "INTERVAL_ARITHMETIC_OVERFLOW" : {
     "message" : [
       "<message>.<alternative>"
@@ -5300,11 +5311,6 @@
       "Failed to rename as <dstPath> already exists."
     ]
   },
-  "_LEGACY_ERROR_TEMP_2199" : {
-    "message" : [
-      "Failed to rename temp file <srcPath> to <dstPath> as rename returned false."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2200" : {
     "message" : [
       "Error: we detected a possible problem with the location of your \"_spark_metadata\"",
```

core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala (+10 −1)

```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.errors
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
@@ -467,6 +467,15 @@ private[spark] object SparkCoreErrors {
       "receivedBytes" -> receivedBytes.toString).asJava)
   }
 
+  def failedRenameTempFileError(srcFile: File, dstFile: File): Throwable = {
+    new SparkException(
+      errorClass = "FAILED_RENAME_TEMP_FILE",
+      messageParameters = Map(
+        "srcPath" -> srcFile.toString,
+        "dstPath" -> dstFile.toString),
+      cause = null)
+  }
+
   def addLocalDirectoryError(path: Path): Throwable = {
     new SparkException(
       errorClass = "UNSUPPORTED_ADD_FILE.LOCAL_DIRECTORY",
```

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala (+15 −12)

```diff
@@ -25,6 +25,7 @@ import java.nio.file.Files
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkConf, SparkEnv, SparkException}
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.io.NioBufferedFileInputStream
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
@@ -227,7 +228,8 @@
     remoteShuffleMaxDisk.foreach { maxBytes =>
       val bytesUsed = getShuffleBytesStored()
       if (maxBytes < bytesUsed) {
-        throw new SparkException(s"Not storing remote shuffles $bytesUsed exceeds $maxBytes")
+        throw SparkException.internalError(
+          s"Not storing remote shuffles $bytesUsed exceeds $maxBytes", category = "SHUFFLE")
       }
     }
     val file = blockId match {
@@ -236,8 +238,8 @@
       case ShuffleDataBlockId(shuffleId, mapId, _) =>
         getDataFile(shuffleId, mapId)
       case _ =>
-        throw new IllegalStateException(s"Unexpected shuffle block transfer ${blockId} as " +
-          s"${blockId.getClass().getSimpleName()}")
+        throw SparkException.internalError(s"Unexpected shuffle block transfer $blockId as " +
+          s"${blockId.getClass().getSimpleName()}", category = "SHUFFLE")
     }
     val fileTmp = createTempFile(file)
     val channel = Channels.newChannel(
@@ -263,7 +265,7 @@
           file.delete()
         }
         if (!fileTmp.renameTo(file)) {
-          throw new IOException(s"fail to rename file ${fileTmp} to ${file}")
+          throw SparkCoreErrors.failedRenameTempFileError(fileTmp, file)
         }
       }
       blockManager.reportBlockStatus(blockId, BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize))
@@ -300,7 +302,7 @@
 
     // Make sure the index exist.
     if (!indexFile.exists()) {
-      throw new FileNotFoundException("Index file is deleted already.")
+      throw SparkException.internalError("Index file is deleted already.", category = "SHUFFLE")
     }
     if (dataFile.exists()) {
       List((dataBlockId, dataBlockData), (indexBlockId, indexBlockData))
@@ -389,7 +391,7 @@
       dataFile.delete()
     }
     if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
-      throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
+      throw SparkCoreErrors.failedRenameTempFileError(dataTmp, dataFile)
     }
 
     // write the checksum file
@@ -462,11 +464,10 @@
     }
 
     if (!tmpFile.renameTo(targetFile)) {
-      val errorMsg = s"fail to rename file $tmpFile to $targetFile"
       if (propagateError) {
-        throw new IOException(errorMsg)
+        throw SparkCoreErrors.failedRenameTempFileError(tmpFile, targetFile)
       } else {
-        logWarning(errorMsg)
+        logWarning(s"fail to rename file $tmpFile to $targetFile")
       }
     }
   }
@@ -567,7 +568,8 @@
       case batchId: ShuffleBlockBatchId =>
         (batchId.shuffleId, batchId.mapId, batchId.startReduceId, batchId.endReduceId)
       case _ =>
-        throw new IllegalArgumentException("unexpected shuffle block id format: " + blockId)
+        throw SparkException.internalError(
+          s"unexpected shuffle block id format: $blockId", category = "SHUFFLE")
     }
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
@@ -589,8 +591,9 @@
       val actualPosition = channel.position()
       val expectedPosition = endReduceId * 8L + 8
       if (actualPosition != expectedPosition) {
-        throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +
-          s"expected $expectedPosition but actual position was $actualPosition.")
+        throw SparkException.internalError(s"SPARK-22982: Incorrect channel position after index" +
+          s" file reads: expected $expectedPosition but actual position was $actualPosition.",
+          category = "SHUFFLE")
       }
       new FileSegmentManagedBuffer(
         transportConf,
```
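On the SPARK-22982 guard in the last hunk: the shuffle index file is a flat array of 8-byte `Long` offsets, so after reading the offset stored at slot `endReduceId` the channel must sit exactly at the start of the next slot. A worked check of that arithmetic (the slot value is hypothetical):

```scala
val endReduceId = 3L              // hypothetical reduce partition id
val slotStart = endReduceId * 8L  // byte offset of that slot in the index file
val afterRead = slotStart + 8L    // position after reading one 8-byte Long

// This is precisely the `expectedPosition` the resolver verifies.
assert(afterRead == endReduceId * 8L + 8)
```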

core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala (+3 −2)

```diff
@@ -17,9 +17,10 @@
 
 package org.apache.spark.shuffle
 
-import java.io.{Closeable, IOException, OutputStream}
+import java.io.{Closeable, OutputStream}
 import java.util.zip.Checksum
 
+import org.apache.spark.SparkException
 import org.apache.spark.io.MutableCheckedOutputStream
 import org.apache.spark.serializer.{SerializationStream, SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.api.ShufflePartitionWriter
@@ -53,7 +54,7 @@ private[spark] class ShufflePartitionPairsWriter(
 
   override def write(key: Any, value: Any): Unit = {
     if (isClosed) {
-      throw new IOException("Partition pairs writer is already closed.")
+      throw SparkException.internalError("Partition pairs writer is already closed.", "SHUFFLE")
     }
     if (objOut == null) {
       open()
```
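One behavioral consequence of this file's change: a closed writer now surfaces as a `SparkException` carrying an error class rather than an `IOException`. A self-contained sketch of the guard pattern and how a caller can match on it (the `Sink` class is a hypothetical stand-in for the writer):

```scala
import org.apache.spark.SparkException

// Hypothetical stand-in for the writer's closed-state guard.
class Sink {
  private var isClosed = false
  def close(): Unit = { isClosed = true }
  def write(record: Any): Unit = {
    if (isClosed) {
      throw SparkException.internalError("Partition pairs writer is already closed.", "SHUFFLE")
    }
    // ... write the record ...
  }
}

val sink = new Sink()
sink.close()
try sink.write("key" -> "value") catch {
  case e: SparkException =>
    // Callers that previously caught IOException now match on the error class.
    assert(e.getErrorClass == "INTERNAL_ERROR_SHUFFLE")
}
```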

docs/sql-error-conditions.md (+12 −0)

```diff
@@ -494,6 +494,12 @@ Failed parsing struct: `<raw>`.
 
 Failed to rename `<sourcePath>` to `<targetPath>` as destination already exists.
 
+### FAILED_RENAME_TEMP_FILE
+
+SQLSTATE: none assigned
+
+Failed to rename temp file `<srcPath>` to `<dstPath>` as FileSystem.rename returned false.
+
 ### FIELDS_ALREADY_EXISTS
 
 SQLSTATE: none assigned
@@ -722,6 +728,12 @@ For more details see [INSUFFICIENT_TABLE_PROPERTY](sql-error-conditions-insuffic
 
 `<message>`
 
+### INTERNAL_ERROR_SHUFFLE
+
+[SQLSTATE: XX000](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+`<message>`
+
 ### INTERVAL_ARITHMETIC_OVERFLOW
 
 [SQLSTATE: 22015](sql-error-conditions-sqlstates.html#class-22-data-exception)
```

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala (+1 −1)

```diff
@@ -1970,7 +1970,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
 
   def failedRenameTempFileError(srcPath: Path, dstPath: Path): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2199",
+      errorClass = "FAILED_RENAME_TEMP_FILE",
       messageParameters = Map(
         "srcPath" -> srcPath.toString(),
         "dstPath" -> dstPath.toString()),
```
