Conversation

@LucaCanali (Contributor) commented Jul 28, 2021

What changes are proposed in this pull request?

This proposes to add SQLMetrics instrumentation for Python UDF execution, including Pandas UDFs and related operations such as MapInPandas and MapInArrow.
The proposed metrics are:

  • data sent to Python workers
  • data returned from Python workers
  • number of output rows
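
For context, these counters are plain Spark `SQLMetric`s attached to the Python exec nodes. Below is a minimal sketch, not the exact code in this PR, of how such metrics can be declared with the `SQLMetrics` factory methods; the trait name and map keys are assumed for illustration:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Sketch of a metrics mix-in for Python exec nodes (names assumed).
// Size metrics render as byte sizes in the Web UI, plain metrics as counts.
trait PythonMetricsSketch {
  def sparkContext: SparkContext

  lazy val pythonMetrics: Map[String, SQLMetric] = Map(
    "pythonDataSent" -> SQLMetrics.createSizeMetric(sparkContext, "data sent to Python workers"),
    "pythonDataReceived" -> SQLMetrics.createSizeMetric(sparkContext, "data returned from Python workers"),
    "pythonNumRowsReceived" -> SQLMetrics.createMetric(sparkContext, "number of output rows")
  )
}
```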

Why are the changes needed?

This aims to improve the monitoring and performance troubleshooting of Python UDFs.
In particular, it is intended as an aid in answering performance-related questions such as:
Why is the UDF slow? How much work has been done so far?

Does this PR introduce any user-facing change?

SQL metrics are made available in the Web UI.
See the following examples:

![image1](https://issues.apache.org/jira/secure/attachment/13038693/PandasUDF_ArrowEvalPython_Metrics.png)

How was this patch tested?

Manually tested; a Python unit test and a Scala unit test have also been added.

Example code used for testing:

```python
from pyspark.sql.functions import col, pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1):
    time.sleep(0.02)
    return col1 * col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1)) from t1").collect()
```

This is used to test with more data pushed to the Python workers:

```python
from pyspark.sql.functions import col, pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17):
    time.sleep(0.02)
    return col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1,col1+1,col1+2,col1+3,col1+4,col1+5,col1+6,col1+7,col1+8,col1+9,col1+10,col1+11,col1+12,col1+13,col1+14,col1+15,col1+16)) from t1").collect()
```

This (from the Spark doc) has been used to test with MapInPandas, where the number of output rows is different from the number of input rows:

```python
import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
```

This is for testing BatchEvalPython and the metrics related to data transfer (bytes sent and received):

```python
from pyspark.sql.functions import udf

@udf
def test_udf(col1, col2):
    return col1 * col1

spark.sql("select id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' col2 from range(10)").select(test_udf("id", "col2")).collect()
```

@AmplabJenkins commented

Can one of the admins verify this patch?

@LucaCanali force-pushed the pythonUDFKeySQLMetrics branch from d433275 to 3077698 on August 18, 2021 10:04
@LucaCanali force-pushed the pythonUDFKeySQLMetrics branch 2 times, most recently from b1034d2 to 903feb2 on August 26, 2021 09:22
@LucaCanali changed the title [SPARK-34265][PYTHON] Instrument Python UDFs using SQL metrics → [SPARK-34265][WIP][PYTHON] Instrument Python UDFs using SQL metrics on Aug 26, 2021
@LucaCanali force-pushed the pythonUDFKeySQLMetrics branch from 903feb2 to f43d553 on October 19, 2021 09:42
@LucaCanali force-pushed the pythonUDFKeySQLMetrics branch from f43d553 to c7f1603 on January 11, 2022 14:57
@LucaCanali changed the title [SPARK-34265][WIP][PYTHON] Instrument Python UDFs using SQL metrics → [SPARK-34265][WIP][PYTHON] Instrument Python Pandas UDFs using SQL metrics on Jan 12, 2022
@LucaCanali force-pushed the pythonUDFKeySQLMetrics branch from c7f1603 to 4c5ba80 on January 12, 2022 19:16
@LucaCanali changed the title [SPARK-34265][WIP][PYTHON] Instrument Python Pandas UDFs using SQL metrics → [SPARK-34265][PYTHON] Instrument Pandas UDFs using SQL metrics on Jan 12, 2022
@LucaCanali changed the title [SPARK-34265][PYTHON] Instrument Pandas UDFs using SQL metrics → [SPARK-34265][PYTHON][SQL] Instrument Pandas UDFs using SQL metrics on Jan 12, 2022
@zero323 (Member) commented Jan 12, 2022

HI @LucaCanali. Thanks for your proposal!

Could you please follow the instructions in the failed workflow run to enable GitHub actions for your fork? TIA.

@zero323 (Member) commented Jan 12, 2022

cc @HyukjinKwon

@LucaCanali force-pushed the pythonUDFKeySQLMetrics branch from 4c5ba80 to 0979510 on January 13, 2022 08:46
@LucaCanali force-pushed the pythonUDFKeySQLMetrics branch from 65520d1 to 46c986c on January 17, 2022 09:10
@HyukjinKwon (Member) commented:

Looks fine from a cursory look... but let me add some more Python and SQL people here: @cloud-fan, @maryannxue, @viirya, @ueshin, @BryanCutler FYI.

@LucaCanali force-pushed the pythonUDFKeySQLMetrics branch from 228961e to b7febaa on February 2, 2022 20:12
@HyukjinKwon (Member) left a comment:

Looks fine, but it'd be great to have another look from SQL or other people to check, e.g., whether the naming is consistent.

cc @srowen too FYI.

@HyukjinKwon (Member) commented Feb 9, 2022:

Hm, @LucaCanali, I think we should ideally make the existing case work instead of avoiding it in the test. Can we try making the metrics a lazy val, assigning it to a local variable, and using that in the rdd.mapPartitions block in doExecute?

@HyukjinKwon (Member) added:

e.g., define val localPythonMetrics = pythonMetrics at line 79, and replace pythonMetrics with localPythonMetrics.
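
For reference, a sketch of that suggestion, as a fragment from a hypothetical exec node (child and pythonMetrics are assumed to be defined by the surrounding class):

```scala
protected override def doExecute(): RDD[InternalRow] = {
  // Copy the field into a local val: the mapPartitions closure below then
  // serializes just this map rather than the enclosing (driver-side) SparkPlan.
  val localPythonMetrics = pythonMetrics
  child.execute().mapPartitions { iter =>
    iter.map { row =>
      localPythonMetrics("pythonNumRowsReceived").add(1) // use the local val, not the field
      row
    }
  }
}
```

This is the usual Spark trick for keeping non-serializable or lazily initialized plan-node state out of task closures.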

@LucaCanali (Contributor, Author) commented:

Thanks @HyukjinKwon for looking into this.
Unfortunately, the proposed solution/test of using val localPythonMetrics = pythonMetrics does not appear to work.
Using a lazy val for the metrics appears to break many Python tests. In particular, I can see that in that case, when using pyspark and "going through the RDD" as in df_with_udf.rdd.collect(), we get a java.lang.NullPointerException.
Rather than skipping the failing test in postgreSQL/udf-aggregates_part3.sql, I propose moving it to a Python test: see test_pandas_udf_nested in test_pandas_udf.py.
However, if we can understand more clearly where this issue comes from, all the better.

@LucaCanali (Contributor, Author) commented:

I'd like to clarify that the proposed solution for the failing test is to move it to a Python test in python/pyspark/sql/tests/test_pandas_udf.py, where it runs OK. I would (try to) argue that the issue may come from the way the Python UDF test in udf-aggregates_part3.sql is executed via Scala, and from the fact that that particular test refers to udf twice: udf((select udf(count(*)), which apparently creates an issue there, while it works fine in a Python test.

@LucaCanali (Contributor, Author) commented:

I understand from @HyukjinKwon's comment on January 18 that more people with expertise in Spark's Python and SQL internals should review this. @cloud-fan, @maryannxue, @viirya, @ueshin, @BryanCutler, would you be interested?

@LucaCanali (Contributor, Author) commented:

The issue with SQLQueryTestSuite.udf/postgreSQL/udf-aggregates_part3.sql should be fixed now.
I have also extended the instrumentation to applyInPandasWithState, recently introduced in SPARK-40434.

@cloud-fan (Contributor) commented:

isn't it just metrics.toSeq?

@LucaCanali (Contributor, Author) commented Oct 13, 2022:

Thanks @cloud-fan for reviewing. Good point, I have refactored this part.

A contributor commented:

can we mix in PythonSQLMetrics here?

@LucaCanali force-pushed the pythonUDFKeySQLMetrics branch from e8360a4 to d43b1c7 on October 14, 2022 08:12
```scala
}

protected override def doExecute(): RDD[InternalRow] = {
  metrics // force lazy init at driver
```
A contributor commented:

What happens if we don't do this force lazy init?

@LucaCanali (Contributor, Author) commented Oct 19, 2022:

If we do not force the lazy init, the test sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-aggregates_part3.sql fails; see the discussion above.

```scala
}

override protected def doExecute(): RDD[InternalRow] = {
  metrics // force lazy init at driver
```

@LucaCanali (Contributor, Author) commented:

Good point, I have now removed it, thanks.

@cloud-fan (Contributor) commented:

thanks, merging to master!

@cloud-fan closed this in e966c38 on Oct 24, 2022
@LucaCanali (Contributor, Author) commented:

Thank you @cloud-fan !

HyukjinKwon added a commit that referenced this pull request Oct 27, 2022
…se in Scala side

### What changes were proposed in this pull request?

This PR adds `assume` in the Python test added in #33559.

### Why are the changes needed?

In some testing environments, Python does not exist. This is consistent with other tests in this file. Otherwise, it'd fail as below:

```
        java.lang.RuntimeException: Python availability: [true], pyspark availability: [false]
      at org.apache.spark.sql.IntegratedUDFTestUtils$.pythonFunc$lzycompute(IntegratedUDFTestUtils.scala:192)
      at org.apache.spark.sql.IntegratedUDFTestUtils$.org$apache$spark$sql$IntegratedUDFTestUtils$$pythonFunc(IntegratedUDFTestUtils.scala:172)
      at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF$$anon$1.<init>(IntegratedUDFTestUtils.scala:337)
      at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF.udf$lzycompute(IntegratedUDFTestUtils.scala:334)
      at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF.udf(IntegratedUDFTestUtils.scala:334)
      at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF.apply(IntegratedUDFTestUtils.scala:359)
      at org.apache.spark.sql.execution.python.PythonUDFSuite.$anonfun$new$11(PythonUDFSuite.scala:105)
...
```
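
For reference, a sketch of the guard pattern this PR adds: ScalaTest's `assume` cancels (rather than fails) a test when the precondition does not hold. The test name below is illustrative, and the availability flag is assumed to be the one exposed by `IntegratedUDFTestUtils`:

```scala
import org.apache.spark.sql.IntegratedUDFTestUtils

test("SPARK-34265: Python UDFs report SQL metrics") {
  // Cancel, rather than fail, this test when Python or pyspark is not
  // available in the testing environment.
  assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
  // ... run the Python UDF and assert on the collected SQL metrics ...
}
```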

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually checked.

Closes #38407 from HyukjinKwon/SPARK-34265.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
HyukjinKwon added a commit to HyukjinKwon/spark that referenced this pull request Dec 16, 2023
### What changes were proposed in this pull request?

This PR proposes to support Python metrics in Python Data Source so that the metrics are reported the same way as for other Python execution paths and APIs.

### Why are the changes needed?

The same metrics (apache#33559) should be shown for Python Data Source reads. This is the last missing part compared to other Python execution paths and APIs.

### Does this PR introduce _any_ user-facing change?

Python Data Source has not been released yet, so there is no end-user-facing change.
It shows some new metrics in the UI.

Example:

```python
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class TestDataSourceReader(DataSourceReader):
    def __init__(self, options):
        self.options = options
    def partitions(self):
        return [InputPartition(i) for i in range(3)]
    def read(self, partition):
        yield partition.value, str(partition.value)

class TestDataSource(DataSource):
    @classmethod
    def name(cls):
        return "test"
    def schema(self):
        return "x INT, y STRING"
    def reader(self, schema) -> "DataSourceReader":
        return TestDataSourceReader(self.options)

spark.dataSource.register(TestDataSource)
sql("CREATE TABLE tblA USING test")
sql("SELECT * from tblA").show()
```

<img width="515" alt="Screenshot 2023-12-15 at 5 54 55 PM" src="https://github.com/apache/spark/assets/6477701/5b98af8c-798e-4b9f-9fde-5549ad8b3c65">

This is the same as for other Python nodes, UDFs, etc.

### How was this patch tested?

Unit tests were added, and the change was manually tested via the UI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#44375 from HyukjinKwon/SPARK-46424.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>