27 changes: 27 additions & 0 deletions docs/configuration.md
@@ -456,6 +456,33 @@ Apart from these, the following properties are also available, and may be useful
from JVM to Python worker for every task.
</td>
</tr>
<tr>
Reviewer:
All the SQL configurations should follow what we did in the Spark SQL section: https://spark.apache.org/docs/latest/configuration.html#spark-sql.

Reviewer:
These confs are not part of spark.sql("SET -v").show(numRows = 200, truncate = false).

Author:
Got it, I'll add these configurations into SQLConf.scala in the follow-up PR.

<td><code>spark.sql.repl.eagerEval.enabled</code></td>
<td>false</td>
<td>
Enables eager evaluation or not. If true, and the REPL you are using supports eager evaluation, the
Reviewer:
Just a question. When the REPL does not support eager evaluation, could we do anything better instead of silently ignoring the user inputs?

Author:
It may be hard to do better, because from the Dataset side we can hardly detect that the REPL does not support eager evaluation.

Reviewer (@felixcheung, Jun 13, 2018):
I don't completely follow, actually: what makes a REPL "not support eager evaluation"?
In fact, this "eager evaluation" is just object-rendering support built into the REPL, notebook, etc. (like print); it's not really designed to be "eager evaluation".

Reviewer:
That is true. We are not adding eager execution to our Dataset/DataFrame; we just rely on the REPL to trigger it. We need to update the parameter description to emphasize that.

Dataset will be evaluated and rendered automatically. The HTML table generated by <code>_repr_html_</code>,
which notebooks such as Jupyter invoke, displays the result of the query the user has defined. For the
plain Python REPL, the output is shown as it would be by <code>dataframe.show()</code>
(see <a href="https://issues.apache.org/jira/browse/SPARK-24215">SPARK-24215</a> for more details).
</td>
</tr>
<tr>
<td><code>spark.sql.repl.eagerEval.maxNumRows</code></td>
<td>20</td>
<td>
The default number of rows shown in the eager evaluation output, whether as the HTML table generated by
<code>_repr_html_</code> or as plain text. This only takes effect when <code>spark.sql.repl.eagerEval.enabled</code> is set to true.
Reviewer:
take -> takes

Author:
Got it, thanks.

</td>
</tr>
<tr>
<td><code>spark.sql.repl.eagerEval.truncate</code></td>
<td>20</td>
<td>
The default truncation length for cells in the eager evaluation output, whether in the HTML table generated by
<code>_repr_html_</code> or in plain text. This only takes effect when <code>spark.sql.repl.eagerEval.enabled</code> is set to true.
Reviewer:
take -> takes

Author:
Got it, thanks; I'll address it in the next follow-up PR.

</td>
</tr>
<tr>
<td><code>spark.files</code></td>
<td></td>
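To make the three settings above concrete, here is a minimal PySpark session sketch (assuming a local SparkSession bound to `spark`; the rendered output depends on the REPL in use):

```python
# Minimal sketch: enabling eager evaluation so a DataFrame renders without an
# explicit .show(). Assumes a local SparkSession; output shapes are illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", "true")   # turn on eager rendering
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", "10")  # cap displayed rows
spark.conf.set("spark.sql.repl.eagerEval.truncate", "20")    # cap cell width

df = spark.range(3).withColumnRenamed("id", "key")
df  # plain REPL: printed like df.show(); Jupyter: rendered as an HTML table
```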
65 changes: 64 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -78,6 +78,9 @@ def __init__(self, jdf, sql_ctx):
self.is_cached = False
self._schema = None # initialized lazily
self._lazy_rdd = None
# Check whether _repr_html_ is supported or not; we use it to avoid calling _jdf twice
# by __repr__ and _repr_html_ while eager evaluation is enabled.
self._support_repr_html = False

@property
@since(1.3)
@@ -351,8 +354,68 @@ def show(self, n=20, truncate=True, vertical=False):
else:
print(self._jdf.showString(n, int(truncate), vertical))

@property
def _eager_eval(self):
"""Returns true if the eager evaluation enabled.
"""
return self.sql_ctx.getConf(
"spark.sql.repl.eagerEval.enabled", "false").lower() == "true"
Reviewer:
Is it possible to avoid hard-coding these conf key values? cc @ueshin @HyukjinKwon

Reviewer:
Probably we should access the SQLConf object. 1. I agree with not hardcoding it in general, but 2. IMHO I want to avoid Py4J JVM accesses in the test, because to my knowledge that tends to make the test flakier (unlike on the Scala or Java side).

Maybe we should take another look at this hardcoding if we see more occurrences next time.

Reviewer:
In the ongoing release, a nice-to-have refactoring is to move all the Core confs into a single file, just like what we did for the Spark SQL conf: default values, boundary checking, types, and descriptions. Thus, in PySpark, it would be better to start doing that now.


@property
def _max_num_rows(self):
"""Returns the max row number for eager evaluation.
"""
return int(self.sql_ctx.getConf(
"spark.sql.repl.eagerEval.maxNumRows", "20"))

@property
def _truncate(self):
"""Returns the truncate length for eager evaluation.
"""
return int(self.sql_ctx.getConf(
"spark.sql.repl.eagerEval.truncate", "20"))

def __repr__(self):
Reviewer (@gatorsmile, Jun 11, 2018):
This PR also changed __repr__. Thus, we need to update the PR title and description. A better PR title would be something like "Implement eager evaluation for DataFrame APIs in PySpark".

Author:
Done for this, and also for the PR description as you suggested; sorry about that, and I'll pay more attention next time the implementation changes during PR review. Is there any way to change the committed git log?

return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
if not self._support_repr_html and self._eager_eval:
vertical = False
return self._jdf.showString(
self._max_num_rows, self._truncate, vertical)
else:
return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))

def _repr_html_(self):
"""Returns a dataframe with html code when you enabled eager evaluation
by 'spark.sql.repl.eagerEval.enabled', this only called by REPL you are
using support eager evaluation with HTML.
"""
import cgi
if not self._support_repr_html:
self._support_repr_html = True
if self._eager_eval:
max_num_rows = max(self._max_num_rows, 0)
vertical = False
Reviewer:
Any discussion about this?

Author:
Yes, the earlier discussion is linked below:
#21370 (comment)
and
#21370 (comment)
We want named arguments for the boolean flag here, but named arguments can't be used when Python calls a _jdf function through Py4J, so we use this workaround.

sock_info = self._jdf.getRowsToPython(
max_num_rows, self._truncate, vertical)
rows = list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
head = rows[0]
row_data = rows[1:]
has_more_data = len(row_data) > max_num_rows
row_data = row_data[:max_num_rows]

html = "<table border='1'>\n"
# generate table head
html += "<tr><th>%s</th></tr>\n" % "</th><th>".join(map(lambda x: cgi.escape(x), head))
# generate table rows
for row in row_data:
html += "<tr><td>%s</td></tr>\n" % "</td><td>".join(
map(lambda x: cgi.escape(x), row))
html += "</table>\n"
if has_more_data:
html += "only showing top %d %s\n" % (
max_num_rows, "row" if max_num_rows == 1 else "rows")
return html
else:
return None

@since(2.1)
def checkpoint(self, eager=True):
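The `_support_repr_html` handshake above is the core of the change: rich front ends probe `_repr_html_` first, and once that has happened `__repr__` falls back to the cheap schema string so one display does not trigger two JVM-side evaluations. A stripped-down sketch of the same pattern (a hypothetical class, not the real pyspark.sql.DataFrame):

```python
# Minimal sketch of the double-render guard used above (hypothetical class,
# not the real pyspark.sql.DataFrame). Rich front ends such as Jupyter try
# _repr_html_ first; once it has run, __repr__ returns the cheap schema-style
# string so the underlying query is not executed twice for a single display.
class EagerFrame:
    def __init__(self, rows, eager=True):
        self.rows = rows
        self.eager = eager
        self._support_repr_html = False  # flips to True on first _repr_html_ call

    def __repr__(self):
        if not self._support_repr_html and self.eager:
            return "\n".join(str(r) for r in self.rows)  # plain-REPL rendering
        return "EagerFrame[%d rows]" % len(self.rows)    # schema-style fallback

    def _repr_html_(self):
        self._support_repr_html = True
        if not self.eager:
            return None  # tells the front end to fall back to __repr__
        return "<table>%s</table>" % "".join(
            "<tr><td>%s</td></tr>" % r for r in self.rows)
```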
30 changes: 30 additions & 0 deletions python/pyspark/sql/tests.py
@@ -3074,6 +3074,36 @@ def test_checking_csv_header(self):
finally:
shutil.rmtree(path)

def test_repr_html(self):
Reviewer:
This function only covers the most basic positive case. We also need to add more test cases, for example the results when spark.sql.repl.eagerEval.enabled is set to false.

Author:
Got it, the follow-up PR will enhance this test.

import re
pattern = re.compile(r'^ *\|', re.MULTILINE)
df = self.spark.createDataFrame([(1, "1"), (22222, "22222")], ("key", "value"))
self.assertEquals(None, df._repr_html_())
with self.sql_conf({"spark.sql.repl.eagerEval.enabled": True}):
expected1 = """<table border='1'>
|<tr><th>key</th><th>value</th></tr>
|<tr><td>1</td><td>1</td></tr>
|<tr><td>22222</td><td>22222</td></tr>
|</table>
|"""
self.assertEquals(re.sub(pattern, '', expected1), df._repr_html_())
with self.sql_conf({"spark.sql.repl.eagerEval.truncate": 3}):
expected2 = """<table border='1'>
|<tr><th>key</th><th>value</th></tr>
|<tr><td>1</td><td>1</td></tr>
|<tr><td>222</td><td>222</td></tr>
|</table>
|"""
self.assertEquals(re.sub(pattern, '', expected2), df._repr_html_())
with self.sql_conf({"spark.sql.repl.eagerEval.maxNumRows": 1}):
expected3 = """<table border='1'>
|<tr><th>key</th><th>value</th></tr>
|<tr><td>1</td><td>1</td></tr>
|</table>
|only showing top 1 row
|"""
self.assertEquals(re.sub(pattern, '', expected3), df._repr_html_())


class HiveSparkSubmitTests(SparkSubmitTests):

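The expected strings in the test above keep a leading-pipe margin so they can stay indented inside the test method; the regex strips it, much like Scala's stripMargin. A standalone illustration of the idiom (standard library only):

```python
# Sketch of the margin-stripping idiom from the test above: each continuation
# line starts with spaces and a '|', which the multiline regex removes, so the
# multi-line expected strings can remain indented in the test source.
import re

pattern = re.compile(r'^ *\|', re.MULTILINE)
expected = """<table border='1'>
    |<tr><th>key</th><th>value</th></tr>
    |</table>
    |"""
print(re.sub(pattern, '', expected))
# <table border='1'>
# <tr><th>key</th><th>value</th></tr>
# </table>
```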
84 changes: 55 additions & 29 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -231,16 +231,17 @@ class Dataset[T] private[sql](
}

/**
* Compose the string representing rows for output
* Get rows represented as a Seq of string sequences, applying the given truncate and vertical requirements.
*
* @param _numRows Number of rows to show
* @param numRows Number of rows to return
* @param truncate If set to more than 0, truncates strings to `truncate` characters and
* all cells will be aligned right.
* @param vertical If set to true, prints output rows vertically (one line per column value).
* @param vertical If set to true, the returned rows do not need to be truncated.
*/
private[sql] def showString(
_numRows: Int, truncate: Int = 20, vertical: Boolean = false): String = {
val numRows = _numRows.max(0).min(Int.MaxValue - 1)
private[sql] def getRows(
numRows: Int,
truncate: Int,
vertical: Boolean): Seq[Seq[String]] = {
val newDf = toDF()
val castCols = newDf.logicalPlan.output.map { col =>
// Since binary types in top-level schema fields have a specific format to print,
@@ -251,14 +252,12 @@
Column(col).cast(StringType)
}
}
val takeResult = newDf.select(castCols: _*).take(numRows + 1)
val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)
val data = newDf.select(castCols: _*).take(numRows + 1)

// For array values, replace Seq and Array with square brackets
// For cells that are beyond `truncate` characters, replace it with the
// first `truncate-3` and "..."
val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row =>
schema.fieldNames.toSeq +: data.map { row =>
row.toSeq.map { cell =>
val str = cell match {
case null => "null"
Expand All @@ -274,6 +273,26 @@ class Dataset[T] private[sql](
}
}: Seq[String]
}
}

/**
* Compose the string representing rows for output
*
* @param _numRows Number of rows to show
* @param truncate If set to more than 0, truncates strings to `truncate` characters and
* all cells will be aligned right.
* @param vertical If set to true, prints output rows vertically (one line per column value).
*/
private[sql] def showString(
_numRows: Int,
truncate: Int = 20,
vertical: Boolean = false): String = {
val numRows = _numRows.max(0).min(Int.MaxValue - 1)
// Get rows represented as Seq[Seq[String]]; we may get one extra row if there is more data.
val tmpRows = getRows(numRows, truncate, vertical)

val hasMoreData = tmpRows.length - 1 > numRows
val rows = tmpRows.take(numRows + 1)

val sb = new StringBuilder
val numCols = schema.fieldNames.length
Expand All @@ -291,31 +310,25 @@ class Dataset[T] private[sql](
}
}

val paddedRows = rows.map { row =>
row.zipWithIndex.map { case (cell, i) =>
if (truncate > 0) {
StringUtils.leftPad(cell, colWidths(i))
} else {
StringUtils.rightPad(cell, colWidths(i))
}
}
}
Reviewer:
Should we do this in showString? And we could move minimumColWidth into showString in that case?

Author:
Doing this in getRows is to reuse the truncate logic. I think it's the same problem as we discussed here:
[image]
If we don't need truncation, we can move the logic and minimumColWidth into showString. I would like to hear your suggestions.

Reviewer:
Seems like the truncation is already done when creating rows above?

Author:
Oh, I see: the padded rows are only useful in console mode, so they are not needed in the HTML output. I'll do this ASAP.

Author:
Thanks, done in 9c6b3bb.


// Create SeparateLine
val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()

// column names
rows.head.zipWithIndex.map { case (cell, i) =>
if (truncate > 0) {
StringUtils.leftPad(cell, colWidths(i))
} else {
StringUtils.rightPad(cell, colWidths(i))
}
}.addString(sb, "|", "|", "|\n")

paddedRows.head.addString(sb, "|", "|", "|\n")
sb.append(sep)

// data
rows.tail.foreach {
_.zipWithIndex.map { case (cell, i) =>
if (truncate > 0) {
StringUtils.leftPad(cell.toString, colWidths(i))
} else {
StringUtils.rightPad(cell.toString, colWidths(i))
}
}.addString(sb, "|", "|", "|\n")
}

paddedRows.tail.foreach(_.addString(sb, "|", "|", "|\n"))
sb.append(sep)
} else {
// Extended display mode enabled
Expand Down Expand Up @@ -346,7 +359,7 @@ class Dataset[T] private[sql](
}

// Print a footer
if (vertical && data.isEmpty) {
if (vertical && rows.tail.isEmpty) {
// In a vertical mode, print an empty row set explicitly
sb.append("(0 rows)\n")
} else if (hasMoreData) {
@@ -3209,6 +3222,19 @@
}
}

private[sql] def getRowsToPython(
Reviewer:
In DataFrameSuite, we have multiple test cases for showString, but none for getRows, which is introduced in this PR.

We also need unit test cases for getRowsToPython.

Author:
Got it, the follow-up PR I'm working on will add more tests for getRows and getRowsToPython.

_numRows: Int,
truncate: Int,
vertical: Boolean): Array[Any] = {
EvaluatePython.registerPicklers()
val numRows = _numRows.max(0).min(Int.MaxValue - 1)
Reviewer:
This should also be part of the conf description.

Author:
Got it, it will be fixed in another PR.

val rows = getRows(numRows, truncate, vertical).map(_.toArray).toArray
val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType)))
val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(
rows.iterator.map(toJava))
PythonRDD.serveIterator(iter, "serve-GetRows")
Reviewer:
PythonRDD.serveIterator(iter, "serve-GetRows") returns Int, but the return type of getRowsToPython is Array[Any]. How does it work? cc @xuanyuanking @HyukjinKwon

Reviewer:
I think we return Array[Any] for PythonRDD.serveIterator too.

def serveIterator(items: Iterator[_], threadName: String): Array[Any] = {

Did I maybe miss something?

Author:
Same answer as @HyukjinKwon about the return type; actually, the exact return type we need here is Array[Array[String]], which is determined by the toJava func.

Reviewer:
The changes to the return types were made in a commit, [PYSPARK] Update py4j to version 0.10.7.... No PR was opened...

Reviewer:
Yup ..

Reviewer:
Re the py4j commit: there's a good reason for it, @gatorsmile.
Not sure if the change to the return type is required by the py4j change, though.

Reviewer:
I think we'd better not talk about this here, though.

}

/**
* Collect a Dataset as ArrowPayload byte arrays and serve to PySpark.
*/
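A detail worth calling out in `getRows`/`getRowsToPython` above: fetching `numRows + 1` rows doubles as a has-more-data probe, so the client can print the "only showing top N rows" footer without a separate count query. A small Python sketch of that protocol (hypothetical in-memory rows standing in for the pickled payload that `_load_from_socket` reads on the Python side):

```python
# Sketch of the "fetch one extra row" protocol used by getRows/getRowsToPython:
# requesting max_num_rows + 1 rows lets the client detect truncation without a
# separate count. Data here is hypothetical; the real payload arrives pickled
# over a local socket.
def render_with_footer(rows, max_num_rows):
    head, data = rows[0], rows[1:]            # row 0 is always the header
    has_more_data = len(data) > max_num_rows  # the extra row signals more rows exist
    shown = data[:max_num_rows]
    lines = [" | ".join(head)] + [" | ".join(r) for r in shown]
    if has_more_data:
        lines.append("only showing top %d %s"
                     % (max_num_rows, "row" if max_num_rows == 1 else "rows"))
    return "\n".join(lines)

print(render_with_footer([["key", "value"], ["1", "a"], ["2", "b"]], max_num_rows=1))
# key | value
# 1 | a
# only showing top 1 row
```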