[SPARK-34265][PYTHON][SQL] Instrument Python UDFs using SQL metrics #33559
Conversation
Can one of the admins verify this patch?
Hi @LucaCanali. Thanks for your proposal! Could you please follow the instructions in the failed workflow run to enable GitHub Actions for your fork? TIA.
cc @HyukjinKwon
[Resolved review thread on sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala (outdated)]
[Resolved review thread on sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala (outdated)]
Looks fine from a cursory look, but let me add some more Python and SQL people here: @cloud-fan, @maryannxue, @viirya, @ueshin, @BryanCutler, FYI.
[Resolved review thread on sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala (outdated)]
HyukjinKwon left a comment:
Looks fine, but it'd be great to have another look from SQL or other folks to check, e.g., that the naming is consistent.
cc @srowen too, FYI.
Hm, @LucaCanali, I think we should ideally make the existing case work instead of avoiding it in the test. Can we try making the metrics a `lazy val`, assigning it to a local variable, and using that in the `rdd.mapPartitions` block in `doExecute`?
e.g., define `val localPythonMetrics = pythonMetrics` at line 79, and replace `pythonMetrics` with `localPythonMetrics`.
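For context, this is a standard Spark idiom: copy a member field into a local val so that the closure passed to an RDD transformation captures only that value rather than the enclosing operator. A minimal, self-contained sketch of the idiom (class and field names are illustrative, not the PR's code):

```scala
import org.apache.spark.rdd.RDD

// Hypothetical operator, standing in for an exec node that holds a metrics field.
class MyPythonExec(child: RDD[Int]) extends Serializable {
  val pythonMetrics: Map[String, Long] = Map("pythonDataSent" -> 0L)

  def doExecute(): RDD[Int] = {
    // Copy the field into a local val: the closure below then captures only
    // this reference rather than `this`, so the whole operator never needs
    // to be serialized and shipped to executors.
    val localPythonMetrics = pythonMetrics
    child.mapPartitions { iter =>
      iter.map { x =>
        localPythonMetrics.size // touches only the local copy
        x
      }
    }
  }
}
```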
Thanks @HyukjinKwon for looking into this.
Unfortunately, the proposed solution of using `val localPythonMetrics = pythonMetrics` does not appear to work.
Using a `lazy val` for the metrics appears to break many Python tests. In particular, I can see in that case that when using pyspark and "going through the RDD", as in `df_with_udf.rdd.collect()`, we get a `java.lang.NullPointerException`.
My proposal is not to skip the failing test in postgreSQL/udf-aggregates_part3.sql, but rather to move it to a Python test: see `test_pandas_udf_nested` in `test_pandas_udf.py`.
That said, if we can understand more clearly where this issue comes from, all the better.
I'd like to clarify that the proposed solution for the failing test is to move it to a Python test in `python/pyspark/sql/tests/test_pandas_udf.py`, where it runs OK. I would (try to) argue that the issue may come from the way the Python UDF test in udf-aggregates_part3.sql is executed via Scala, and from the fact that that particular test refers to the UDF twice, as in `udf((select udf(count(*))`, which apparently creates an issue there, while the same case works fine as a Python test.
I understand from @HyukjinKwon's comment on January 18 that more people with expertise in Spark's use of Python and SQL should review this. @cloud-fan, @maryannxue, @viirya, @ueshin, @BryanCutler, would you be interested?
The issue with SQLQueryTestSuite.udf/postgreSQL/udf-aggregates_part3.sql should be fixed now.
Isn't it just `metrics.toSeq`?
Thanks @cloud-fan for reviewing. Good point, I have refactored this part.
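For readers following along, the suggested simplification relies on the fact that a Scala `Map` already converts to a `Seq` of key-value pairs, so building the sequence by hand is unnecessary. An illustrative snippet (not the PR's actual code):

```scala
// A Map[String, Long] standing in for the operator's metric map.
val metrics = Map("pythonDataSent" -> 10L, "pythonDataReceived" -> 20L)

// Hand-rolled conversion of the kind the comment pushes back on (hypothetical shape):
val verbose: Seq[(String, Long)] = metrics.keys.map(k => (k, metrics(k))).toSeq

// Suggested simplification: a Map converts directly to a Seq of pairs.
val concise: Seq[(String, Long)] = metrics.toSeq
```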
Can we mix in `PythonSQLMetrics` here?
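For readers unfamiliar with the pattern, the suggestion is to factor the metric definitions into a trait that each Python exec node mixes in, instead of redeclaring them per node. A hedged sketch of the approximate shape (the metric display names follow the PR description; keys and details may differ from the merged code):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

trait PythonSQLMetrics { self: SparkPlan =>
  // Shared metric map for all Python exec nodes; created on the driver.
  protected val pythonMetrics: Map[String, SQLMetric] = Map(
    "pythonDataSent" ->
      SQLMetrics.createSizeMetric(sparkContext, "data sent to Python workers"),
    "pythonDataReceived" ->
      SQLMetrics.createSizeMetric(sparkContext, "data returned from Python workers"),
    "pythonNumRowsReceived" ->
      SQLMetrics.createMetric(sparkContext, "number of output rows"))

  override lazy val metrics: Map[String, SQLMetric] = pythonMetrics
}

// An exec node then picks the metrics up by mixing the trait in, e.g.:
// case class ArrowEvalPythonExec(...) extends SparkPlan with PythonSQLMetrics
```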
[Resolved review thread on sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala]
```scala
protected override def doExecute(): RDD[InternalRow] = {
  metrics // force lazy init at driver
```
What happens if we don't do this force lazy init?
If we do not force the lazy init, the test `sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-aggregates_part3.sql` fails; see the discussion above.
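To make the failure mode concrete: SQL metrics are accumulators, and accumulators must be created and registered on the driver. If a lazy metrics map is first touched inside a task closure (as on the `df_with_udf.rdd.collect()` path mentioned above), executors initialize their own unregistered copies, which surfaced here as a `NullPointerException`. A hedged sketch of the resulting pattern (a method fragment of a hypothetical exec node; the metric key is illustrative):

```scala
override protected def doExecute(): RDD[InternalRow] = {
  metrics // touch the lazy val on the driver, before closures are shipped
  // Individual metrics can then be captured locally and updated from tasks;
  // the driver aggregates the accumulator updates as tasks complete.
  val numOutputRows = longMetric("pythonNumRowsReceived")
  child.execute().mapPartitions { iter =>
    iter.map { row => numOutputRows += 1; row }
  }
}
```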
```scala
override protected def doExecute(): RDD[InternalRow] = {
  metrics // force lazy init at driver
```
Good point, I have now removed it, thanks.
Thanks, merging to master!
Thank you @cloud-fan!
…se in Scala side

### What changes were proposed in this pull request?

This PR adds `assume` in the Python test added in #33559.

### Why are the changes needed?

In some testing environments, Python does not exist. This is consistent with other tests in this file. Otherwise, it'd fail as below:

```
java.lang.RuntimeException: Python availability: [true], pyspark availability: [false]
	at org.apache.spark.sql.IntegratedUDFTestUtils$.pythonFunc$lzycompute(IntegratedUDFTestUtils.scala:192)
	at org.apache.spark.sql.IntegratedUDFTestUtils$.org$apache$spark$sql$IntegratedUDFTestUtils$$pythonFunc(IntegratedUDFTestUtils.scala:172)
	at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF$$anon$1.<init>(IntegratedUDFTestUtils.scala:337)
	at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF.udf$lzycompute(IntegratedUDFTestUtils.scala:334)
	at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF.udf(IntegratedUDFTestUtils.scala:334)
	at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF.apply(IntegratedUDFTestUtils.scala:359)
	at org.apache.spark.sql.execution.python.PythonUDFSuite.$anonfun$new$11(PythonUDFSuite.scala:105)
	...
```

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually checked.

Closes #38407 from HyukjinKwon/SPARK-34265.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
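The guard the follow-up adds looks roughly like the sketch below (a fragment of a suite such as PythonUDFSuite; the test name and body are placeholders, and `shouldTestPythonUDFs` is the availability flag exposed by `IntegratedUDFTestUtils`):

```scala
import org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPythonUDFs

test("SPARK-34265: Python UDF SQL metrics") {
  assume(shouldTestPythonUDFs) // skip (rather than fail) when Python/pyspark is absent
  // ... run a query with a Python UDF and assert on its SQL metrics ...
}
```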
### What changes are proposed in this pull request?

This proposes to add SQLMetrics instrumentation for Python UDF execution, including Pandas UDFs and related operations such as MapInPandas and MapInArrow. The proposed metrics are:

- data sent to Python workers
- data returned from Python workers
- number of output rows

### Why are the changes needed?

This aims at improving monitoring and performance troubleshooting of Python UDFs. In particular, it is intended as an aid to answer performance-related questions such as: why is the UDF slow? How much work has been done so far?

### Does this PR introduce _any_ user-facing change?

SQL metrics are made available in the Web UI. See the following example:

[Screenshot: SQL metrics for Python UDFs shown in the Web UI]

### How was this patch tested?

Manually tested; a Python unit test and a Scala unit test have also been added.

Example code used for testing:

```python
from pyspark.sql.functions import col, pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1):
    time.sleep(0.02)
    return col1 * col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1)) from t1").collect()
```

This is used to test with more data pushed to the Python workers:

```python
from pyspark.sql.functions import col, pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17):
    time.sleep(0.02)
    return col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1,col1+1,col1+2,col1+3,col1+4,col1+5,col1+6,col1+7,col1+8,col1+9,col1+10,col1+11,col1+12,col1+13,col1+14,col1+15,col1+16)) from t1").collect()
```

This (from the Spark documentation) has been used to test with MapInPandas, where the number of output rows is different from the number of input rows:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
```

This is for testing BatchEvalPython and the metrics related to data transfer (bytes sent and received):

```python
from pyspark.sql.functions import udf

@udf
def test_udf(col1, col2):
    return col1 * col1

spark.sql("select id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' col2 from range(10)").select(test_udf("id", "col2")).collect()
```

Closes apache#33559 from LucaCanali/pythonUDFKeySQLMetrics.

Authored-by: Luca Canali <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

This PR proposes to support Python metrics in Python Data Source, so that the metrics are reported the same way as for other Python execution and APIs.

### Why are the changes needed?

The same metrics (apache#33559) should be shown for Python Data Source reads. This is the last missing part compared to other Python execution and APIs.

### Does this PR introduce _any_ user-facing change?

Python Data Source has not been released yet, so there is no end-user-facing change. It shows some new metrics in the UI. Example:

```python
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class TestDataSourceReader(DataSourceReader):
    def __init__(self, options):
        self.options = options

    def partitions(self):
        return [InputPartition(i) for i in range(3)]

    def read(self, partition):
        yield partition.value, str(partition.value)

class TestDataSource(DataSource):
    @classmethod
    def name(cls):
        return "test"

    def schema(self):
        return "x INT, y STRING"

    def reader(self, schema) -> "DataSourceReader":
        return TestDataSourceReader(self.options)

spark.dataSource.register(TestDataSource)
sql("CREATE TABLE tblA USING test")
sql("SELECT * from tblA").show()
```

<img width="515" alt="Screenshot 2023-12-15 at 5 54 55 PM" src="https://github.com/apache/spark/assets/6477701/5b98af8c-798e-4b9f-9fde-5549ad8b3c65">

This is the same as other Python nodes, UDFs, etc.

### How was this patch tested?

Unit tests were added, and it was manually tested via the UI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#44375 from HyukjinKwon/SPARK-46424.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>