[SPARK-24215][PySpark] Implement eager evaluation for DataFrame APIs in PySpark #21370
Changes from all commits: e1c566c, 4353202, 4e366e5, fc858af, d03697c, cca07c2, 3649fa0, 265ef99, 148ee50, c933c56, 68fbceb, 6fe1afd, b9ac316, 597b8d5
docs/configuration.md
|
|
@@ -456,6 +456,33 @@ Apart from these, the following properties are also available, and may be useful | |
| from JVM to Python worker for every task. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.sql.repl.eagerEval.enabled</code></td> | ||
| <td>false</td> | ||
| <td> | ||
| Enables eager evaluation or not. If true, and the REPL you are using supports eager evaluation, the | ||
|
Review comment: Just a question: when the REPL does not support eager evaluation, could we do anything better than silently ignoring the user's input?
Reply: Maybe it's hard to have a better way, because the Dataset can hardly tell whether the REPL supports eager evaluation.
Reply: I don't completely follow, actually - what makes a "REPL that does not support eager evaluation"?
Reply: That is true. We are not adding eager execution in our Dataset/DataFrame; we just rely on the REPL to trigger it. We need an update on the parameter description to emphasize it.
||
| Dataset will be run automatically. The HTML table generated by <code>_repr_html_</code>, | ||
| which is called by notebooks such as Jupyter, will show the results of the queries the user has defined. For the plain Python | ||
| REPL, the output will be shown as in <code>dataframe.show()</code> | ||
| (see <a href="https://issues.apache.org/jira/browse/SPARK-24215">SPARK-24215</a> for more details). | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.sql.repl.eagerEval.maxNumRows</code></td> | ||
| <td>20</td> | ||
| <td> | ||
| Default number of rows in eager evaluation output HTML table generated by <code>_repr_html_</code> or plain text, | ||
| this only take effect when <code>spark.sql.repl.eagerEval.enabled</code> is set to true. | ||
|
Review comment: take -> takes
Reply: Got it, thanks.
||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.sql.repl.eagerEval.truncate</code></td> | ||
| <td>20</td> | ||
| <td> | ||
| Default number of truncate in eager evaluation output HTML table generated by <code>_repr_html_</code> or | ||
| plain text, this only take effect when <code>spark.sql.repl.eagerEval.enabled</code> set to true. | ||
|
Review comment: take -> takes
Reply: Got it, thanks; will address in the next follow-up PR.
||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.files</code></td> | ||
| <td></td> | ||
|
|
||
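To make these options concrete, the following is a minimal, hedged PySpark sketch of how a user would enable eager evaluation. The session builder, app name, and sample data are illustrative and not part of this PR.

```python
from pyspark.sql import SparkSession

# Illustrative session; any existing SparkSession can set the same options.
spark = (SparkSession.builder
         .appName("eager-eval-demo")                           # hypothetical app name
         .config("spark.sql.repl.eagerEval.enabled", "true")   # turn on eager evaluation
         .config("spark.sql.repl.eagerEval.maxNumRows", "10")  # rows rendered per table
         .config("spark.sql.repl.eagerEval.truncate", "20")    # max characters per cell
         .getOrCreate())

df = spark.createDataFrame([(1, "a"), (2, "b")], ["key", "value"])

# In a plain Python REPL, evaluating `df` now prints the same table as df.show();
# in Jupyter, the frontend calls _repr_html_ and renders an HTML table instead.
df
```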
python/pyspark/sql/dataframe.py
|
|
@@ -78,6 +78,9 @@ def __init__(self, jdf, sql_ctx): | |
| self.is_cached = False | ||
| self._schema = None # initialized lazily | ||
| self._lazy_rdd = None | ||
| # Check whether _repr_html is supported or not, we use it to avoid calling _jdf twice | ||
| # by __repr__ and _repr_html_ while eager evaluation opened. | ||
| self._support_repr_html = False | ||
|
|
||
| @property | ||
| @since(1.3) | ||
|
|
@@ -351,8 +354,68 @@ def show(self, n=20, truncate=True, vertical=False): | |
| else: | ||
| print(self._jdf.showString(n, int(truncate), vertical)) | ||
|
|
||
| @property | ||
| def _eager_eval(self): | ||
| """Returns true if the eager evaluation enabled. | ||
| """ | ||
| return self.sql_ctx.getConf( | ||
| "spark.sql.repl.eagerEval.enabled", "false").lower() == "true" | ||
|
Review comment: Is it possible to avoid hard-coding these conf key values? cc @ueshin @HyukjinKwon
Reply: Probably, we should access the SQLConf object. 1. I agree with not hardcoding it in general, but 2. IMHO I want to avoid Py4J JVM accesses in the test, because such tests can be more flaky up to my knowledge (unlike the Scala or Java side). Maybe we should take another look at this hardcoding if we see more occurrences next time.
Reply: In the ongoing release, a nice-to-have refactoring is to move all the core confs into a single file, just like what we did with the Spark SQL conf: default values, boundary checking, types, and descriptions. Thus, in PySpark, it would be better to start doing this now.
||
|
|
||
| @property | ||
| def _max_num_rows(self): | ||
| """Returns the max row number for eager evaluation. | ||
| """ | ||
| return int(self.sql_ctx.getConf( | ||
| "spark.sql.repl.eagerEval.maxNumRows", "20")) | ||
|
|
||
| @property | ||
| def _truncate(self): | ||
| """Returns the truncate length for eager evaluation. | ||
| """ | ||
| return int(self.sql_ctx.getConf( | ||
| "spark.sql.repl.eagerEval.truncate", "20")) | ||
|
|
||
| def __repr__(self): | ||
|
Review comment: This PR also changed
Reply: Done for this, and also updated the PR description per your comment; sorry about that, and I'll pay more attention next time when the implementation changes during PR review. Is there any way to change the committed git log? Sorry for this.
||
| return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) | ||
| if not self._support_repr_html and self._eager_eval: | ||
| vertical = False | ||
| return self._jdf.showString( | ||
| self._max_num_rows, self._truncate, vertical) | ||
| else: | ||
| return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) | ||
|
|
||
| def _repr_html_(self): | ||
| """Returns a dataframe with html code when you enabled eager evaluation | ||
| by 'spark.sql.repl.eagerEval.enabled', this only called by REPL you are | ||
| using support eager evaluation with HTML. | ||
| """ | ||
| import cgi | ||
| if not self._support_repr_html: | ||
| self._support_repr_html = True | ||
| if self._eager_eval: | ||
| max_num_rows = max(self._max_num_rows, 0) | ||
| vertical = False | ||
|
Review comment: Any discussion about this?
Reply: Yes, the earlier discussion is linked below:
||
| sock_info = self._jdf.getRowsToPython( | ||
| max_num_rows, self._truncate, vertical) | ||
| rows = list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer()))) | ||
| head = rows[0] | ||
| row_data = rows[1:] | ||
| has_more_data = len(row_data) > max_num_rows | ||
| row_data = row_data[:max_num_rows] | ||
|
|
||
| html = "<table border='1'>\n" | ||
| # generate table head | ||
| html += "<tr><th>%s</th></tr>\n" % "</th><th>".join(map(lambda x: cgi.escape(x), head)) | ||
| # generate table rows | ||
| for row in row_data: | ||
| html += "<tr><td>%s</td></tr>\n" % "</td><td>".join( | ||
| map(lambda x: cgi.escape(x), row)) | ||
| html += "</table>\n" | ||
| if has_more_data: | ||
| html += "only showing top %d %s\n" % ( | ||
| max_num_rows, "row" if max_num_rows == 1 else "rows") | ||
| return html | ||
| else: | ||
| return None | ||
|
|
||
| @since(2.1) | ||
| def checkpoint(self, eager=True): | ||
|
|
||
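The interplay between `__repr__` and `_repr_html_` above is easier to see in isolation. Below is a self-contained toy sketch (plain Python, no Spark, all names hypothetical) that mimics the same protocol: an HTML-capable frontend calls `_repr_html_`, which flips a flag so that `__repr__` falls back to the schema-only string, while a plain REPL that never calls `_repr_html_` gets the eagerly rendered text.

```python
class FakeDataFrame:
    """Toy stand-in for pyspark.sql.DataFrame, only to illustrate the repr protocol."""

    def __init__(self, rows, eager=True):
        self._rows = rows
        self._eager = eager
        # Same trick as the PR: assume a plain-text frontend until _repr_html_ is called.
        self._support_repr_html = False

    def __repr__(self):
        if not self._support_repr_html and self._eager:
            # Plain REPL path: render the data eagerly, like showString().
            return "\n".join(str(r) for r in self._rows)
        # An HTML frontend already rendered the data, so fall back to a schema-only string.
        return "FakeDataFrame[key: bigint, value: string]"

    def _repr_html_(self):
        # Jupyter calls this; remember that the frontend supports HTML.
        self._support_repr_html = True
        if not self._eager:
            return None  # tells the notebook to fall back to __repr__
        cells = "".join("<tr><td>%s</td><td>%s</td></tr>" % r for r in self._rows)
        return "<table border='1'>%s</table>" % cells


df = FakeDataFrame([(1, "a"), (2, "b")])
print(repr(df))          # plain REPL: eager text output
print(df._repr_html_())  # notebook: eager HTML output
print(repr(df))          # after _repr_html_, __repr__ reverts to the schema string
```

This mirrors the purpose of the `_support_repr_html` flag in the diff: it avoids evaluating the same DataFrame twice (once for text, once for HTML) in notebook frontends.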
python/pyspark/sql/tests.py
|
|
@@ -3074,6 +3074,36 @@ def test_checking_csv_header(self): | |
| finally: | ||
| shutil.rmtree(path) | ||
|
|
||
| def test_repr_html(self): | ||
|
Review comment: This function only covers the most basic positive case. We also need to add more test cases, for example the results when
Reply: Got it, the follow-up PR will enhance this test.
||
| import re | ||
| pattern = re.compile(r'^ *\|', re.MULTILINE) | ||
| df = self.spark.createDataFrame([(1, "1"), (22222, "22222")], ("key", "value")) | ||
| self.assertEquals(None, df._repr_html_()) | ||
| with self.sql_conf({"spark.sql.repl.eagerEval.enabled": True}): | ||
| expected1 = """<table border='1'> | ||
| |<tr><th>key</th><th>value</th></tr> | ||
| |<tr><td>1</td><td>1</td></tr> | ||
| |<tr><td>22222</td><td>22222</td></tr> | ||
| |</table> | ||
| |""" | ||
| self.assertEquals(re.sub(pattern, '', expected1), df._repr_html_()) | ||
| with self.sql_conf({"spark.sql.repl.eagerEval.truncate": 3}): | ||
| expected2 = """<table border='1'> | ||
| |<tr><th>key</th><th>value</th></tr> | ||
| |<tr><td>1</td><td>1</td></tr> | ||
| |<tr><td>222</td><td>222</td></tr> | ||
| |</table> | ||
| |""" | ||
| self.assertEquals(re.sub(pattern, '', expected2), df._repr_html_()) | ||
| with self.sql_conf({"spark.sql.repl.eagerEval.maxNumRows": 1}): | ||
| expected3 = """<table border='1'> | ||
| |<tr><th>key</th><th>value</th></tr> | ||
| |<tr><td>1</td><td>1</td></tr> | ||
| |</table> | ||
| |only showing top 1 row | ||
| |""" | ||
| self.assertEquals(re.sub(pattern, '', expected3), df._repr_html_()) | ||
|
|
||
|
|
||
| class HiveSparkSubmitTests(SparkSubmitTests): | ||
|
|
||
|
|
||
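As the review notes, only the basic positive `_repr_html_` case is covered here, with more cases deferred to a follow-up. A hedged sketch of one additional test (not part of this diff; the test name and expected strings are illustrative) for the plain `__repr__` path could look like the following, reusing the same `self.spark` and `self.sql_conf` helpers the existing test uses:

```python
def test_repr_behaviors(self):
    # Hypothetical follow-up test (not in this PR): __repr__ should stay schema-only
    # by default and switch to show()-style text when eager evaluation is enabled.
    df = self.spark.createDataFrame([(1, "1"), (22222, "22222")], ("key", "value"))
    self.assertEquals("DataFrame[key: bigint, value: string]", df.__repr__())
    with self.sql_conf({"spark.sql.repl.eagerEval.enabled": True}):
        expected = "\n".join([
            "+-----+-----+",
            "|  key|value|",
            "+-----+-----+",
            "|    1|    1|",
            "|22222|22222|",
            "+-----+-----+",
            ""])  # showString() output ends with a trailing newline
        self.assertEquals(expected, df.__repr__())
```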
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
|
|
@@ -231,16 +231,17 @@ class Dataset[T] private[sql]( | |||
| } | ||||
|
|
||||
| /** | ||||
| * Compose the string representing rows for output | ||||
| * Get rows represented in Sequence by specific truncate and vertical requirement. | ||||
| * | ||||
| * @param _numRows Number of rows to show | ||||
| * @param numRows Number of rows to return | ||||
| * @param truncate If set to more than 0, truncates strings to `truncate` characters and | ||||
| * all cells will be aligned right. | ||||
| * @param vertical If set to true, prints output rows vertically (one line per column value). | ||||
| * @param vertical If set to true, the rows to return do not need truncate. | ||||
| */ | ||||
| private[sql] def showString( | ||||
| _numRows: Int, truncate: Int = 20, vertical: Boolean = false): String = { | ||||
| val numRows = _numRows.max(0).min(Int.MaxValue - 1) | ||||
| private[sql] def getRows( | ||||
| numRows: Int, | ||||
| truncate: Int, | ||||
| vertical: Boolean): Seq[Seq[String]] = { | ||||
| val newDf = toDF() | ||||
| val castCols = newDf.logicalPlan.output.map { col => | ||||
| // Since binary types in top-level schema fields have a specific format to print, | ||||
|
|
@@ -251,14 +252,12 @@ class Dataset[T] private[sql]( | |||
| Column(col).cast(StringType) | ||||
| } | ||||
| } | ||||
| val takeResult = newDf.select(castCols: _*).take(numRows + 1) | ||||
| val hasMoreData = takeResult.length > numRows | ||||
| val data = takeResult.take(numRows) | ||||
| val data = newDf.select(castCols: _*).take(numRows + 1) | ||||
|
|
||||
| // For array values, replace Seq and Array with square brackets | ||||
| // For cells that are beyond `truncate` characters, replace it with the | ||||
| // first `truncate-3` and "..." | ||||
| val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row => | ||||
| schema.fieldNames.toSeq +: data.map { row => | ||||
| row.toSeq.map { cell => | ||||
| val str = cell match { | ||||
| case null => "null" | ||||
|
|
@@ -274,6 +273,26 @@ class Dataset[T] private[sql]( | |||
| } | ||||
| }: Seq[String] | ||||
| } | ||||
| } | ||||
|
|
||||
| /** | ||||
| * Compose the string representing rows for output | ||||
| * | ||||
| * @param _numRows Number of rows to show | ||||
| * @param truncate If set to more than 0, truncates strings to `truncate` characters and | ||||
| * all cells will be aligned right. | ||||
| * @param vertical If set to true, prints output rows vertically (one line per column value). | ||||
| */ | ||||
| private[sql] def showString( | ||||
| _numRows: Int, | ||||
| truncate: Int = 20, | ||||
| vertical: Boolean = false): String = { | ||||
| val numRows = _numRows.max(0).min(Int.MaxValue - 1) | ||||
| // Get rows represented by Seq[Seq[String]], we may get one more line if it has more data. | ||||
| val tmpRows = getRows(numRows, truncate, vertical) | ||||
|
|
||||
| val hasMoreData = tmpRows.length - 1 > numRows | ||||
| val rows = tmpRows.take(numRows + 1) | ||||
|
|
||||
| val sb = new StringBuilder | ||||
| val numCols = schema.fieldNames.length | ||||
|
|
@@ -291,31 +310,25 @@ class Dataset[T] private[sql]( | |||
| } | ||||
| } | ||||
|
|
||||
| val paddedRows = rows.map { row => | ||||
| row.zipWithIndex.map { case (cell, i) => | ||||
| if (truncate > 0) { | ||||
| StringUtils.leftPad(cell, colWidths(i)) | ||||
| } else { | ||||
| StringUtils.rightPad(cell, colWidths(i)) | ||||
| } | ||||
| } | ||||
| } | ||||
|
||||
|
|
||||
| // Create SeparateLine | ||||
| val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString() | ||||
|
|
||||
| // column names | ||||
| rows.head.zipWithIndex.map { case (cell, i) => | ||||
| if (truncate > 0) { | ||||
| StringUtils.leftPad(cell, colWidths(i)) | ||||
| } else { | ||||
| StringUtils.rightPad(cell, colWidths(i)) | ||||
| } | ||||
| }.addString(sb, "|", "|", "|\n") | ||||
|
|
||||
| paddedRows.head.addString(sb, "|", "|", "|\n") | ||||
| sb.append(sep) | ||||
|
|
||||
| // data | ||||
| rows.tail.foreach { | ||||
| _.zipWithIndex.map { case (cell, i) => | ||||
| if (truncate > 0) { | ||||
| StringUtils.leftPad(cell.toString, colWidths(i)) | ||||
| } else { | ||||
| StringUtils.rightPad(cell.toString, colWidths(i)) | ||||
| } | ||||
| }.addString(sb, "|", "|", "|\n") | ||||
| } | ||||
|
|
||||
| paddedRows.tail.foreach(_.addString(sb, "|", "|", "|\n")) | ||||
| sb.append(sep) | ||||
| } else { | ||||
| // Extended display mode enabled | ||||
|
|
@@ -346,7 +359,7 @@ class Dataset[T] private[sql]( | |||
| } | ||||
|
|
||||
| // Print a footer | ||||
| if (vertical && data.isEmpty) { | ||||
| if (vertical && rows.tail.isEmpty) { | ||||
| // In a vertical mode, print an empty row set explicitly | ||||
| sb.append("(0 rows)\n") | ||||
| } else if (hasMoreData) { | ||||
|
|
@@ -3209,6 +3222,19 @@ class Dataset[T] private[sql]( | |||
| } | ||||
| } | ||||
|
|
||||
| private[sql] def getRowsToPython( | ||||
|
Review comment: In DataFrameSuite, we have multiple test cases for
We also need the unit test cases for
Reply: Got it, the follow-up PR I'm working on will add more tests for
||||
| _numRows: Int, | ||||
| truncate: Int, | ||||
| vertical: Boolean): Array[Any] = { | ||||
| EvaluatePython.registerPicklers() | ||||
| val numRows = _numRows.max(0).min(Int.MaxValue - 1) | ||||
|
Review comment: This should also be part of the conf description.
Reply: Got it, will be fixed in another PR.
||||
| val rows = getRows(numRows, truncate, vertical).map(_.toArray).toArray | ||||
| val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType))) | ||||
| val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler( | ||||
| rows.iterator.map(toJava)) | ||||
| PythonRDD.serveIterator(iter, "serve-GetRows") | ||||
|
Review comment: I think we return
Did I maybe miss something?
Reply: Same answer as @HyukjinKwon about the return type; actually the exact return type we need here is Array[Array[String]], which is defined in
Reply: The changes to the return types were made in a commit
Reply: Yup ..
Reply: Re the Py4J commit - there's a good reason for it @gatorsmile
Reply: I think we'd better not talk about this here, though.
||||
| } | ||||
|
|
||||
| /** | ||||
| * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark. | ||||
| */ | ||||
|
|
||||
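The refactoring above separates row retrieval (`getRows`) from plain-text rendering (`showString`), so `show()` and the new Python path share one set of truncation rules. The following is a rough Python paraphrase of that split, for illustration only; the real logic lives in Dataset.scala and differs in details such as minimum column widths.

```python
def get_rows(header, data, num_rows, truncate):
    """Paraphrase of Dataset.getRows: the header row plus up to num_rows + 1 data rows,
    each cell truncated to `truncate` chars (first truncate - 3 chars plus '...')."""
    def fmt(cell):
        s = "null" if cell is None else str(cell)
        if truncate > 0 and len(s) > truncate:
            s = s[:max(truncate - 3, 0)] + "..."
        return s
    return [list(header)] + [[fmt(c) for c in row] for row in data[:num_rows + 1]]


def show_string(header, data, num_rows=20, truncate=20):
    """Paraphrase of Dataset.showString: pad cells to the column width and add separators."""
    tmp_rows = get_rows(header, data, num_rows, truncate)
    has_more_data = len(tmp_rows) - 1 > num_rows   # the extra row signals "more data"
    rows = tmp_rows[:num_rows + 1]
    widths = [max(len(row[i]) for row in rows) for i in range(len(header))]
    # Right-align when truncating (like Spark), left-align otherwise.
    pad = (lambda s, w: s.rjust(w)) if truncate > 0 else (lambda s, w: s.ljust(w))
    sep = "+" + "+".join("-" * w for w in widths) + "+\n"
    out = sep + "|" + "|".join(pad(c, w) for c, w in zip(rows[0], widths)) + "|\n" + sep
    for row in rows[1:]:
        out += "|" + "|".join(pad(c, w) for c, w in zip(row, widths)) + "|\n"
    out += sep
    if has_more_data:
        out += "only showing top %d %s\n" % (num_rows, "row" if num_rows == 1 else "rows")
    return out


print(show_string(["key", "value"], [(1, "a"), (2, "b"), (3, "c")], num_rows=2))
```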
Review comment: All the SQL configurations should follow what we did in the Spark SQL section: https://spark.apache.org/docs/latest/configuration.html#spark-sql
Review comment: These confs are not part of spark.sql("SET -v").show(numRows = 200, truncate = false).
Reply: Got it, I'll add these configurations into SQLConf.scala in the follow-up PR.
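While the review discusses registering these keys in SQLConf.scala, from the Python side they already behave like runtime SQL confs. A small hedged sketch (the session and DataFrame below are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100).toDF("id")

# These are runtime SQL confs, so they can be toggled on a live session without restart.
spark.conf.set("spark.sql.repl.eagerEval.enabled", "true")
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 5)

print(spark.conf.get("spark.sql.repl.eagerEval.enabled"))  # 'true'

# Each repr re-reads the confs via sql_ctx.getConf, so the change takes effect
# immediately for subsequent evaluations of df in a REPL or notebook.
df
```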