Skip to content

Commit a76bd9d

Browse files
committed
[SPARK-42379][SS] Use FileSystem.exists in FileSystemBasedCheckpointFileManager.exists
### What changes were proposed in this pull request?

This PR proposes to use `FileSystem.exists` in `FileSystemBasedCheckpointFileManager.exists`, which is consistent with the other methods in `FileSystemBasedCheckpointFileManager`. This PR also removes the test case `QueryExecutionErrorsSuite.FAILED_RENAME_PATH: rename when destination path already exists`, because the test relies on an incorrect custom file system instance whose implementation is not symmetric between `FileSystemBasedCheckpointFileManager.exists` and `FileSystem.exists`. (See the detailed explanation in #39936 (comment).)

### Why are the changes needed?

The other methods in `FileSystemBasedCheckpointFileManager` already use `FileSystem.exists` in all cases where they check the existence of a path.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #39936 from HeartSaVioR/MINOR-FileSystemBasedCheckpointFileManager-calls-fs-exists-in-exists.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 67b6f0e commit a76bd9d

File tree

2 files changed

+1
-41
lines changed

2 files changed

+1
-41
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
256256
fs.open(path)
257257
}
258258

259-
override def exists(path: Path): Boolean =
260-
try {
261-
fs.getFileStatus(path) != null
262-
} catch {
263-
case _: FileNotFoundException => false
264-
}
259+
override def exists(path: Path): Boolean = fs.exists(path)
265260

266261
override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
267262
if (!overwriteIfPossible && fs.exists(dstPath)) {

sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -634,37 +634,6 @@ class QueryExecutionErrorsSuite
634634
sqlState = "0A000")
635635
}
636636

637-
test("FAILED_RENAME_PATH: rename when destination path already exists") {
638-
withTempPath { p =>
639-
withSQLConf(
640-
"spark.sql.streaming.checkpointFileManagerClass" ->
641-
classOf[FileSystemBasedCheckpointFileManager].getName,
642-
"fs.file.impl" -> classOf[FakeFileSystemAlwaysExists].getName,
643-
// FileSystem caching could cause a different implementation of fs.file to be used
644-
"fs.file.impl.disable.cache" -> "true") {
645-
val checkpointLocation = p.getAbsolutePath
646-
647-
val ds = spark.readStream.format("rate").load()
648-
val e = intercept[SparkConcurrentModificationException] {
649-
ds.writeStream
650-
.option("checkpointLocation", checkpointLocation)
651-
.queryName("_")
652-
.format("memory")
653-
.start()
654-
}
655-
656-
val expectedPath = p.toURI
657-
checkError(
658-
exception = e.getCause.asInstanceOf[SparkFileAlreadyExistsException],
659-
errorClass = "FAILED_RENAME_PATH",
660-
sqlState = Some("42K04"),
661-
matchPVals = true,
662-
parameters = Map("sourcePath" -> s"$expectedPath.+",
663-
"targetPath" -> s"$expectedPath.+"))
664-
}
665-
}
666-
}
667-
668637
test("RENAME_SRC_PATH_NOT_FOUND: rename the file which source path does not exist") {
669638
withTempPath { p =>
670639
withSQLConf(
@@ -805,10 +774,6 @@ class FakeFileSystemSetPermission extends LocalFileSystem {
805774
}
806775
}
807776

808-
class FakeFileSystemAlwaysExists extends DebugFilesystem {
809-
override def exists(f: Path): Boolean = true
810-
}
811-
812777
class FakeFileSystemNeverExists extends DebugFilesystem {
813778
override def exists(f: Path): Boolean = false
814779
}

0 commit comments

Comments
 (0)