diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 894c2545a36c6..d9d9230b45bd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -194,7 +194,17 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode
  */
 private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  private def numPartitions: Int = sqlContext.conf.numShufflePartitions
+  private def numPartitions: Int =
+    Option(sqlContext.sparkContext.getLocalProperty("spark.sql.shuffle.partitions")).map { str =>
+      try {
+        logDebug(s"Using spark.sql.shuffle.partitions = $str from local property")
+        str.toInt
+      } catch {
+        case _: NumberFormatException =>
+          logError(s"Ignoring invalid spark.sql.shuffle.partitions local property value '$str'")
+          sqlContext.conf.numShufflePartitions
+      }
+    }.getOrElse(sqlContext.conf.numShufflePartitions)
 
   /**
    * Given a required distribution, returns a partitioning that satisfies that distribution.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 619c57a347e08..3d75f366d361e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -793,6 +793,7 @@ private[hive] case class MetastoreRelation
 
   @transient override lazy val statistics: Statistics = Statistics(
     sizeInBytes = {
+      val hiveContext = sqlContext.asInstanceOf[HiveContext]
       val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
       val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
       // TODO: check if this estimate is valid for tables after partition pruning.
@@ -801,13 +802,20 @@ private[hive] case class MetastoreRelation
       // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
       // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
       // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
-      BigInt(
-        // When table is external,`totalSize` is always zero, which will influence join strategy
-        // so when `totalSize` is zero, use `rawDataSize` instead
-        // if the size is still less than zero, we use default size
-        Option(totalSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(sqlContext.conf.defaultSizeInBytes)))
+
+      // When the table is external, `totalSize` is always zero, which will influence the join
+      // strategy. So when `totalSize` is zero, use `rawDataSize` instead; if that is not positive
+      // either, ask the custom HadoopFileSelector, and finally fall back to the default size.
+      val sizeEst = Option(totalSize).map(_.toLong).filter(_ > 0)
+        .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
+        .getOrElse(hiveContext.hadoopFileSelector.flatMap(
+          _.getFilesSizeInBytes(
+            hiveQlTable.getTableName,
+            hiveQlTable.getPath.getFileSystem(hiveContext.hiveconf),
+            hiveQlTable.getPath)).filter(_ > 0)
+          .getOrElse(sqlContext.conf.defaultSizeInBytes)))
+      logDebug(s"Size estimate for table ${hiveQlTable.getTableName}: $sizeEst bytes")
+      BigInt(sizeEst)
     }
   )
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 80690e71c8803..0d146d1d9a0f6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -440,4 +440,17 @@ abstract class HadoopFileSelector {
    * to this table.
    */
   def selectFiles(tableName: String, fs: FileSystem, basePath: Path): Option[Seq[Path]]
+
+  /**
+   * Gets the total size in bytes of the files constituting a table under the given base path,
+   * according to the client's custom algorithm. This only applies to non-partitioned tables.
+   * @param tableName the table name exactly as specified in the query, not a "preprocessed"
+   *                  name returned by the user-defined function registered via
+   *                  [[HiveContext.setTableNamePreprocessor]].
+   * @param fs the filesystem containing the table
+   * @param basePath the base path of the table in the filesystem
+   * @return the total size of the table's files in bytes, or [[None]] if the custom file
+   *         selection algorithm does not apply to this table.
+   */
+  def getFilesSizeInBytes(tableName: String, fs: FileSystem, basePath: Path): Option[Long]
 }
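
With the Exchange.scala change above, the number of shuffle partitions can be overridden per thread through a SparkContext local property, without touching the session-wide spark.sql.shuffle.partitions setting. A minimal usage sketch, assuming an existing SQLContext named `sqlContext`:

    // Override the shuffle partition count for queries issued from this thread.
    sqlContext.sparkContext.setLocalProperty("spark.sql.shuffle.partitions", "64")
    sqlContext.sql("SELECT key, count(*) FROM src GROUP BY key").collect()

    // Clear the override; later queries fall back to sqlContext.conf.numShufflePartitions.
    sqlContext.sparkContext.setLocalProperty("spark.sql.shuffle.partitions", null)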
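
The new HadoopFileSelector.getFilesSizeInBytes hook lets MetastoreRelation estimate a table's size from the filesystem when the metastore's `totalSize` and `rawDataSize` statistics are missing or zero, so the planner can still choose a sensible join strategy. A sketch of a conforming implementation; the class name, the ".data" suffix convention, and the `setHadoopFileSelector` registration call are illustrative assumptions, not part of this patch:

    import org.apache.hadoop.fs.{FileSystem, Path}

    class DataFileSelector extends HadoopFileSelector {

      // Treat only files ending in ".data" directly under the base path as part of the table.
      override def selectFiles(
          tableName: String, fs: FileSystem, basePath: Path): Option[Seq[Path]] = {
        val files = fs.listStatus(basePath).filter(_.getPath.getName.endsWith(".data"))
        if (files.isEmpty) None else Some(files.map(_.getPath).toSeq)
      }

      // Sum the lengths of the selected files. Returning None means "does not apply",
      // in which case MetastoreRelation falls back to defaultSizeInBytes.
      override def getFilesSizeInBytes(
          tableName: String, fs: FileSystem, basePath: Path): Option[Long] =
        selectFiles(tableName, fs, basePath)
          .map(_.map(path => fs.getFileStatus(path).getLen).sum)
    }

    // Registration is assumed to mirror setTableNamePreprocessor; the exact setter name
    // on HiveContext is an assumption here.
    hiveContext.setHadoopFileSelector(new DataFileSelector)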