From e1c566ce6768624947366cbbce83784f0518550e Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sat, 19 May 2018 19:56:02 +0800 Subject: [PATCH 01/14] SPARK-24215: Implement __repr__ and _repr_html_ for dataframes in PySpark --- python/pyspark/sql/dataframe.py | 15 ++- .../scala/org/apache/spark/sql/Dataset.scala | 121 +++++++++--------- .../org/apache/spark/sql/DataFrameSuite.scala | 10 ++ 3 files changed, 85 insertions(+), 61 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 808235ab25440..c4499991e57d0 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -78,6 +78,12 @@ def __init__(self, jdf, sql_ctx): self.is_cached = False self._schema = None # initialized lazily self._lazy_rdd = None + self._eager_eval = sql_ctx.getConf( + "spark.jupyter.eagerEval.enabled", "false").lower() == "true" + self._default_console_row = int(sql_ctx.getConf( + "spark.jupyter.default.showRows", u"20")) + self._default_console_truncate = int(sql_ctx.getConf( + "spark.jupyter.default.showRows", u"20")) @property @since(1.3) @@ -347,13 +353,18 @@ def show(self, n=20, truncate=True, vertical=False): name | Bob """ if isinstance(truncate, bool) and truncate: - print(self._jdf.showString(n, 20, vertical)) + print(self._jdf.showString(n, 20, vertical, False)) else: - print(self._jdf.showString(n, int(truncate), vertical)) + print(self._jdf.showString(n, int(truncate), vertical, False)) def __repr__(self): return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) + def _repr_html_(self): + if self._eager_eval: + return self._jdf.showString( + self._default_console_row, self._default_console_truncate, False, True) + @since(2.1) def checkpoint(self, eager=True): """Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index abb5ae53f4d73..dc73a9af815a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -159,7 +159,6 @@ private[sql] object Dataset { * @groupname action Actions * @groupname untypedrel Untyped transformations * @groupname typedrel Typed transformations - * * @since 1.6.0 */ @InterfaceStability.Stable @@ -237,9 +236,13 @@ class Dataset[T] private[sql]( * @param truncate If set to more than 0, truncates strings to `truncate` characters and * all cells will be aligned right. * @param vertical If set to true, prints output rows vertically (one line per column value). + * @param html If set to true, return output as html table. 
*/ private[sql] def showString( - _numRows: Int, truncate: Int = 20, vertical: Boolean = false): String = { + _numRows: Int, + truncate: Int = 20, + vertical: Boolean = false, + html: Boolean = false): String = { val numRows = _numRows.max(0).min(Int.MaxValue - 1) val newDf = toDF() val castCols = newDf.logicalPlan.output.map { col => @@ -292,31 +295,24 @@ class Dataset[T] private[sql]( } // Create SeparateLine - val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString() + val sep: String = if (html) { + // Initial append table label + sb.append("\n") + "\n" + } else { + colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString() + } // column names - rows.head.zipWithIndex.map { case (cell, i) => - if (truncate > 0) { - StringUtils.leftPad(cell, colWidths(i)) - } else { - StringUtils.rightPad(cell, colWidths(i)) - } - }.addString(sb, "|", "|", "|\n") - + appendRowString(rows.head, truncate, colWidths, html, true, sb) sb.append(sep) // data - rows.tail.foreach { - _.zipWithIndex.map { case (cell, i) => - if (truncate > 0) { - StringUtils.leftPad(cell.toString, colWidths(i)) - } else { - StringUtils.rightPad(cell.toString, colWidths(i)) - } - }.addString(sb, "|", "|", "|\n") + rows.tail.foreach { row => + appendRowString(row.map(_.toString), truncate, colWidths, html, false, sb) } - sb.append(sep) + if (html) sb.append("
\n") } else { // Extended display mode enabled val fieldNames = rows.head @@ -358,6 +354,41 @@ class Dataset[T] private[sql]( sb.toString() } + /** + * Transform current row string and append to builder + * + * @param row Current row of string + * @param truncate If set to more than 0, truncates strings to `truncate` characters and + * all cells will be aligned right. + * @param colWidths The width of each column + * @param html If set to true, return output as html table. + * @param head Set to true while current row is table head. + * @param sb StringBuilder for current row. + */ + private[sql] def appendRowString( + row: Seq[String], + truncate: Int, + colWidths: Array[Int], + html: Boolean, + head: Boolean, + sb: StringBuilder): Unit = { + val data = row.zipWithIndex.map { case (cell, i) => + if (truncate > 0) { + StringUtils.leftPad(cell, colWidths(i)) + } else { + StringUtils.rightPad(cell, colWidths(i)) + } + } + (html, head) match { + case (true, true) => + data.addString(sb, "", "", "") + case (true, false) => + data.addString(sb, "", "", "") + case _ => + data.addString(sb, "|", "|", "|\n") + } + } + override def toString: String = { try { val builder = new StringBuilder @@ -655,7 +686,6 @@ class Dataset[T] private[sql]( * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest * record that has been processed in the form of an interval * (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative. - * * @group streaming * @since 2.1.0 */ @@ -685,7 +715,6 @@ class Dataset[T] private[sql]( * }}} * * @param numRows Number of rows to show - * * @group action * @since 1.6.0 */ @@ -705,7 +734,6 @@ class Dataset[T] private[sql]( * * @param truncate Whether truncate long strings. If true, strings more than 20 characters will * be truncated and all cells will be aligned right - * * @group action * @since 1.6.0 */ @@ -721,10 +749,10 @@ class Dataset[T] private[sql]( * 1983 03 0.410516 0.442194 * 1984 04 0.450090 0.483521 * }}} + * * @param numRows Number of rows to show * @param truncate Whether truncate long strings. If true, strings more than 20 characters will * be truncated and all cells will be aligned right - * * @group action * @since 1.6.0 */ @@ -837,7 +865,6 @@ class Dataset[T] private[sql]( * Behaves as an INNER JOIN and requires a subsequent join predicate. * * @param right Right side of the join operation. - * * @group untypedrel * @since 2.0.0 */ @@ -858,11 +885,9 @@ class Dataset[T] private[sql]( * * @param right Right side of the join operation. * @param usingColumn Name of the column to join on. This column must exist on both sides. - * * @note If you perform a self-join using this function without aliasing the input * `DataFrame`s, you will NOT be able to reference any columns after the join, since * there is no way to disambiguate which side of the join you would like to reference. - * * @group untypedrel * @since 2.0.0 */ @@ -883,11 +908,9 @@ class Dataset[T] private[sql]( * * @param right Right side of the join operation. * @param usingColumns Names of the columns to join on. This columns must exist on both sides. - * * @note If you perform a self-join using this function without aliasing the input * `DataFrame`s, you will NOT be able to reference any columns after the join, since * there is no way to disambiguate which side of the join you would like to reference. - * * @group untypedrel * @since 2.0.0 */ @@ -908,11 +931,9 @@ class Dataset[T] private[sql]( * @param joinType Type of join to perform. Default `inner`. 
Must be one of: * `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, * `right`, `right_outer`, `left_semi`, `left_anti`. - * * @note If you perform a self-join using this function without aliasing the input * `DataFrame`s, you will NOT be able to reference any columns after the join, since * there is no way to disambiguate which side of the join you would like to reference. - * * @group untypedrel * @since 2.0.0 */ @@ -965,7 +986,6 @@ class Dataset[T] private[sql]( * @param joinType Type of join to perform. Default `inner`. Must be one of: * `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, * `right`, `right_outer`, `left_semi`, `left_anti`. - * * @group untypedrel * @since 2.0.0 */ @@ -1016,9 +1036,7 @@ class Dataset[T] private[sql]( * Explicit cartesian join with another `DataFrame`. * * @param right Right side of the join operation. - * * @note Cartesian joins are very expensive without an extra filter that can be pushed down. - * * @group untypedrel * @since 2.1.0 */ @@ -1044,7 +1062,6 @@ class Dataset[T] private[sql]( * @param joinType Type of join to perform. Default `inner`. Must be one of: * `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, * `right`, `right_outer`. - * * @group typedrel * @since 1.6.0 */ @@ -1120,7 +1137,6 @@ class Dataset[T] private[sql]( * * @param other Right side of the join. * @param condition Join expression. - * * @group typedrel * @since 1.6.0 */ @@ -1211,7 +1227,6 @@ class Dataset[T] private[sql]( * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. - * * @group untypedrel * @since 2.0.0 */ @@ -1237,7 +1252,6 @@ class Dataset[T] private[sql]( * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. - * * @group untypedrel * @since 2.0.0 */ @@ -1255,6 +1269,7 @@ class Dataset[T] private[sql]( /** * Selects column based on the column name specified as a regex and returns it as [[Column]]. + * * @group untypedrel * @since 2.3.0 */ @@ -1596,6 +1611,7 @@ class Dataset[T] private[sql]( * "age" -> "max" * )) * }}} + * * @group untypedrel * @since 2.0.0 */ @@ -1717,6 +1733,7 @@ class Dataset[T] private[sql]( * "age" -> "max" * )) * }}} + * * @group untypedrel * @since 2.0.0 */ @@ -1909,7 +1926,6 @@ class Dataset[T] private[sql]( * * @note Equality checking is performed directly on the encoded representation of the data * and thus is not affected by a custom `equals` function defined on `T`. - * * @group typedrel * @since 1.6.0 */ @@ -1923,7 +1939,6 @@ class Dataset[T] private[sql]( * * @note Equality checking is performed directly on the encoded representation of the data * and thus is not affected by a custom `equals` function defined on `T`. - * * @group typedrel * @since 2.0.0 */ @@ -1937,10 +1952,8 @@ class Dataset[T] private[sql]( * * @param fraction Fraction of rows to generate, range [0.0, 1.0]. * @param seed Seed for sampling. - * * @note This is NOT guaranteed to provide exactly the fraction of the count * of the given [[Dataset]]. - * * @group typedrel * @since 2.3.0 */ @@ -1953,10 +1966,8 @@ class Dataset[T] private[sql]( * using a random seed. * * @param fraction Fraction of rows to generate, range [0.0, 1.0]. - * * @note This is NOT guaranteed to provide exactly the fraction of the count * of the given [[Dataset]]. 
- * * @group typedrel * @since 2.3.0 */ @@ -1970,10 +1981,8 @@ class Dataset[T] private[sql]( * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate, range [0.0, 1.0]. * @param seed Seed for sampling. - * * @note This is NOT guaranteed to provide exactly the fraction of the count * of the given [[Dataset]]. - * * @group typedrel * @since 1.6.0 */ @@ -1988,10 +1997,8 @@ class Dataset[T] private[sql]( * * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate, range [0.0, 1.0]. - * * @note This is NOT guaranteed to provide exactly the fraction of the total count * of the given [[Dataset]]. - * * @group typedrel * @since 1.6.0 */ @@ -2006,7 +2013,6 @@ class Dataset[T] private[sql]( * @param seed Seed for sampling. * * For Java API, use [[randomSplitAsList]]. - * * @group typedrel * @since 2.0.0 */ @@ -2044,7 +2050,6 @@ class Dataset[T] private[sql]( * * @param weights weights for splits, will be normalized if they don't sum to 1. * @param seed Seed for sampling. - * * @group typedrel * @since 2.0.0 */ @@ -2414,7 +2419,6 @@ class Dataset[T] private[sql]( * Use [[summary]] for expanded statistics and control over which statistics to compute. * * @param cols Columns to compute statistics on. - * * @group action * @since 1.6.0 */ @@ -2477,7 +2481,6 @@ class Dataset[T] private[sql]( * See also [[describe]] for basic statistics. * * @param statistics Statistics from above list to be computed. - * * @group action * @since 2.3.0 */ @@ -2489,7 +2492,6 @@ class Dataset[T] private[sql]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. - * * @group action * @since 1.6.0 */ @@ -2497,6 +2499,7 @@ class Dataset[T] private[sql]( /** * Returns the first row. + * * @group action * @since 1.6.0 */ @@ -2504,6 +2507,7 @@ class Dataset[T] private[sql]( /** * Returns the first row. Alias for head(). + * * @group action * @since 1.6.0 */ @@ -2755,7 +2759,6 @@ class Dataset[T] private[sql]( * @note this results in multiple Spark jobs, and if the input Dataset is the result * of a wide transformation (e.g. join with different partitioners), to avoid * recomputing the input Dataset should be cached first. - * * @group action * @since 2.0.0 */ @@ -2775,6 +2778,7 @@ class Dataset[T] private[sql]( /** * Returns the number of rows in the Dataset. + * * @group action * @since 1.6.0 */ @@ -2898,7 +2902,6 @@ class Dataset[T] private[sql]( * * @note Equality checking is performed directly on the encoded representation of the data * and thus is not affected by a custom `equals` function defined on `T`. - * * @group typedrel * @since 2.0.0 */ @@ -2925,10 +2928,10 @@ class Dataset[T] private[sql]( /** * Persist this Dataset with the given storage level. + * * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, * `MEMORY_AND_DISK_2`, etc. - * * @group basic * @since 1.6.0 */ @@ -2953,7 +2956,6 @@ class Dataset[T] private[sql]( * Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk. * * @param blocking Whether to block until all blocks are deleted. - * * @group basic * @since 1.6.0 */ @@ -2991,6 +2993,7 @@ class Dataset[T] private[sql]( /** * Returns the content of the Dataset as a `JavaRDD` of `T`s. 
+ * * @group basic * @since 1.6.0 */ @@ -2998,6 +3001,7 @@ class Dataset[T] private[sql]( /** * Returns the content of the Dataset as a `JavaRDD` of `T`s. + * * @group basic * @since 1.6.0 */ @@ -3024,7 +3028,6 @@ class Dataset[T] private[sql]( * tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view. * * @throws AnalysisException if the view name is invalid or already exists - * * @group basic * @since 2.0.0 */ @@ -3056,7 +3059,6 @@ class Dataset[T] private[sql]( * view, e.g. `SELECT * FROM global_temp.view1`. * * @throws AnalysisException if the view name is invalid or already exists - * * @group basic * @since 2.1.0 */ @@ -3136,6 +3138,7 @@ class Dataset[T] private[sql]( /** * Returns the content of the Dataset as a Dataset of JSON strings. + * * @since 2.0.0 */ def toJSON: Dataset[String] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 1cc8cb3874c9b..3c178cb740f23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1245,6 +1245,16 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(df.showString(10, vertical = true) === expectedAnswer) } + test("SPARK-24215 showString: html = true") { + val expectedAnswer = """ + | + | + |
<tr><th>key</th><th>value</th></tr>
<tr><td>  1</td><td>    1</td></tr>
+ |only showing top 1 row + |""".stripMargin + assert(testData.select($"*").showString(1, html = true) === expectedAnswer) + } + test("SPARK-7319 showString") { val expectedAnswer = """+---+-----+ ||key|value| From 435320235eeb51f44042e7eb5866e72e150660c0 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 21 May 2018 10:18:22 +0800 Subject: [PATCH 02/14] address comments --- docs/configuration.md | 23 ++++++++ .../scala/org/apache/spark/sql/Dataset.scala | 59 +++++++++++++++---- .../org/apache/spark/sql/DataFrameSuite.scala | 10 ++++ 3 files changed, 79 insertions(+), 13 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 64af0e98a82f5..623a3737f56db 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -456,6 +456,29 @@ Apart from these, the following properties are also available, and may be useful from JVM to Python worker for every task. + + spark.jupyter.eagerEval.enabled + false + + Open eager evaluation on jupyter or not. If yes, dataframe will be ran automatically + and html table will feedback the queries user have defined (see + SPARK-24215 for more details). + + + + spark.jupyter.default.showRows + 20 + + Default number of rows in jupyter html table. + + + + spark.jupyter.default.truncate + 20 + + Default number of truncate in jupyter html table. + + spark.files diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index dc73a9af815a1..4772baacaae9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -25,6 +25,7 @@ import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal +import org.apache.commons.lang.StringEscapeUtils import org.apache.commons.lang3.StringUtils import org.apache.spark.TaskContext @@ -159,6 +160,7 @@ private[sql] object Dataset { * @groupname action Actions * @groupname untypedrel Untyped transformations * @groupname typedrel Typed transformations + * * @since 1.6.0 */ @InterfaceStability.Stable @@ -311,6 +313,7 @@ class Dataset[T] private[sql]( rows.tail.foreach { row => appendRowString(row.map(_.toString), truncate, colWidths, html, false, sb) } + sb.append(sep) if (html) sb.append("\n") } else { @@ -381,9 +384,11 @@ class Dataset[T] private[sql]( } (html, head) match { case (true, true) => - data.addString(sb, "", "", "") + data.map(StringEscapeUtils.escapeHtml).addString( + sb, "", "", "") case (true, false) => - data.addString(sb, "", "", "") + data.map(StringEscapeUtils.escapeHtml).addString( + sb, "", "", "") case _ => data.addString(sb, "|", "|", "|\n") } @@ -686,6 +691,7 @@ class Dataset[T] private[sql]( * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest * record that has been processed in the form of an interval * (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative. + * * @group streaming * @since 2.1.0 */ @@ -715,6 +721,7 @@ class Dataset[T] private[sql]( * }}} * * @param numRows Number of rows to show + * * @group action * @since 1.6.0 */ @@ -734,6 +741,7 @@ class Dataset[T] private[sql]( * * @param truncate Whether truncate long strings. 
If true, strings more than 20 characters will * be truncated and all cells will be aligned right + * * @group action * @since 1.6.0 */ @@ -749,10 +757,10 @@ class Dataset[T] private[sql]( * 1983 03 0.410516 0.442194 * 1984 04 0.450090 0.483521 * }}} - * * @param numRows Number of rows to show * @param truncate Whether truncate long strings. If true, strings more than 20 characters will * be truncated and all cells will be aligned right + * * @group action * @since 1.6.0 */ @@ -865,6 +873,7 @@ class Dataset[T] private[sql]( * Behaves as an INNER JOIN and requires a subsequent join predicate. * * @param right Right side of the join operation. + * * @group untypedrel * @since 2.0.0 */ @@ -885,9 +894,11 @@ class Dataset[T] private[sql]( * * @param right Right side of the join operation. * @param usingColumn Name of the column to join on. This column must exist on both sides. + * * @note If you perform a self-join using this function without aliasing the input * `DataFrame`s, you will NOT be able to reference any columns after the join, since * there is no way to disambiguate which side of the join you would like to reference. + * * @group untypedrel * @since 2.0.0 */ @@ -908,9 +919,11 @@ class Dataset[T] private[sql]( * * @param right Right side of the join operation. * @param usingColumns Names of the columns to join on. This columns must exist on both sides. + * * @note If you perform a self-join using this function without aliasing the input * `DataFrame`s, you will NOT be able to reference any columns after the join, since * there is no way to disambiguate which side of the join you would like to reference. + * * @group untypedrel * @since 2.0.0 */ @@ -931,9 +944,11 @@ class Dataset[T] private[sql]( * @param joinType Type of join to perform. Default `inner`. Must be one of: * `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, * `right`, `right_outer`, `left_semi`, `left_anti`. + * * @note If you perform a self-join using this function without aliasing the input * `DataFrame`s, you will NOT be able to reference any columns after the join, since * there is no way to disambiguate which side of the join you would like to reference. + * * @group untypedrel * @since 2.0.0 */ @@ -986,6 +1001,7 @@ class Dataset[T] private[sql]( * @param joinType Type of join to perform. Default `inner`. Must be one of: * `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, * `right`, `right_outer`, `left_semi`, `left_anti`. + * * @group untypedrel * @since 2.0.0 */ @@ -1036,7 +1052,9 @@ class Dataset[T] private[sql]( * Explicit cartesian join with another `DataFrame`. * * @param right Right side of the join operation. + * * @note Cartesian joins are very expensive without an extra filter that can be pushed down. + * * @group untypedrel * @since 2.1.0 */ @@ -1062,6 +1080,7 @@ class Dataset[T] private[sql]( * @param joinType Type of join to perform. Default `inner`. Must be one of: * `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, * `right`, `right_outer`. + * * @group typedrel * @since 1.6.0 */ @@ -1137,6 +1156,7 @@ class Dataset[T] private[sql]( * * @param other Right side of the join. * @param condition Join expression. + * * @group typedrel * @since 1.6.0 */ @@ -1227,6 +1247,7 @@ class Dataset[T] private[sql]( * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. 
+ * * @group untypedrel * @since 2.0.0 */ @@ -1252,6 +1273,7 @@ class Dataset[T] private[sql]( * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. + * * @group untypedrel * @since 2.0.0 */ @@ -1269,7 +1291,6 @@ class Dataset[T] private[sql]( /** * Selects column based on the column name specified as a regex and returns it as [[Column]]. - * * @group untypedrel * @since 2.3.0 */ @@ -1611,7 +1632,6 @@ class Dataset[T] private[sql]( * "age" -> "max" * )) * }}} - * * @group untypedrel * @since 2.0.0 */ @@ -1733,7 +1753,6 @@ class Dataset[T] private[sql]( * "age" -> "max" * )) * }}} - * * @group untypedrel * @since 2.0.0 */ @@ -1926,6 +1945,7 @@ class Dataset[T] private[sql]( * * @note Equality checking is performed directly on the encoded representation of the data * and thus is not affected by a custom `equals` function defined on `T`. + * * @group typedrel * @since 1.6.0 */ @@ -1939,6 +1959,7 @@ class Dataset[T] private[sql]( * * @note Equality checking is performed directly on the encoded representation of the data * and thus is not affected by a custom `equals` function defined on `T`. + * * @group typedrel * @since 2.0.0 */ @@ -1952,8 +1973,10 @@ class Dataset[T] private[sql]( * * @param fraction Fraction of rows to generate, range [0.0, 1.0]. * @param seed Seed for sampling. + * * @note This is NOT guaranteed to provide exactly the fraction of the count * of the given [[Dataset]]. + * * @group typedrel * @since 2.3.0 */ @@ -1966,8 +1989,10 @@ class Dataset[T] private[sql]( * using a random seed. * * @param fraction Fraction of rows to generate, range [0.0, 1.0]. + * * @note This is NOT guaranteed to provide exactly the fraction of the count * of the given [[Dataset]]. + * * @group typedrel * @since 2.3.0 */ @@ -1981,8 +2006,10 @@ class Dataset[T] private[sql]( * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate, range [0.0, 1.0]. * @param seed Seed for sampling. + * * @note This is NOT guaranteed to provide exactly the fraction of the count * of the given [[Dataset]]. + * * @group typedrel * @since 1.6.0 */ @@ -1997,8 +2024,10 @@ class Dataset[T] private[sql]( * * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate, range [0.0, 1.0]. + * * @note This is NOT guaranteed to provide exactly the fraction of the total count * of the given [[Dataset]]. + * * @group typedrel * @since 1.6.0 */ @@ -2013,6 +2042,7 @@ class Dataset[T] private[sql]( * @param seed Seed for sampling. * * For Java API, use [[randomSplitAsList]]. + * * @group typedrel * @since 2.0.0 */ @@ -2050,6 +2080,7 @@ class Dataset[T] private[sql]( * * @param weights weights for splits, will be normalized if they don't sum to 1. * @param seed Seed for sampling. + * * @group typedrel * @since 2.0.0 */ @@ -2419,6 +2450,7 @@ class Dataset[T] private[sql]( * Use [[summary]] for expanded statistics and control over which statistics to compute. * * @param cols Columns to compute statistics on. + * * @group action * @since 1.6.0 */ @@ -2481,6 +2513,7 @@ class Dataset[T] private[sql]( * See also [[describe]] for basic statistics. * * @param statistics Statistics from above list to be computed. + * * @group action * @since 2.3.0 */ @@ -2492,6 +2525,7 @@ class Dataset[T] private[sql]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. 
+ * * @group action * @since 1.6.0 */ @@ -2499,7 +2533,6 @@ class Dataset[T] private[sql]( /** * Returns the first row. - * * @group action * @since 1.6.0 */ @@ -2507,7 +2540,6 @@ class Dataset[T] private[sql]( /** * Returns the first row. Alias for head(). - * * @group action * @since 1.6.0 */ @@ -2759,6 +2791,7 @@ class Dataset[T] private[sql]( * @note this results in multiple Spark jobs, and if the input Dataset is the result * of a wide transformation (e.g. join with different partitioners), to avoid * recomputing the input Dataset should be cached first. + * * @group action * @since 2.0.0 */ @@ -2778,7 +2811,6 @@ class Dataset[T] private[sql]( /** * Returns the number of rows in the Dataset. - * * @group action * @since 1.6.0 */ @@ -2902,6 +2934,7 @@ class Dataset[T] private[sql]( * * @note Equality checking is performed directly on the encoded representation of the data * and thus is not affected by a custom `equals` function defined on `T`. + * * @group typedrel * @since 2.0.0 */ @@ -2928,10 +2961,10 @@ class Dataset[T] private[sql]( /** * Persist this Dataset with the given storage level. - * * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, * `MEMORY_AND_DISK_2`, etc. + * * @group basic * @since 1.6.0 */ @@ -2956,6 +2989,7 @@ class Dataset[T] private[sql]( * Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk. * * @param blocking Whether to block until all blocks are deleted. + * * @group basic * @since 1.6.0 */ @@ -2993,7 +3027,6 @@ class Dataset[T] private[sql]( /** * Returns the content of the Dataset as a `JavaRDD` of `T`s. - * * @group basic * @since 1.6.0 */ @@ -3001,7 +3034,6 @@ class Dataset[T] private[sql]( /** * Returns the content of the Dataset as a `JavaRDD` of `T`s. - * * @group basic * @since 1.6.0 */ @@ -3028,6 +3060,7 @@ class Dataset[T] private[sql]( * tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view. * * @throws AnalysisException if the view name is invalid or already exists + * * @group basic * @since 2.0.0 */ @@ -3059,6 +3092,7 @@ class Dataset[T] private[sql]( * view, e.g. `SELECT * FROM global_temp.view1`. * * @throws AnalysisException if the view name is invalid or already exists + * * @group basic * @since 2.1.0 */ @@ -3138,7 +3172,6 @@ class Dataset[T] private[sql]( /** * Returns the content of the Dataset as a Dataset of JSON strings. - * * @since 2.0.0 */ def toJSON: Dataset[String] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 3c178cb740f23..beaa6e5ca88d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1246,13 +1246,23 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("SPARK-24215 showString: html = true") { + // test for normal html val expectedAnswer = """ | | |
<tr><th>key</th><th>value</th></tr>
<tr><td>  1</td><td>    1</td></tr>
|only showing top 1 row |""".stripMargin + // test for html escape + val escapeExpectedAnswer = """ + | + | + |
<tr><th>key</th><th>(value &gt; 1)</th></tr>
<tr><td>  1</td><td>      false</td></tr>
+ |only showing top 1 row + |""".stripMargin assert(testData.select($"*").showString(1, html = true) === expectedAnswer) + assert(testData.select(testData("key"), testData("value") > 1).showString( + 1, html = true) === escapeExpectedAnswer) } test("SPARK-7319 showString") { From 4e366e5a5ace2dfa07af15b88df9021227719bf4 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 21 May 2018 20:58:43 +0800 Subject: [PATCH 03/14] address comments --- docs/configuration.md | 10 +++++----- python/pyspark/sql/dataframe.py | 18 ++++++++++-------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 623a3737f56db..d7b4728fc8669 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -460,23 +460,23 @@ Apart from these, the following properties are also available, and may be useful spark.jupyter.eagerEval.enabled false - Open eager evaluation on jupyter or not. If yes, dataframe will be ran automatically + Enable eager evaluation on Jupyter or not. If true, dataframe will be ran automatically and html table will feedback the queries user have defined (see SPARK-24215 for more details). - spark.jupyter.default.showRows + spark.jupyter.eagerEval.showRows 20 - Default number of rows in jupyter html table. + Default number of rows in jupyter HTML table. - spark.jupyter.default.truncate + spark.jupyter.eagerEval.truncate 20 - Default number of truncate in jupyter html table. + Default number of truncate in jupyter HTML table. diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index c4499991e57d0..e5e2ae20808de 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -78,12 +78,6 @@ def __init__(self, jdf, sql_ctx): self.is_cached = False self._schema = None # initialized lazily self._lazy_rdd = None - self._eager_eval = sql_ctx.getConf( - "spark.jupyter.eagerEval.enabled", "false").lower() == "true" - self._default_console_row = int(sql_ctx.getConf( - "spark.jupyter.default.showRows", u"20")) - self._default_console_truncate = int(sql_ctx.getConf( - "spark.jupyter.default.showRows", u"20")) @property @since(1.3) @@ -361,9 +355,17 @@ def __repr__(self): return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) def _repr_html_(self): - if self._eager_eval: + eager_eval = self.sql_ctx.getConf( + "spark.jupyter.eagerEval.enabled", "false").lower() == "true" + console_row = int(self.sql_ctx.getConf( + "spark.jupyter.eagerEval.showRows", u"20")) + console_truncate = int(self.sql_ctx.getConf( + "spark.jupyter.eagerEval.truncate", u"20")) + if eager_eval: return self._jdf.showString( - self._default_console_row, self._default_console_truncate, False, True) + console_row, console_truncate, False, True) + else: + return None @since(2.1) def checkpoint(self, eager=True): From fc858af3d865fa2cd44b03c95cdd22ea7d4c3b92 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 23 May 2018 15:55:43 +0800 Subject: [PATCH 04/14] Address comments, add ut in pyspark and change config name --- docs/configuration.md | 16 +++---- python/pyspark/sql/dataframe.py | 10 +++-- python/pyspark/sql/tests.py | 44 +++++++++++++++++++ .../scala/org/apache/spark/sql/Dataset.scala | 8 ++-- .../org/apache/spark/sql/DataFrameSuite.scala | 12 +++-- 5 files changed, 71 insertions(+), 19 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index d7b4728fc8669..c8115898f9f67 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -457,26 +457,26 @@ Apart from these, the following 
properties are also available, and may be useful - spark.jupyter.eagerEval.enabled + spark.sql.repl.eagerEval.enabled false - Enable eager evaluation on Jupyter or not. If true, dataframe will be ran automatically - and html table will feedback the queries user have defined (see - SPARK-24215 for more details). + Enable eager evaluation or not. If true and repl you're using supports eager evaluation, + dataframe will be ran automatically and html table will feedback the queries user have defined + (see SPARK-24215 for more details). - spark.jupyter.eagerEval.showRows + spark.sql.repl.eagerEval.showRows 20 - Default number of rows in jupyter HTML table. + Default number of rows in HTML table. - spark.jupyter.eagerEval.truncate + spark.sql.repl.eagerEval.truncate 20 - Default number of truncate in jupyter HTML table. + Default number of truncate in HTML table. diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index e5e2ae20808de..1aa5a043c222d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -355,12 +355,16 @@ def __repr__(self): return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) def _repr_html_(self): + """Returns a dataframe with html code when you enabled eager evaluation + by 'spark.sql.repl.eagerEval.enabled', this only called by repr you're + using support eager evaluation with HTML. + """ eager_eval = self.sql_ctx.getConf( - "spark.jupyter.eagerEval.enabled", "false").lower() == "true" + "spark.sql.repl.eagerEval.enabled", "false").lower() == "true" console_row = int(self.sql_ctx.getConf( - "spark.jupyter.eagerEval.showRows", u"20")) + "spark.sql.repl.eagerEval.showRows", u"20")) console_truncate = int(self.sql_ctx.getConf( - "spark.jupyter.eagerEval.truncate", u"20")) + "spark.sql.repl.eagerEval.truncate", u"20")) if eager_eval: return self._jdf.showString( console_row, console_truncate, False, True) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index ea2dd7605dc57..adfffa07662f0 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3074,6 +3074,50 @@ def test_checking_csv_header(self): finally: shutil.rmtree(path) + def _get_content(self, content): + """ + Strips leading spaces from content up to the first '|' in each line. + """ + import re + pattern = re.compile(r'^ *\|', re.MULTILINE) + return re.sub(pattern, '', content) + + def test_repr_html(self): + df = self.spark.createDataFrame([(1, "1"), (22222, "22222")], ("key", "value")) + self.assertEquals(None, df._repr_html_()) + self.spark.conf.set("spark.sql.repl.eagerEval.enabled", "true") + expected1 = """ + | + | + | + | + | + | + |
keyvalue
1 1
2222222222
+ |""" + self.assertEquals(self._get_content(expected1), df._repr_html_()) + self.spark.conf.set("spark.sql.repl.eagerEval.truncate", 3) + expected2 = """ + | + | + | + | + | + | + |
keyvalue
1 1
222 222
+ |""" + self.assertEquals(self._get_content(expected2), df._repr_html_()) + self.spark.conf.set("spark.sql.repl.eagerEval.showRows", 1) + expected3 = """ + | + | + | + | + |
keyvalue
1 1
+ |only showing top 1 row + |""" + self.assertEquals(self._get_content(expected3), df._repr_html_()) + class HiveSparkSubmitTests(SparkSubmitTests): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 4772baacaae9d..a66819a8b972b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -300,7 +300,7 @@ class Dataset[T] private[sql]( val sep: String = if (html) { // Initial append table label sb.append("\n") - "\n" + "" } else { colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString() } @@ -311,7 +311,7 @@ class Dataset[T] private[sql]( // data rows.tail.foreach { row => - appendRowString(row.map(_.toString), truncate, colWidths, html, false, sb) + appendRowString(row, truncate, colWidths, html, false, sb) } sb.append(sep) @@ -385,10 +385,10 @@ class Dataset[T] private[sql]( (html, head) match { case (true, true) => data.map(StringEscapeUtils.escapeHtml).addString( - sb, "") + sb, "\n\n") case (true, false) => data.map(StringEscapeUtils.escapeHtml).addString( - sb, "") + sb, "\n\n") case _ => data.addString(sb, "|", "|", "|\n") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index beaa6e5ca88d3..99a7e7ab8b983 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1248,15 +1248,19 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { test("SPARK-24215 showString: html = true") { // test for normal html val expectedAnswer = """
", "", "
", "", "
", "", "
", "", "
- | - | + | + | + | + | |
keyvalue
1 1
keyvalue
1 1
|only showing top 1 row |""".stripMargin // test for html escape val escapeExpectedAnswer = """ - | - | + | + | + | + | |
key(value > 1)
1 false
key(value > 1)
1 false
|only showing top 1 row |""".stripMargin From d03697c957a22ab58757ca22692e68311d8ddc32 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 24 May 2018 16:51:34 +0800 Subject: [PATCH 05/14] Add repl eager evaluation for __repr__ --- python/pyspark/sql/dataframe.py | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 1aa5a043c222d..bb62d928ce325 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -78,6 +78,7 @@ def __init__(self, jdf, sql_ctx): self.is_cached = False self._schema = None # initialized lazily self._lazy_rdd = None + self._support_repr_html = False @property @since(1.3) @@ -351,13 +352,9 @@ def show(self, n=20, truncate=True, vertical=False): else: print(self._jdf.showString(n, int(truncate), vertical, False)) - def __repr__(self): - return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) - - def _repr_html_(self): - """Returns a dataframe with html code when you enabled eager evaluation - by 'spark.sql.repl.eagerEval.enabled', this only called by repr you're - using support eager evaluation with HTML. + def _get_repl_config(self): + """Return the configs for eager evaluation each time when __repr__ or + _repr_html_ called by user or notebook. """ eager_eval = self.sql_ctx.getConf( "spark.sql.repl.eagerEval.enabled", "false").lower() == "true" @@ -365,6 +362,24 @@ def _repr_html_(self): "spark.sql.repl.eagerEval.showRows", u"20")) console_truncate = int(self.sql_ctx.getConf( "spark.sql.repl.eagerEval.truncate", u"20")) + return (eager_eval, console_row, console_truncate) + + def __repr__(self): + (eager_eval, console_row, console_truncate) = self._get_repl_config() + if not self._support_repr_html and eager_eval: + return self._jdf.showString( + console_row, console_truncate, False, False) + else: + return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) + + def _repr_html_(self): + """Returns a dataframe with html code when you enabled eager evaluation + by 'spark.sql.repl.eagerEval.enabled', this only called by repr you're + using support eager evaluation with HTML. 
+ """ + if not self._support_repr_html: + self._support_repr_html = True + (eager_eval, console_row, console_truncate) = self._get_repl_config() if eager_eval: return self._jdf.showString( console_row, console_truncate, False, True) From cca07c2ff6a4488b329604e2078f813f43782a5a Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 24 May 2018 16:53:08 +0800 Subject: [PATCH 06/14] The refactor for Dataset.scala --- .../scala/org/apache/spark/sql/Dataset.scala | 121 ++++++++---------- .../org/apache/spark/sql/DataFrameSuite.scala | 24 ---- 2 files changed, 51 insertions(+), 94 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index a66819a8b972b..0880932891ea4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -25,7 +25,6 @@ import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal -import org.apache.commons.lang.StringEscapeUtils import org.apache.commons.lang3.StringUtils import org.apache.spark.TaskContext @@ -217,6 +216,9 @@ class Dataset[T] private[sql]( // sqlContext must be val because a stable identifier is expected when you import implicits @transient lazy val sqlContext: SQLContext = sparkSession.sqlContext + // We set a minimum column width at '3' in showString + val minimumColWidth = 3 + private[sql] def resolve(colName: String): NamedExpression = { queryExecution.analyzed.resolveQuoted(colName, sparkSession.sessionState.analyzer.resolver) .getOrElse { @@ -232,20 +234,17 @@ class Dataset[T] private[sql]( } /** - * Compose the string representing rows for output + * Get rows represented in Sequence by specific truncate and vertical requirment. * - * @param _numRows Number of rows to show + * @param numRows Number of rows to return * @param truncate If set to more than 0, truncates strings to `truncate` characters and * all cells will be aligned right. - * @param vertical If set to true, prints output rows vertically (one line per column value). - * @param html If set to true, return output as html table. + * @param vertical If set to true, the rows to return don't need truncate. */ - private[sql] def showString( - _numRows: Int, - truncate: Int = 20, - vertical: Boolean = false, - html: Boolean = false): String = { - val numRows = _numRows.max(0).min(Int.MaxValue - 1) + private[sql] def getRows( + numRows: Int, + truncate: Int, + vertical: Boolean): (Seq[Seq[String]], Boolean) = { val newDf = toDF() val castCols = newDf.logicalPlan.output.map { col => // Since binary types in top-level schema fields have a specific format to print, @@ -263,7 +262,7 @@ class Dataset[T] private[sql]( // For array values, replace Seq and Array with square brackets // For cells that are beyond `truncate` characters, replace it with the // first `truncate-3` and "..." 
- val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row => + var rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row => row.toSeq.map { cell => val str = cell match { case null => "null" @@ -280,12 +279,8 @@ class Dataset[T] private[sql]( }: Seq[String] } - val sb = new StringBuilder - val numCols = schema.fieldNames.length - // We set a minimum column width at '3' - val minimumColWidth = 3 - if (!vertical) { + val numCols = schema.fieldNames.length // Initialise the width of each column to a minimum value val colWidths = Array.fill(numCols)(minimumColWidth) @@ -296,31 +291,54 @@ class Dataset[T] private[sql]( } } - // Create SeparateLine - val sep: String = if (html) { - // Initial append table label - sb.append("\n") - "" - } else { - colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString() + rows = rows.map { + _.zipWithIndex.map { case (cell, i) => + if (truncate > 0) { + StringUtils.leftPad(cell, colWidths(i)) + } else { + StringUtils.rightPad(cell, colWidths(i)) + } + } } + } + (rows, hasMoreData) + } + + /** + * Compose the string representing rows for output + * + * @param _numRows Number of rows to show + * @param truncate If set to more than 0, truncates strings to `truncate` characters and + * all cells will be aligned right. + * @param vertical If set to true, prints output rows vertically (one line per column value). + */ + private[sql] def showString( + _numRows: Int, + truncate: Int = 20, + vertical: Boolean = false): String = { + val numRows = _numRows.max(0).min(Int.MaxValue - 1) + + val sb = new StringBuilder + val (rows, hasMoreData) = getRows(numRows, truncate, vertical) + val fieldNames = rows.head + val dataRows = rows.tail + + if (!vertical) { + // Create SeparateLine + val sep: String = fieldNames.map(_.length).toArray + .map("-" * _).addString(sb, "+", "+", "+\n").toString() // column names - appendRowString(rows.head, truncate, colWidths, html, true, sb) + fieldNames.addString(sb, "|", "|", "|\n") sb.append(sep) // data - rows.tail.foreach { row => - appendRowString(row, truncate, colWidths, html, false, sb) + dataRows.foreach { + _.addString(sb, "|", "|", "|\n") } - sb.append(sep) - if (html) sb.append("
\n") } else { // Extended display mode enabled - val fieldNames = rows.head - val dataRows = rows.tail - // Compute the width of field name and data columns val fieldNameColWidth = fieldNames.foldLeft(minimumColWidth) { case (curMax, fieldName) => math.max(curMax, fieldName.length) @@ -345,7 +363,7 @@ class Dataset[T] private[sql]( } // Print a footer - if (vertical && data.isEmpty) { + if (vertical && dataRows.isEmpty) { // In a vertical mode, print an empty row set explicitly sb.append("(0 rows)\n") } else if (hasMoreData) { @@ -357,43 +375,6 @@ class Dataset[T] private[sql]( sb.toString() } - /** - * Transform current row string and append to builder - * - * @param row Current row of string - * @param truncate If set to more than 0, truncates strings to `truncate` characters and - * all cells will be aligned right. - * @param colWidths The width of each column - * @param html If set to true, return output as html table. - * @param head Set to true while current row is table head. - * @param sb StringBuilder for current row. - */ - private[sql] def appendRowString( - row: Seq[String], - truncate: Int, - colWidths: Array[Int], - html: Boolean, - head: Boolean, - sb: StringBuilder): Unit = { - val data = row.zipWithIndex.map { case (cell, i) => - if (truncate > 0) { - StringUtils.leftPad(cell, colWidths(i)) - } else { - StringUtils.rightPad(cell, colWidths(i)) - } - } - (html, head) match { - case (true, true) => - data.map(StringEscapeUtils.escapeHtml).addString( - sb, "", "\n", "\n") - case (true, false) => - data.map(StringEscapeUtils.escapeHtml).addString( - sb, "", "\n", "\n") - case _ => - data.addString(sb, "|", "|", "|\n") - } - } - override def toString: String = { try { val builder = new StringBuilder diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 99a7e7ab8b983..1cc8cb3874c9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1245,30 +1245,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(df.showString(10, vertical = true) === expectedAnswer) } - test("SPARK-24215 showString: html = true") { - // test for normal html - val expectedAnswer = """ - | - | - | - | - |
<tr><th>key</th><th>value</th></tr>
<tr><td>  1</td><td>    1</td></tr>
- |only showing top 1 row - |""".stripMargin - // test for html escape - val escapeExpectedAnswer = """ - | - | - | - | - |
<tr><th>key</th><th>(value &gt; 1)</th></tr>
<tr><td>  1</td><td>      false</td></tr>
- |only showing top 1 row - |""".stripMargin - assert(testData.select($"*").showString(1, html = true) === expectedAnswer) - assert(testData.select(testData("key"), testData("value") > 1).showString( - 1, html = true) === escapeExpectedAnswer) - } - test("SPARK-7319 showString") { val expectedAnswer = """+---+-----+ ||key|value| From 3649fa0ff80e56c8bfc3328b0272aca41ce98491 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sun, 27 May 2018 23:17:27 +0800 Subject: [PATCH 07/14] refactor for showString --- docs/configuration.md | 2 +- python/pyspark/sql/dataframe.py | 37 +++++++++++++---- python/pyspark/sql/tests.py | 40 ++++++------------- .../scala/org/apache/spark/sql/Dataset.scala | 33 ++++++++++----- 4 files changed, 67 insertions(+), 45 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index c8115898f9f67..75b655a22d710 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -466,7 +466,7 @@ Apart from these, the following properties are also available, and may be useful - spark.sql.repl.eagerEval.showRows + spark.sql.repl.eagerEval.maxNumRows 20 Default number of rows in HTML table. diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index bb62d928ce325..dc6cfea1d8e75 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -348,9 +348,9 @@ def show(self, n=20, truncate=True, vertical=False): name | Bob """ if isinstance(truncate, bool) and truncate: - print(self._jdf.showString(n, 20, vertical, False)) + print(self._jdf.showString(n, 20, vertical)) else: - print(self._jdf.showString(n, int(truncate), vertical, False)) + print(self._jdf.showString(n, int(truncate), vertical)) def _get_repl_config(self): """Return the configs for eager evaluation each time when __repr__ or @@ -359,7 +359,7 @@ def _get_repl_config(self): eager_eval = self.sql_ctx.getConf( "spark.sql.repl.eagerEval.enabled", "false").lower() == "true" console_row = int(self.sql_ctx.getConf( - "spark.sql.repl.eagerEval.showRows", u"20")) + "spark.sql.repl.eagerEval.maxNumRows", u"20")) console_truncate = int(self.sql_ctx.getConf( "spark.sql.repl.eagerEval.truncate", u"20")) return (eager_eval, console_row, console_truncate) @@ -367,22 +367,45 @@ def _get_repl_config(self): def __repr__(self): (eager_eval, console_row, console_truncate) = self._get_repl_config() if not self._support_repr_html and eager_eval: + vertical = False return self._jdf.showString( - console_row, console_truncate, False, False) + console_row, console_truncate, vertical) else: return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) def _repr_html_(self): """Returns a dataframe with html code when you enabled eager evaluation - by 'spark.sql.repl.eagerEval.enabled', this only called by repr you're + by 'spark.sql.repl.eagerEval.enabled', this only called by REPL you're using support eager evaluation with HTML. 
""" + import cgi if not self._support_repr_html: self._support_repr_html = True (eager_eval, console_row, console_truncate) = self._get_repl_config() if eager_eval: - return self._jdf.showString( - console_row, console_truncate, False, True) + with SCCallSiteSync(self._sc) as css: + vertical = False + sock_info = self._jdf.getRowsToPython( + console_row, console_truncate, vertical) + rows = list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer()))) + head = rows[0] + row_data = rows[1:] + has_more_data = len(row_data) > console_row + row_data = row_data[0:console_row] + + html = "\n\n" + # generate table rows + for row in row_data: + data = "\n" + html += data + html += "
" + # generate table head + html += "".join(map(lambda x: cgi.escape(x), head)) + "
" + "".join(map(lambda x: cgi.escape(x), row)) + \ + "
\n" + if has_more_data: + html += "only showing top %d %s\n" % (console_row, + "row" if console_row == 1 else "rows") + return html else: return None diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index adfffa07662f0..436564841ca2c 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3074,49 +3074,35 @@ def test_checking_csv_header(self): finally: shutil.rmtree(path) - def _get_content(self, content): - """ - Strips leading spaces from content up to the first '|' in each line. - """ + def test_repr_html(self): import re pattern = re.compile(r'^ *\|', re.MULTILINE) - return re.sub(pattern, '', content) - - def test_repr_html(self): df = self.spark.createDataFrame([(1, "1"), (22222, "22222")], ("key", "value")) self.assertEquals(None, df._repr_html_()) self.spark.conf.set("spark.sql.repl.eagerEval.enabled", "true") expected1 = """ - | - | - | - | - | - | + | + | + | |
keyvalue
1 1
2222222222
keyvalue
1 1
2222222222
|""" - self.assertEquals(self._get_content(expected1), df._repr_html_()) + self.assertEquals(re.sub(pattern, '', expected1), df._repr_html_()) self.spark.conf.set("spark.sql.repl.eagerEval.truncate", 3) expected2 = """ - | - | - | - | - | - | + | + | + | |
keyvalue
1 1
222 222
keyvalue
1 1
222 222
|""" - self.assertEquals(self._get_content(expected2), df._repr_html_()) - self.spark.conf.set("spark.sql.repl.eagerEval.showRows", 1) + self.assertEquals(re.sub(pattern, '', expected2), df._repr_html_()) + self.spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 1) expected3 = """ - | - | - | - | + | + | |
keyvalue
1 1
keyvalue
1 1
|only showing top 1 row |""" - self.assertEquals(self._get_content(expected3), df._repr_html_()) + self.assertEquals(re.sub(pattern, '', expected3), df._repr_html_()) class HiveSparkSubmitTests(SparkSubmitTests): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0880932891ea4..56c5817f5e19e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -234,7 +234,7 @@ class Dataset[T] private[sql]( } /** - * Get rows represented in Sequence by specific truncate and vertical requirment. + * Get rows represented in Sequence by specific truncate and vertical requirement. * * @param numRows Number of rows to return * @param truncate If set to more than 0, truncates strings to `truncate` characters and @@ -244,7 +244,7 @@ class Dataset[T] private[sql]( private[sql] def getRows( numRows: Int, truncate: Int, - vertical: Boolean): (Seq[Seq[String]], Boolean) = { + vertical: Boolean): Seq[Seq[String]] = { val newDf = toDF() val castCols = newDf.logicalPlan.output.map { col => // Since binary types in top-level schema fields have a specific format to print, @@ -255,9 +255,7 @@ class Dataset[T] private[sql]( Column(col).cast(StringType) } } - val takeResult = newDf.select(castCols: _*).take(numRows + 1) - val hasMoreData = takeResult.length > numRows - val data = takeResult.take(numRows) + val data = newDf.select(castCols: _*).take(numRows + 1) // For array values, replace Seq and Array with square brackets // For cells that are beyond `truncate` characters, replace it with the @@ -301,7 +299,7 @@ class Dataset[T] private[sql]( } } } - (rows, hasMoreData) + rows } /** @@ -317,12 +315,15 @@ class Dataset[T] private[sql]( truncate: Int = 20, vertical: Boolean = false): String = { val numRows = _numRows.max(0).min(Int.MaxValue - 1) - - val sb = new StringBuilder - val (rows, hasMoreData) = getRows(numRows, truncate, vertical) + // Get rows represented by Seq[Seq[String]], we may get one more line if it has more data. + val rows = getRows(numRows, truncate, vertical) val fieldNames = rows.head - val dataRows = rows.tail + val data = rows.tail + + val hasMoreData = data.length > numRows + val dataRows = data.take(numRows) + val sb = new StringBuilder if (!vertical) { // Create SeparateLine val sep: String = fieldNames.map(_.length).toArray @@ -3226,6 +3227,18 @@ class Dataset[T] private[sql]( } } + private[sql] def getRowsToPython( + numRows: Int, + truncate: Int, + vertical: Boolean): Array[Any] = { + EvaluatePython.registerPicklers() + val rows = getRows(numRows, truncate, vertical).map(_.toArray).toArray + val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType))) + val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler( + rows.iterator.map(toJava)) + PythonRDD.serveIterator(iter, "serve-GetRows") + } + /** * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark. 
*/ From 265ef9929042a6d7aac5956067043ad15e46c0d4 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sun, 27 May 2018 23:39:22 +0800 Subject: [PATCH 08/14] code style --- python/pyspark/sql/dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index dc6cfea1d8e75..5c37c5ed85091 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -403,8 +403,8 @@ def _repr_html_(self): html += data html += "\n" if has_more_data: - html += "only showing top %d %s\n" % (console_row, - "row" if console_row == 1 else "rows") + html += "only showing top %d %s\n" % ( + console_row, "row" if console_row == 1 else "rows") return html else: return None From 148ee50b4e0c723b3e5df09b0472f8624f216c69 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 30 May 2018 17:34:47 +0800 Subject: [PATCH 09/14] Address comments --- docs/configuration.md | 2 +- python/pyspark/sql/dataframe.py | 46 +++++++++++-------- .../scala/org/apache/spark/sql/Dataset.scala | 13 +++--- 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 75b655a22d710..48ac60103221e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -460,7 +460,7 @@ Apart from these, the following properties are also available, and may be useful spark.sql.repl.eagerEval.enabled false - Enable eager evaluation or not. If true and repl you're using supports eager evaluation, + Enable eager evaluation or not. If true and REPL you are using supports eager evaluation, dataframe will be ran automatically and html table will feedback the queries user have defined (see SPARK-24215 for more details). diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5c37c5ed85091..be91180bce270 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -352,46 +352,54 @@ def show(self, n=20, truncate=True, vertical=False): else: print(self._jdf.showString(n, int(truncate), vertical)) - def _get_repl_config(self): - """Return the configs for eager evaluation each time when __repr__ or - _repr_html_ called by user or notebook. + @property + def eagerEval(self): + """Returns true if the eager evaluation enabled. """ - eager_eval = self.sql_ctx.getConf( + return self.sql_ctx.getConf( "spark.sql.repl.eagerEval.enabled", "false").lower() == "true" - console_row = int(self.sql_ctx.getConf( - "spark.sql.repl.eagerEval.maxNumRows", u"20")) - console_truncate = int(self.sql_ctx.getConf( - "spark.sql.repl.eagerEval.truncate", u"20")) - return (eager_eval, console_row, console_truncate) + + @property + def maxNumRows(self): + """Returns the max row number for eager evaluation. + """ + return int(self.sql_ctx.getConf( + "spark.sql.repl.eagerEval.maxNumRows", "20")) + + @property + def truncate(self): + """Returns the truncate length for eager evaluation. 
+ """ + return int(self.sql_ctx.getConf( + "spark.sql.repl.eagerEval.truncate", "20")) def __repr__(self): - (eager_eval, console_row, console_truncate) = self._get_repl_config() - if not self._support_repr_html and eager_eval: + if not self._support_repr_html and self.eagerEval: vertical = False return self._jdf.showString( - console_row, console_truncate, vertical) + self.maxNumRows, self.truncate, vertical) else: return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) def _repr_html_(self): """Returns a dataframe with html code when you enabled eager evaluation - by 'spark.sql.repl.eagerEval.enabled', this only called by REPL you're + by 'spark.sql.repl.eagerEval.enabled', this only called by REPL you are using support eager evaluation with HTML. """ import cgi if not self._support_repr_html: self._support_repr_html = True - (eager_eval, console_row, console_truncate) = self._get_repl_config() - if eager_eval: + if self.eagerEval: + max_num_rows = self.maxNumRows with SCCallSiteSync(self._sc) as css: vertical = False sock_info = self._jdf.getRowsToPython( - console_row, console_truncate, vertical) + max_num_rows, self.truncate, vertical) rows = list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer()))) head = rows[0] row_data = rows[1:] - has_more_data = len(row_data) > console_row - row_data = row_data[0:console_row] + has_more_data = len(row_data) > max_num_rows + row_data = row_data[0:max_num_rows] html = "\n
" # generate table head @@ -404,7 +412,7 @@ def _repr_html_(self): html += "
\n" if has_more_data: html += "only showing top %d %s\n" % ( - console_row, "row" if console_row == 1 else "rows") + max_num_rows, "row" if max_num_rows == 1 else "rows") return html else: return None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 56c5817f5e19e..2ab84964f651d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -239,7 +239,7 @@ class Dataset[T] private[sql]( * @param numRows Number of rows to return * @param truncate If set to more than 0, truncates strings to `truncate` characters and * all cells will be aligned right. - * @param vertical If set to true, the rows to return don't need truncate. + * @param vertical If set to true, the rows to return do not need truncate. */ private[sql] def getRows( numRows: Int, @@ -289,8 +289,8 @@ class Dataset[T] private[sql]( } } - rows = rows.map { - _.zipWithIndex.map { case (cell, i) => + rows = rows.map { row => + row.zipWithIndex.map { case (cell, i) => if (truncate > 0) { StringUtils.leftPad(cell, colWidths(i)) } else { @@ -334,9 +334,7 @@ class Dataset[T] private[sql]( sb.append(sep) // data - dataRows.foreach { - _.addString(sb, "|", "|", "|\n") - } + dataRows.foreach(_.addString(sb, "|", "|", "|\n")) sb.append(sep) } else { // Extended display mode enabled @@ -3228,10 +3226,11 @@ class Dataset[T] private[sql]( } private[sql] def getRowsToPython( - numRows: Int, + _numRows: Int, truncate: Int, vertical: Boolean): Array[Any] = { EvaluatePython.registerPicklers() + val numRows = _numRows.max(0).min(Int.MaxValue - 1) val rows = getRows(numRows, truncate, vertical).map(_.toArray).toArray val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType))) val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler( From c933c56d799bd3ae69bbea43135086bad7533aa9 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 1 Jun 2018 00:54:49 +0800 Subject: [PATCH 10/14] address comments --- python/pyspark/sql/dataframe.py | 16 ++--- python/pyspark/sql/tests.py | 10 +-- .../scala/org/apache/spark/sql/Dataset.scala | 70 +++++++++---------- 3 files changed, 46 insertions(+), 50 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index be91180bce270..a5fbc36a274cc 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -353,31 +353,31 @@ def show(self, n=20, truncate=True, vertical=False): print(self._jdf.showString(n, int(truncate), vertical)) @property - def eagerEval(self): + def _eager_eval(self): """Returns true if the eager evaluation enabled. """ return self.sql_ctx.getConf( "spark.sql.repl.eagerEval.enabled", "false").lower() == "true" @property - def maxNumRows(self): + def _max_num_rows(self): """Returns the max row number for eager evaluation. """ return int(self.sql_ctx.getConf( "spark.sql.repl.eagerEval.maxNumRows", "20")) @property - def truncate(self): + def _truncate(self): """Returns the truncate length for eager evaluation. 
""" return int(self.sql_ctx.getConf( "spark.sql.repl.eagerEval.truncate", "20")) def __repr__(self): - if not self._support_repr_html and self.eagerEval: + if not self._support_repr_html and self._eager_eval: vertical = False return self._jdf.showString( - self.maxNumRows, self.truncate, vertical) + self._max_num_rows, self._truncate, vertical) else: return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) @@ -389,12 +389,12 @@ def _repr_html_(self): import cgi if not self._support_repr_html: self._support_repr_html = True - if self.eagerEval: - max_num_rows = self.maxNumRows + if self._eager_eval: + max_num_rows = self._max_num_rows with SCCallSiteSync(self._sc) as css: vertical = False sock_info = self._jdf.getRowsToPython( - max_num_rows, self.truncate, vertical) + max_num_rows, self._truncate, vertical) rows = list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer()))) head = rows[0] row_data = rows[1:] diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 436564841ca2c..89a939037b63f 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3081,8 +3081,8 @@ def test_repr_html(self): self.assertEquals(None, df._repr_html_()) self.spark.conf.set("spark.sql.repl.eagerEval.enabled", "true") expected1 = """ - | - | + | + | | |
-            |<tr><th>  key</th><th>value</th></tr>
-            |<tr><td>    1</td><td>    1</td></tr>
+            |<tr><th>key</th><th>value</th></tr>
+            |<tr><td>1</td><td>1</td></tr>
             |<tr><td>22222</td><td>22222</td></tr>
             |</table>
             |"""
         self.assertEquals(re.sub(pattern, '', expected1), df._repr_html_())
         self.spark.conf.set("spark.sql.repl.eagerEval.truncate", 3)
         expected2 = """<table border='1'>
             |<tr><th>key</th><th>value</th></tr>
-            |<tr><td>  1</td><td>    1</td></tr>
-            |<tr><td>222</td><td>  222</td></tr>
+            |<tr><td>1</td><td>1</td></tr>
+            |<tr><td>222</td><td>222</td></tr>
             |</table>
             |"""
         self.assertEquals(re.sub(pattern, '', expected2), df._repr_html_())
         self.spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 1)
         expected3 = """<table border='1'>
             |<tr><th>key</th><th>value</th></tr>
-            |<tr><td>  1</td><td>    1</td></tr>
+            |<tr><td>1</td><td>1</td></tr>
             |</table>
|only showing top 1 row |""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 2ab84964f651d..54f89c3ebec7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -216,9 +216,6 @@ class Dataset[T] private[sql]( // sqlContext must be val because a stable identifier is expected when you import implicits @transient lazy val sqlContext: SQLContext = sparkSession.sqlContext - // We set a minimum column width at '3' in showString - val minimumColWidth = 3 - private[sql] def resolve(colName: String): NamedExpression = { queryExecution.analyzed.resolveQuoted(colName, sparkSession.sessionState.analyzer.resolver) .getOrElse { @@ -260,7 +257,7 @@ class Dataset[T] private[sql]( // For array values, replace Seq and Array with square brackets // For cells that are beyond `truncate` characters, replace it with the // first `truncate-3` and "..." - var rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row => + schema.fieldNames.toSeq +: data.map { row => row.toSeq.map { cell => val str = cell match { case null => "null" @@ -276,9 +273,33 @@ class Dataset[T] private[sql]( } }: Seq[String] } + } + + /** + * Compose the string representing rows for output + * + * @param _numRows Number of rows to show + * @param truncate If set to more than 0, truncates strings to `truncate` characters and + * all cells will be aligned right. + * @param vertical If set to true, prints output rows vertically (one line per column value). + */ + private[sql] def showString( + _numRows: Int, + truncate: Int = 20, + vertical: Boolean = false): String = { + val numRows = _numRows.max(0).min(Int.MaxValue - 1) + // Get rows represented by Seq[Seq[String]], we may get one more line if it has more data. + val tmpRows = getRows(numRows, truncate, vertical) + + val hasMoreData = tmpRows.length - 1 > numRows + val rows = tmpRows.take(numRows + 1) + + val sb = new StringBuilder + val numCols = schema.fieldNames.length + // We set a minimum column width at '3' + val minimumColWidth = 3 if (!vertical) { - val numCols = schema.fieldNames.length // Initialise the width of each column to a minimum value val colWidths = Array.fill(numCols)(minimumColWidth) @@ -289,7 +310,7 @@ class Dataset[T] private[sql]( } } - rows = rows.map { row => + val paddedRows = rows.map { row => row.zipWithIndex.map { case (cell, i) => if (truncate > 0) { StringUtils.leftPad(cell, colWidths(i)) @@ -298,46 +319,21 @@ class Dataset[T] private[sql]( } } } - } - rows - } - - /** - * Compose the string representing rows for output - * - * @param _numRows Number of rows to show - * @param truncate If set to more than 0, truncates strings to `truncate` characters and - * all cells will be aligned right. - * @param vertical If set to true, prints output rows vertically (one line per column value). - */ - private[sql] def showString( - _numRows: Int, - truncate: Int = 20, - vertical: Boolean = false): String = { - val numRows = _numRows.max(0).min(Int.MaxValue - 1) - // Get rows represented by Seq[Seq[String]], we may get one more line if it has more data. 
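A quick aside on the showString() rework in this commit: getRows() is asked for numRows + 1 rows, and the surplus row is used only to decide whether the "only showing top N rows" footer is needed, avoiding a separate count. A minimal stand-alone Python sketch of that idiom (the function name and sample data are invented for illustration, not Spark APIs):

def take_with_overflow(rows, num_rows):
    # Stand-in for take(numRows + 1): one surplus row tells us the output is truncated.
    fetched = rows[:num_rows + 1]
    has_more_data = len(fetched) > num_rows
    return fetched[:num_rows], has_more_data

data = [["1", "1"], ["22222", "22222"], ["3", "3"]]
visible, has_more = take_with_overflow(data, 2)
print(visible)   # [['1', '1'], ['22222', '22222']]
print(has_more)  # True, so the caller appends "only showing top 2 rows"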
- val rows = getRows(numRows, truncate, vertical) - val fieldNames = rows.head - val data = rows.tail - - val hasMoreData = data.length > numRows - val dataRows = data.take(numRows) - val sb = new StringBuilder - if (!vertical) { // Create SeparateLine - val sep: String = fieldNames.map(_.length).toArray - .map("-" * _).addString(sb, "+", "+", "+\n").toString() + val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString() // column names - fieldNames.addString(sb, "|", "|", "|\n") + paddedRows.head.addString(sb, "|", "|", "|\n") sb.append(sep) // data - dataRows.foreach(_.addString(sb, "|", "|", "|\n")) + paddedRows.tail.foreach(_.addString(sb, "|", "|", "|\n")) sb.append(sep) } else { // Extended display mode enabled + val fieldNames = rows.head + val dataRows = rows.tail // Compute the width of field name and data columns val fieldNameColWidth = fieldNames.foldLeft(minimumColWidth) { case (curMax, fieldName) => math.max(curMax, fieldName.length) @@ -362,7 +358,7 @@ class Dataset[T] private[sql]( } // Print a footer - if (vertical && dataRows.isEmpty) { + if (vertical && rows.tail.isEmpty) { // In a vertical mode, print an empty row set explicitly sb.append("(0 rows)\n") } else if (hasMoreData) { From 68fbceb20b4667335c8b409a2db2a54d72123864 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 1 Jun 2018 17:49:04 +0800 Subject: [PATCH 11/14] address comments --- docs/configuration.md | 10 +++++++--- python/pyspark/sql/dataframe.py | 4 +++- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 1 + 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 48ac60103221e..921dedf10aaee 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -461,7 +461,9 @@ Apart from these, the following properties are also available, and may be useful false Enable eager evaluation or not. If true and REPL you are using supports eager evaluation, - dataframe will be ran automatically and html table will feedback the queries user have defined + dataframe will be ran automatically. HTML table will feedback the queries user have defined if + _repl_html_ called by notebooks like Jupyter, otherwise for plain Python REPL, output + will be shown like dataframe.show() (see SPARK-24215 for more details). @@ -469,14 +471,16 @@ Apart from these, the following properties are also available, and may be useful spark.sql.repl.eagerEval.maxNumRows 20 - Default number of rows in HTML table. + Default number of rows in eager evaluation output HTML table generated by _repr_html_ or plain text, + this only take effect when spark.sql.repl.eagerEval.enabled set to true. spark.sql.repl.eagerEval.truncate 20 - Default number of truncate in HTML table. + Default number of truncate in eager evaluation output HTML table generated by _repr_html_ or + plain text, this only take effect when spark.sql.repl.eagerEval.enabled set to true. diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a5fbc36a274cc..53cdc2463d723 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -78,6 +78,8 @@ def __init__(self, jdf, sql_ctx): self.is_cached = False self._schema = None # initialized lazily self._lazy_rdd = None + # Check whether _repr_html is supported or not, we use it to avoid calling _jdf twice + # by __repr__ and _repr_html_ while eager evaluation opened. 
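On the comment added just above: _support_repr_html starts out False and flips to True the first time a front end calls _repr_html_, so notebook kernels that understand HTML keep __repr__ from eagerly evaluating the same query a second time, while a plain Python shell, which never calls _repr_html_, still gets the eager text output. A toy stand-in for that interplay (class and helper names are invented for illustration, not PySpark API):

class EagerReprExample(object):
    """Illustrative stand-in for the DataFrame repr logic, not Spark code."""

    def __init__(self, eager_eval=True):
        self._eager_eval = eager_eval
        # Flipped on the first _repr_html_ call so __repr__ stops re-evaluating.
        self._support_repr_html = False

    def _evaluate(self):
        # Placeholder for the expensive _jdf.showString()/getRowsToPython() call.
        return [["key", "value"], ["1", "1"]]

    def __repr__(self):
        if not self._support_repr_html and self._eager_eval:
            rows = self._evaluate()
            return "\n".join("\t".join(r) for r in rows)
        return "DataFrame[key: string, value: string]"

    def _repr_html_(self):
        self._support_repr_html = True
        if self._eager_eval:
            rows = self._evaluate()
            header = "<tr><th>%s</th></tr>" % "</th><th>".join(rows[0])
            body = "".join("<tr><td>%s</td></tr>" % "</td><td>".join(r) for r in rows[1:])
            return "<table border='1'>%s%s</table>" % (header, body)
        return None

df = EagerReprExample()
print(df._repr_html_())  # what a notebook front end renders
print(repr(df))          # now falls back to the plain schema string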
self._support_repr_html = False @property @@ -390,7 +392,7 @@ def _repr_html_(self): if not self._support_repr_html: self._support_repr_html = True if self._eager_eval: - max_num_rows = self._max_num_rows + max_num_rows = max(self._max_num_rows, 0) with SCCallSiteSync(self._sc) as css: vertical = False sock_info = self._jdf.getRowsToPython( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 54f89c3ebec7f..f5526104690d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -334,6 +334,7 @@ class Dataset[T] private[sql]( // Extended display mode enabled val fieldNames = rows.head val dataRows = rows.tail + // Compute the width of field name and data columns val fieldNameColWidth = fieldNames.foldLeft(minimumColWidth) { case (curMax, fieldName) => math.max(curMax, fieldName.length) From 6fe1afd2461af181be64108f1edaf5d859339cb8 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sat, 2 Jun 2018 11:38:41 +0800 Subject: [PATCH 12/14] address comments --- docs/configuration.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 921dedf10aaee..5588c372d3e42 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -460,10 +460,10 @@ Apart from these, the following properties are also available, and may be useful spark.sql.repl.eagerEval.enabled false - Enable eager evaluation or not. If true and REPL you are using supports eager evaluation, - dataframe will be ran automatically. HTML table will feedback the queries user have defined if - _repl_html_ called by notebooks like Jupyter, otherwise for plain Python REPL, output - will be shown like dataframe.show() + Enable eager evaluation or not. If true and the REPL you are using supports eager evaluation, + Dataset will be ran automatically. The HTML table which generated by _repl_html_ + called by notebooks like Jupyter will feedback the queries user have defined. For plain Python + REPL, the output will be shown like dataframe.show() (see SPARK-24215 for more details). @@ -472,7 +472,7 @@ Apart from these, the following properties are also available, and may be useful 20 Default number of rows in eager evaluation output HTML table generated by _repr_html_ or plain text, - this only take effect when spark.sql.repl.eagerEval.enabled set to true. + this only take effect when spark.sql.repl.eagerEval.enabled is set to true. 
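For readers of the configuration table being edited above: all three properties are ordinary runtime SQL configs, so they can be set on the session builder or with spark.conf.set before a DataFrame is displayed. A hedged usage sketch, with example values only:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("eager-eval-demo")
         .config("spark.sql.repl.eagerEval.enabled", "true")
         .config("spark.sql.repl.eagerEval.maxNumRows", "20")
         .config("spark.sql.repl.eagerEval.truncate", "20")
         .getOrCreate())

df = spark.createDataFrame([(1, "1"), (22222, "22222")], ("key", "value"))
df  # in a Jupyter cell this now renders as an HTML table via _repr_html_()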
From b9ac31645733a2142f99dffd9a7afb0a167b5d8b Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 4 Jun 2018 07:10:18 -0700 Subject: [PATCH 13/14] address comments --- python/pyspark/sql/dataframe.py | 16 +++++------ python/pyspark/sql/tests.py | 48 ++++++++++++++++----------------- 2 files changed, 31 insertions(+), 33 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 53cdc2463d723..bbb4c4dbdcde8 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -393,24 +393,22 @@ def _repr_html_(self): self._support_repr_html = True if self._eager_eval: max_num_rows = max(self._max_num_rows, 0) - with SCCallSiteSync(self._sc) as css: - vertical = False - sock_info = self._jdf.getRowsToPython( - max_num_rows, self._truncate, vertical) + vertical = False + sock_info = self._jdf.getRowsToPython( + max_num_rows, self._truncate, vertical) rows = list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer()))) head = rows[0] row_data = rows[1:] has_more_data = len(row_data) > max_num_rows - row_data = row_data[0:max_num_rows] + row_data = row_data[:max_num_rows] html = "\n\n" + html += "\n" % "\n" - html += data + html += "\n" % "
" # generate table head - html += "".join(map(lambda x: cgi.escape(x), head)) + "
%s
".join(map(lambda x: cgi.escape(x), head)) # generate table rows for row in row_data: - data = "
" + "".join(map(lambda x: cgi.escape(x), row)) + \ - "
%s
".join( + map(lambda x: cgi.escape(x), row)) html += "
\n" if has_more_data: html += "only showing top %d %s\n" % ( diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 89a939037b63f..487eb19c3b98a 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3079,30 +3079,30 @@ def test_repr_html(self): pattern = re.compile(r'^ *\|', re.MULTILINE) df = self.spark.createDataFrame([(1, "1"), (22222, "22222")], ("key", "value")) self.assertEquals(None, df._repr_html_()) - self.spark.conf.set("spark.sql.repl.eagerEval.enabled", "true") - expected1 = """ - | - | - | - |
-            |<tr><th>key</th><th>value</th></tr>
-            |<tr><td>1</td><td>1</td></tr>
-            |<tr><td>22222</td><td>22222</td></tr>
-            |</table>
-            |"""
-        self.assertEquals(re.sub(pattern, '', expected1), df._repr_html_())
-        self.spark.conf.set("spark.sql.repl.eagerEval.truncate", 3)
-        expected2 = """<table border='1'>
-            |<tr><th>key</th><th>value</th></tr>
-            |<tr><td>1</td><td>1</td></tr>
-            |<tr><td>222</td><td>222</td></tr>
-            |</table>
-            |"""
-        self.assertEquals(re.sub(pattern, '', expected2), df._repr_html_())
-        self.spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 1)
-        expected3 = """<table border='1'>
-            |<tr><th>key</th><th>value</th></tr>
-            |<tr><td>1</td><td>1</td></tr>
-            |</table>
-            |only showing top 1 row
-            |"""
-        self.assertEquals(re.sub(pattern, '', expected3), df._repr_html_())
+        with self.sql_conf({"spark.sql.repl.eagerEval.enabled": True}):
+            expected1 = """<table border='1'>
+                |<tr><th>key</th><th>value</th></tr>
+                |<tr><td>1</td><td>1</td></tr>
+                |<tr><td>22222</td><td>22222</td></tr>
+                |</table>
+                |"""
+            self.assertEquals(re.sub(pattern, '', expected1), df._repr_html_())
+            with self.sql_conf({"spark.sql.repl.eagerEval.truncate": 3}):
+                expected2 = """<table border='1'>
+                    |<tr><th>key</th><th>value</th></tr>
+                    |<tr><td>1</td><td>1</td></tr>
+                    |<tr><td>222</td><td>222</td></tr>
+                    |</table>
+                    |"""
+                self.assertEquals(re.sub(pattern, '', expected2), df._repr_html_())
+                with self.sql_conf({"spark.sql.repl.eagerEval.maxNumRows": 1}):
+                    expected3 = """<table border='1'>
+                        |<tr><th>key</th><th>value</th></tr>
+                        |<tr><td>1</td><td>1</td></tr>
+                        |</table>
+ |only showing top 1 row + |""" + self.assertEquals(re.sub(pattern, '', expected3), df._repr_html_()) class HiveSparkSubmitTests(SparkSubmitTests): From 597b8d515fd3bcd117b22ae29b05cd8a58d37ca2 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 4 Jun 2018 07:57:52 -0700 Subject: [PATCH 14/14] bug fix --- python/pyspark/sql/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index bbb4c4dbdcde8..1e6a1acebb5ca 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -402,7 +402,7 @@ def _repr_html_(self): has_more_data = len(row_data) > max_num_rows row_data = row_data[:max_num_rows] - html = "\n
" + html = "\n" # generate table head html += "\n" % "
%s
".join(map(lambda x: cgi.escape(x), head)) # generate table rows