ParquetQuerySuite.scala
@@ -23,7 +23,7 @@ import java.sql.Timestamp
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.SparkException
import org.apache.spark.{DebugFilesystem, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -316,6 +316,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}

  /**
   * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
   * to increase the chance of failure
   */
  ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
    def testIgnoreCorruptFiles(): Unit = {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString,
          new Path(basePath, "third").toString)
        checkAnswer(
          df,
          Seq(Row(0), Row(1)))
      }
    }

    for (i <- 1 to 100) {
      DebugFilesystem.clearOpenStreams()
      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
        val exception = intercept[SparkException] {
          testIgnoreCorruptFiles()
        }
        assert(exception.getMessage().contains("is not a Parquet file"))
      }
      DebugFilesystem.assertNoOpenStreams()
    }
  }

test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
withTempPath { dir =>
val basePath = dir.getCanonicalPath
SQLTestUtils.scala
@@ -22,11 +22,13 @@ import java.net.URI
import java.nio.file.Files
import java.util.UUID

import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
@@ -49,7 +51,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
 * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
 */
private[sql] trait SQLTestUtils
  extends SparkFunSuite
  extends SparkFunSuite with Eventually
  with BeforeAndAfterAll
  with SQLTestData { self =>

@@ -138,6 +140,15 @@ private[sql] trait SQLTestUtils
}
}

  /**
   * Waits for all tasks on all executors to be finished.
   */
  protected def waitForTasksToFinish(): Unit = {
    eventually(timeout(10.seconds)) {
      assert(spark.sparkContext.statusTracker
        .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
    }
  }
  /**
   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
   * returns.
@@ -146,7 +157,11 @@
   */
  protected def withTempDir(f: File => Unit): Unit = {
    val dir = Utils.createTempDir().getCanonicalFile
    try f(dir) finally Utils.deleteRecursively(dir)
    try f(dir) finally {
      // wait for all tasks to finish before deleting files
      waitForTasksToFinish()
      Utils.deleteRecursively(dir)
    }
  }

/**
SharedSQLContext.scala
@@ -17,17 +17,18 @@

package org.apache.spark.sql.test

import scala.concurrent.duration._

import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually

import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.internal.SQLConf


/**
 * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
 */
trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {

protected val sparkConf = new SparkConf()

@@ -84,6 +85,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {

  protected override def afterEach(): Unit = {
    super.afterEach()
    DebugFilesystem.assertNoOpenStreams()
    // files can be closed from other threads, so wait a bit
    // normally this doesn't take more than 1s
    eventually(timeout(10.seconds)) {

Contributor: This is a catch-all for when we are not using withTempDir, right?

Contributor (Author): This is so that DebugFilesystem.assertNoOpenStreams has a better chance of working, and in general whatever else runs at the end of the test. For now it's just assertNoOpenStreams (see the retry sketch after the diff).

      DebugFilesystem.assertNoOpenStreams()
    }
  }
}
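
The afterEach change relies on ScalaTest's Eventually: the wrapped block is re-run until it stops throwing or until the timeout elapses, so streams that another thread closes shortly after the test body returns no longer trip the open-stream check. Below is a minimal, self-contained sketch of that retry pattern, not code from the PR; the object name, the attempts counter, the pass threshold, and the 100 ms interval are illustrative stand-ins, and only the 10-second timeout mirrors the change above.

import scala.concurrent.duration._

import org.scalatest.concurrent.Eventually._

// `eventually` re-runs the block until it stops throwing or the timeout
// elapses, then rethrows the last failure.
object EventuallyRetrySketch extends App {
  // Stand-in for a condition that becomes true only after a short delay,
  // such as streams being closed from another thread once a task finishes.
  @volatile var attempts = 0

  eventually(timeout(10.seconds), interval(100.millis)) {
    attempts += 1
    if (attempts < 5) {
      throw new IllegalStateException(s"still waiting after $attempts attempts")
    }
  }

  println(s"condition became true after $attempts attempts")
}

In the PR itself the retried block is just DebugFilesystem.assertNoOpenStreams(), and withTempDir additionally calls waitForTasksToFinish() before deleting the directory, so files are not removed while tasks may still hold them open.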