Skip to content

Conversation

@alamb
Copy link
Contributor

@alamb alamb commented Dec 18, 2023

Which issue does this PR close?

Closes #8568

Rationale for this change

This PR is a step towards #8045:

  1. I want to make it easier to extend DataFusion's function packages (so that we can support many more different implementatins)
  2. Splitting out functions into packages (e.g. datafusion-functions) needs an API that is easier to implement
  3. We need a place to put more advanced ScalarUDF features (like Specialized / Pre-compiled / Prepared ScalarUDFs #8051 and add examples and description to scalar/aggregate functions?  #8366) and the current lower level API is very hard to extend in a backwards compatible way

What changes are included in this PR?

  1. introducing a trait based API for scalar functions (ScalarUDFImpl -- better names welcomed)
  2. Add example of how you can use the trait based APIs for more advanced implementations (advanced_udf.rs)

If this PR is accepted, I plan to file tickets to track

  • Clean up internal implementation of ScalarUDF (e.g. make it use the trait based API rather than the current function pointers)
  • Add a similar trait for AggregateUDF and WindowUDF, for the same reasons

Are these changes tested?

Yes, both new tests as well as updated existing tests

Are there any user-facing changes?

There is a new way to define ScalarUDFs and additional documentation.

@github-actions github-actions bot added logical-expr Logical plan and expressions optimizer Optimizer rules labels Dec 18, 2023
@alamb alamb force-pushed the alamb/better_scalar_api branch from bdd8ca1 to 7e7dae7 Compare December 18, 2023 18:32
use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};
use std::sync::Arc;

/// This example shows how to use the full ScalarUDFImpl API to implement a user
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to create an example that shows how to make a more advanced UDF that special cases constant values.

This also shows how to create a ScalarUDF using a trait (rather than free functions and closures)

&return_type,
&fun,
));
struct TestScalarUDF {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shows an example of the difference in trait based vs low level ScalarValue::new API that I propose to deprecate

While the trait requires more lines, I think it is much easier to implement as it is simply a standard trait implementation which I believe is far more common than Arc'd closures

///
/// See [`ScalarUDFImpl`] for a more convenient way to create a
/// `ScalarUDF` using trait objects
#[deprecated(since = "34.0.0", note = "please implement ScalarUDFImpl instead")]
Copy link
Contributor Author

@alamb alamb Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this low level API is quite akward to use and very hard to extend in backwards compatible ways. The trait is easer to use and easier to extend.

Thus I propose marking this API as deprecated (note most of the examples in codebase use create_udf rather than ScalarUDF:new() directly) so I think the impact will be limited

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that current low-level API looks awkward to use. Ideally a trait defining what a UDF should implement should be better solution.

where
F: ScalarUDFImpl + Send + Sync + 'static,
{
// TODO change the internal implementation to use the trait object
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to improve the internal representation as a follow on PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#8713 is the follow on PR

}
}

/// Trait for implementing [`ScalarUDF`].
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the proposed new trait. I think we can use this trait to add things such as "pre-compiling" arguments #8051 and adding better examples / documentation add examples and description to scalar/aggregate functions #8366.

cc @universalmind303 for your comments

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks much more intuitive than the current implementation. I actually just commented on an open issue about the api before reviewing this & my suggestion was nearly identical!

#8568 (comment)

@alamb alamb marked this pull request as ready for review December 18, 2023 18:41
@alamb
Copy link
Contributor Author

alamb commented Dec 18, 2023

cc @2010YOUY01, @thinkharderdev, @viirya and @andygrove -- in case you have comments about the proposed way of implementing ScalarUDF.

This PR doesn't make any API changes, but it does deprecate ScalarUDF::new()

/// Create a new `ScalarUDF` from a `[ScalarUDFImpl]` trait object
///
/// Note this is the same as using the `From` impl (`ScalarUDF::from`)
pub fn new_from_trait<F>(fun: F) -> ScalarUDF
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new_from_impl?

/// can be used to implement any function.
///
/// See [`advanced_udf.rs`] for a full example with implementation. See
/// [`ScalarUDF`] for details on a simpler API.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For simpler API, do you mean create_udf?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was trying to avoid replicating the same content (e.g. with links to create_udf, and simple example) all over the place (and just have it linked on ScalarUDF). I have tried to make this clearer


The challenge however is that DataFusion doesn't know about this function. We need to register it with DataFusion so that it can be used in the context of a query.

### Registering a Scalar UDF
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add advanced example advanced_udf.rs link to this document and also document ScalarUDFImpl there too? Maybe a follow up.

Comment on lines 274 to 276
/// # Performance
/// Many functions can be optimized for the case when one or more of their
/// arguments are constant values [`ColumnarValue::Scalar`].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this performance section, does it mean the implementations should optimize the case or DataFusion will optimize the case? Looks a bit unclear to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means that the implementations should optimize the case -- I have tried to clarify the comments in this regard.

Copy link
Contributor Author

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the (as always) insightful review @viirya

/// can be used to implement any function.
///
/// See [`advanced_udf.rs`] for a full example with implementation. See
/// [`ScalarUDF`] for details on a simpler API.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was trying to avoid replicating the same content (e.g. with links to create_udf, and simple example) all over the place (and just have it linked on ScalarUDF). I have tried to make this clearer

Comment on lines 274 to 276
/// # Performance
/// Many functions can be optimized for the case when one or more of their
/// arguments are constant values [`ColumnarValue::Scalar`].
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means that the implementations should optimize the case -- I have tried to clarify the comments in this regard.

Comment on lines 137 to 139
// calculate the result for every row. The `unary` very
// fast, "vectorized" code and handles things like null
// values for us.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I read it correctly:

Suggested change
// calculate the result for every row. The `unary` very
// fast, "vectorized" code and handles things like null
// values for us.
// calculate the result for every row. The `unary` is very
// fast "vectorized" code and handles things like null
// values for us.

}

/// Return the type of the function given its input types
/// The datatype this function returns given the input argument input types
Copy link
Member

@viirya viirya Dec 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe?

Suggested change
/// The datatype this function returns given the input argument input types
/// The datatype this function returns given the input argument types

);
```

[`scalarudf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[`scalarudf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html
[`ScalarUDF`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason this lower casing is done by prettier so I can't impmement this suggestion without causing CI to fail 😬

andrewlamb@Andrews-MacBook-Pro:~/Software/arrow-datafusion$ git diff
diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md
index c51e4de32..1d2cc0a12 100644
--- a/docs/source/library-user-guide/adding-udfs.md
+++ b/docs/source/library-user-guide/adding-udfs.md
@@ -95,7 +95,7 @@ let udf = create_udf(
 );

-[`scalarudf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html
+[`ScalarUDF`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html
 [`create_udf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udf.html
 [`make_scalar_function`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/functions/fn.make_scalar_function.html
 [`advanced_udf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udf.rs
andrewlamb@Andrews-MacBook-Pro:~/Software/arrow-datafusion$ npx [email protected] --check  '{datafusion,datafusion-cli,datafusion-examples,dev,docs}/**/*.md'     '!datafusion/CHANGELOG.md'     README.md     CONTRIBUTING.md
Checking formatting...
[warn] docs/source/library-user-guide/adding-udfs.md
[warn] Code style issues found in the above file. Forgot to run Prettier?
andrewlamb@Andrews-MacBook-Pro:~/Software/arrow-datafusion$ git reset --hard
HEAD is now at 3ce1802df Improve docs for aliases
andrewlamb@Andrews-MacBook-Pro:~/Software/arrow-datafusion$ npx [email protected] --check  '{datafusion,datafusion-cli,datafusion-examples,dev,docs}/**/*.md'     '!datafusion/CHANGELOG.md'     README.md     CONTRIBUTING.md
Checking formatting...
All matched files use Prettier code style!
andrewlamb@Andrews-MacBook-Pro:~/Software/arrow-datafusion$

@alamb
Copy link
Contributor Author

alamb commented Dec 26, 2023

Update here is I plan to merge this tomorrow unless anyone would like more time to review

@thinkharderdev
Copy link
Contributor

Nice! It would be very useful to be able to handle serde as well for custom implementations (perhaps in a different PR?). I think this could fit relatively easily into LogicalExtensionCodec

@alamb
Copy link
Contributor Author

alamb commented Dec 28, 2023

I have several follow on tasks I will do like shortly:

@alamb alamb merged commit b2cbc78 into apache:main Dec 28, 2023
@alamb alamb deleted the alamb/better_scalar_api branch December 28, 2023 20:07
@alamb
Copy link
Contributor Author

alamb commented Jan 1, 2024

Nice! It would be very useful to be able to handle serde as well for custom implementations (perhaps in a different PR?). I think this could fit relatively easily into LogicalExtensionCodec

Filed #8706

/// This struct contains the information DataFusion needs to plan and invoke
/// functions such name, type signature, return type, and actual implementation.
///
/// 1. For simple (less performant) use cases, use [`create_udf`] and [`simple_udf.rs`].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

less performant

Hi, is there anyone who would like to explain a bit about why create_udf() is less performant than the UDFs created by ScalarUDFImpl?

Copy link
Contributor Author

@alamb alamb Feb 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is that create_udf() always converts its arguments to ArrayRef and thus you can't implement special cases for constant values (ScalarValue) -- instead the scalar value is always converted into an array.

Update: this does not seem to be correct. I will do some more investigation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #9384 to clarify docs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using create_udf create an extra indirection. Under the hood it's creating a

pub struct SimpleScalarUDF {
    name: String,
    signature: Signature,
    return_type: DataType,
    fun: ScalarFunctionImplementation,
}

impl ScalarUDFImpl for SimpleScalarUDF {
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        (self.fun)(args)
    }
}

so it adds an extra call for every batch processed through the UDF

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense 👍 -- I don't think the overhead of a single function call is worth calling out in the docs however (I think it is more confusing than helpfl), though please let me know if you disagree on #9384

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. I agree it's not meaningful enough to call out tin docs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

logical-expr Logical plan and expressions optimizer Optimizer rules

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement trait based API for defining ScalarUDFs

5 participants