Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,17 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
*/
private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
// TODO: Determine the number of partitions.
private def numPartitions: Int = sqlContext.conf.numShufflePartitions
private def numPartitions: Int =
Option(sqlContext.sparkContext.getLocalProperty("spark.sql.shuffle.partitions")).map { str =>
try {
logDebug(s"Use spark.sql.shuffle.partitions = $str from local property")
str.toInt
} catch {
case _: NumberFormatException =>
logError(s"spark.sql.shuffle.partitions from local property value $str, expect number")
sqlContext.conf.numShufflePartitions
}
}.getOrElse(sqlContext.conf.numShufflePartitions)

/**
* Given a required distribution, returns a partitioning that satisfies that distribution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ private[hive] case class MetastoreRelation

@transient override lazy val statistics: Statistics = Statistics(
sizeInBytes = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
Expand All @@ -801,13 +802,20 @@ private[hive] case class MetastoreRelation
// alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
// of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
// `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
// if the size is still less than zero, we use default size
Option(totalSize).map(_.toLong).filter(_ > 0)
.getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
.getOrElse(sqlContext.conf.defaultSizeInBytes)))

// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
// if the size is still less than zero, we use default size
val sizeEst = Option(totalSize).map(_.toLong).filter(_ > 0)
.getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
.getOrElse(hiveContext.hadoopFileSelector.flatMap(
_.getFilesSizeInBytes(
hiveQlTable.getTableName,
hiveQlTable.getPath.getFileSystem(hiveContext.hiveconf),
hiveQlTable.getPath)).filter(_ > 0)
.getOrElse(sqlContext.conf.defaultSizeInBytes)))
logDebug(s"Size estimation for table ${hiveQlTable.getTableName}: $sizeEst bytes")
BigInt(sizeEst)
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,4 +440,17 @@ abstract class HadoopFileSelector {
* to this table.
*/
def selectFiles(tableName: String, fs: FileSystem, basePath: Path): Option[Seq[Path]]

/**
* Get size in bytes of files constituting a table from the given base path according to the
* client's custom algorithm. This is only applied to non-partitioned tables.
* @param tableName table name to select files for. This is the exact table name specified
* in the query, not a "preprocessed" file name returned by the user-defined
* function registered via [[HiveContext.setTableNamePreprocessor]].
* @param fs the filesystem containing the table
* @param basePath base path of the table in the filesystem
* @return the total sum of file size in bytes, or [[None]] if the custom file selection
* algorithm does not apply to this table.
*/
def getFilesSizeInBytes(tableName: String, fs: FileSystem, basePath: Path): Option[Long]
}