Commit 9f90518

Merge pull request #115 from yeweizhang/spy-787
SKIPME: Add logic to get the size estimation from Hadoop filesystem.
2 parents: 6e6d82f + c82b2a2

File tree: 3 files changed, +39 / -8 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 11 additions & 1 deletion
@@ -194,7 +194,17 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
  */
 private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  private def numPartitions: Int = sqlContext.conf.numShufflePartitions
+  private def numPartitions: Int =
+    Option(sqlContext.sparkContext.getLocalProperty("spark.sql.shuffle.partitions")).map { str =>
+      try {
+        logDebug(s"Use spark.sql.shuffle.partitions = $str from local property")
+        str.toInt
+      } catch {
+        case _: NumberFormatException =>
+          logError(s"spark.sql.shuffle.partitions from local property value $str, expect number")
+          sqlContext.conf.numShufflePartitions
+      }
+    }.getOrElse(sqlContext.conf.numShufflePartitions)
 
   /**
    * Given a required distribution, returns a partitioning that satisfies that distribution.
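The new numPartitions first consults a per-thread SparkContext local property and only falls back to sqlContext.conf.numShufflePartitions when the property is unset or not a valid integer. A minimal usage sketch, assuming a SQLContext named sqlContext and an illustrative "events" table; setLocalProperty is the standard SparkContext API, and the property key is the one read in the diff above:

// Override the shuffle partition count for queries issued from this thread only.
sqlContext.sparkContext.setLocalProperty("spark.sql.shuffle.partitions", "400")
try {
  // Any plan that inserts an Exchange now shuffles into 400 partitions
  // instead of sqlContext.conf.numShufflePartitions.
  sqlContext.sql("SELECT key, count(*) FROM events GROUP BY key").collect()
} finally {
  // Clear the override so later queries on this thread use the conf value again.
  sqlContext.sparkContext.setLocalProperty("spark.sql.shuffle.partitions", null)
}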

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 15 additions & 7 deletions
@@ -793,6 +793,7 @@ private[hive] case class MetastoreRelation
 
   @transient override lazy val statistics: Statistics = Statistics(
     sizeInBytes = {
+      val hiveContext = sqlContext.asInstanceOf[HiveContext]
       val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
       val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
       // TODO: check if this estimate is valid for tables after partition pruning.
@@ -801,13 +802,20 @@ private[hive] case class MetastoreRelation
       // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
       // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
       // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
-      BigInt(
-        // When table is external,`totalSize` is always zero, which will influence join strategy
-        // so when `totalSize` is zero, use `rawDataSize` instead
-        // if the size is still less than zero, we use default size
-        Option(totalSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-            .getOrElse(sqlContext.conf.defaultSizeInBytes)))
+
+      // When table is external,`totalSize` is always zero, which will influence join strategy
+      // so when `totalSize` is zero, use `rawDataSize` instead
+      // if the size is still less than zero, we use default size
+      val sizeEst = Option(totalSize).map(_.toLong).filter(_ > 0)
+        .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
+          .getOrElse(hiveContext.hadoopFileSelector.flatMap(
+            _.getFilesSizeInBytes(
+              hiveQlTable.getTableName,
+              hiveQlTable.getPath.getFileSystem(hiveContext.hiveconf),
+              hiveQlTable.getPath)).filter(_ > 0)
+            .getOrElse(sqlContext.conf.defaultSizeInBytes)))
+      logDebug(s"Size estimation for table ${hiveQlTable.getTableName}: $sizeEst bytes")
+      BigInt(sizeEst)
     }
   )
 
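The estimate now cascades through four sources: Hive's totalSize table parameter, then rawDataSize, then the optional HadoopFileSelector's getFilesSizeInBytes, and finally sqlContext.conf.defaultSizeInBytes. A standalone sketch of the same precedence with hypothetical inputs; none of these names are Spark APIs, and orElse expresses the nested getOrElse chain from the diff:

// Sketch of the fallback order only; values below are illustrative.
def estimateSize(totalSize: Option[Long],
                 rawDataSize: Option[Long],
                 fsEstimate: => Option[Long],
                 defaultSize: Long): Long =
  totalSize.filter(_ > 0)               // Hive's totalSize, when positive
    .orElse(rawDataSize.filter(_ > 0))  // otherwise rawDataSize, when positive
    .orElse(fsEstimate.filter(_ > 0))   // otherwise the custom filesystem estimate
    .getOrElse(defaultSize)             // otherwise the configured default

// An external table often reports totalSize = 0 and no rawDataSize, so it falls
// through to the filesystem estimate (a stand-in value of 1 GB here).
estimateSize(Some(0L), None, Some(1L << 30), 64L * 1024 * 1024)  // == 1073741824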

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

Lines changed: 13 additions & 0 deletions
@@ -440,4 +440,17 @@ abstract class HadoopFileSelector {
    * to this table.
    */
   def selectFiles(tableName: String, fs: FileSystem, basePath: Path): Option[Seq[Path]]
+
+  /**
+   * Get size in bytes of files constituting a table from the given base path according to the
+   * client's custom algorithm. This is only applied to non-partitioned tables.
+   * @param tableName table name to select files for. This is the exact table name specified
+   *                  in the query, not a "preprocessed" file name returned by the user-defined
+   *                  function registered via [[HiveContext.setTableNamePreprocessor]].
+   * @param fs the filesystem containing the table
+   * @param basePath base path of the table in the filesystem
+   * @return the total sum of file size in bytes, or [[None]] if the custom file selection
+   *         algorithm does not apply to this table.
+   */
+  def getFilesSizeInBytes(tableName: String, fs: FileSystem, basePath: Path): Option[Long]
 }
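Any concrete HadoopFileSelector now has to implement getFilesSizeInBytes alongside selectFiles. A hedged sketch of one possible subclass, not part of this commit: it assumes selectFiles and getFilesSizeInBytes are the only abstract members shown in this hunk, picks files by a purely illustrative ".dat" suffix, and sums their lengths with the standard Hadoop FileSystem/FileStatus API:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.hive.HadoopFileSelector

// Illustrative selector: "table files" are direct children of basePath ending in ".dat".
class SuffixFileSelector(suffix: String = ".dat") extends HadoopFileSelector {

  override def selectFiles(tableName: String, fs: FileSystem, basePath: Path): Option[Seq[Path]] = {
    if (!fs.exists(basePath)) {
      None
    } else {
      val files = fs.listStatus(basePath).toSeq
        .filter(status => status.isFile && status.getPath.getName.endsWith(suffix))
        .map(_.getPath)
      if (files.nonEmpty) Some(files) else None
    }
  }

  override def getFilesSizeInBytes(tableName: String, fs: FileSystem, basePath: Path): Option[Long] =
    // Sum the lengths of exactly the files selectFiles picks; returning None lets the
    // caller in MetastoreRelation fall back to defaultSizeInBytes.
    selectFiles(tableName, fs, basePath).map(_.map(p => fs.getFileStatus(p).getLen).sum)
}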
