@@ -84,7 +84,7 @@ private[sql] object PartitioningUtils {
8484 } else {
8585 // This dataset is partitioned. We need to check whether all partitions have the same
8686 // partition columns and resolve potential type conflicts.
87- val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2) )
87+ val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
8888
8989 // Creates the StructType which represents the partition columns.
9090 val fields = {
@@ -181,24 +181,41 @@ private[sql] object PartitioningUtils {
181181 * StringType
182182 * }}}
183183 */
184- private [sql] def resolvePartitions (values : Seq [PartitionValues ]): Seq [PartitionValues ] = {
185- // Column names of all partitions must match
186- val distinctPartitionsColNames = values.map(_.columnNames).distinct
187-
188- if (distinctPartitionsColNames.isEmpty) {
184+ private [sql] def resolvePartitions (
185+ pathsWithPartitionValues : Seq [(Path , PartitionValues )]): Seq [PartitionValues ] = {
186+ if (pathsWithPartitionValues.isEmpty) {
189187 Seq .empty
190188 } else {
191- assert(distinctPartitionsColNames.size == 1 , {
192- val list = distinctPartitionsColNames.map(_.mkString(" , " )).zipWithIndex.map {
189+ val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
190+
191+ def listConflictingPartitionColumns : String = {
192+ def groupByKey [K , V ](seq : Seq [(K , V )]): Map [K , Iterable [V ]] =
193+ seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
194+
195+ val partColNamesToPaths = groupByKey(pathsWithPartitionValues.map {
196+ case (path, partValues) => partValues.columnNames -> path
197+ })
198+
199+ val distinctPartColLists = distinctPartColNames.map(_.mkString(" , " )).zipWithIndex.map {
193200 case (names, index) =>
194- s " \t Partition column name list # $index: $names"
201+ s " Partition column name list # $index: $names"
195202 }
196203
197- s " Conflicting partition column names detected: \n ${list.mkString(" \n " )}\n " +
198- " For partitioned table directories, data files should only live in leaf directories."
199- })
204+ // Lists out those non-leaf partition directories that also contain files
205+ val suspiciousPaths =
206+ distinctPartColNames.sortBy(_.length).init.flatMap(partColNamesToPaths)
207+
208+ s " Conflicting partition column names detected: \n " +
209+ distinctPartColLists.mkString(" \n\t " , " \n\t " , " \n\n " ) +
210+ " For partitioned table directories, data files should only live in leaf directories. " +
211+ " Please check the following directories for unexpected files:\n " +
212+ suspiciousPaths.mkString(" \n\t " , " \n\t " , " \n " )
213+ }
214+
215+ assert(distinctPartColNames.size == 1 , listConflictingPartitionColumns)
200216
201217 // Resolves possible type conflicts for each column
218+ val values = pathsWithPartitionValues.map(_._2)
202219 val columnCount = values.head.columnNames.size
203220 val resolvedValues = (0 until columnCount).map { i =>
204221 resolveTypeConflicts(values.map(_.literals(i)))
0 commit comments