**SupportsDelete.java**

```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector.catalog;
 
 import org.apache.spark.annotation.Experimental;
-import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.v2.FilterV2;
 
 /**
  * A mix-in interface for {@link Table} delete support. Data sources can implement this
@@ -41,5 +41,5 @@ public interface SupportsDelete {
   * @param filters filter expressions, used to select rows to delete when all expressions match
   * @throws IllegalArgumentException If the delete is rejected due to required effort
   */
-  void deleteWhere(Filter[] filters);
+  void deleteWhere(FilterV2[] filters);
 }
```

> **Review comment** (on `deleteWhere`): I think that a good way to switch between v1 filters and v2 filters is to add both methods and convert from v2 to v1 in a default implementation of the v2 version. That's an easy way for people to update to the new filter API.
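A minimal sketch of the bridge that comment describes, assuming a hypothetical `FilterConversions.toV1` helper (nothing in this diff defines one, and the interface name here is illustrative):

```java
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.FilterV2;

// Sketch of the reviewer's suggestion: keep the v1 method abstract and give
// the v2 overload a default implementation that converts back to v1.
public interface SupportsDeleteBridge {
  // Existing v1 method; current implementations keep working unchanged.
  void deleteWhere(Filter[] filters);

  // New v2 overload; sources override it once they understand FilterV2.
  default void deleteWhere(FilterV2[] filters) {
    Filter[] v1Filters = new Filter[filters.length];
    for (int i = 0; i < filters.length; i++) {
      v1Filters[i] = FilterConversions.toV1(filters[i]); // hypothetical helper
    }
    deleteWhere(v1Filters);
  }

  // Hypothetical converter, not part of this diff: a real one would map each
  // v2 filter back to its v1 counterpart, or fail for filters v1 cannot express.
  final class FilterConversions {
    private FilterConversions() {}

    static Filter toV1(FilterV2 filter) {
      throw new UnsupportedOperationException("illustrative stub");
    }
  }
}
```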
**SupportsPushDownFilters.java**

```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector.read;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.v2.FilterV2;
 
 /**
  * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
@@ -33,10 +33,10 @@ public interface SupportsPushDownFilters extends ScanBuilder {
   * Rows should be returned from the data source if and only if all of the filters match. That is,
   * filters must be interpreted as ANDed together.
   */
-  Filter[] pushFilters(Filter[] filters);
+  FilterV2[] pushFilters(FilterV2[] filters);
 
   /**
-   * Returns the filters that are pushed to the data source via {@link #pushFilters(Filter[])}.
+   * Returns the filters that are pushed to the data source via {@link #pushFilters(FilterV2[])}.
    *
    * There are 3 kinds of filters:
    * 1. pushable filters which don't need to be evaluated again after scanning.
@@ -45,8 +45,8 @@ public interface SupportsPushDownFilters extends ScanBuilder {
    * 3. non-pushable filters.
    * Both case 1 and 2 should be considered as pushed filters and should be returned by this method.
    *
-   * It's possible that there are no filters in the query and {@link #pushFilters(Filter[])}
+   * It's possible that there are no filters in the query and {@link #pushFilters(FilterV2[])}
    * is never called; an empty array should be returned in this case.
    */
-  Filter[] pushedFilters();
+  FilterV2[] pushedFilters();
 }
```
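For illustration, a rough sketch of how a connector might satisfy this contract under the new v2 signatures; `ExampleScanBuilder`, `canPushToSource`, and the stubbed `build()` are placeholders, not part of this diff:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.FilterV2;

class ExampleScanBuilder implements SupportsPushDownFilters {
  private FilterV2[] pushed = new FilterV2[0];

  @Override
  public FilterV2[] pushFilters(FilterV2[] filters) {
    List<FilterV2> accepted = new ArrayList<>();  // evaluated by the source
    List<FilterV2> postScan = new ArrayList<>();  // Spark re-evaluates these
    for (FilterV2 filter : filters) {
      if (canPushToSource(filter)) {
        accepted.add(filter);
      } else {
        postScan.add(filter);
      }
    }
    pushed = accepted.toArray(new FilterV2[0]);
    // Return the filters Spark still needs to evaluate after scanning
    // (cases 2 and 3 above; this sketch assumes exact evaluation, so case 3).
    return postScan.toArray(new FilterV2[0]);
  }

  @Override
  public FilterV2[] pushedFilters() {
    // Empty array, never null, when no filters were pushed.
    return pushed;
  }

  private boolean canPushToSource(FilterV2 filter) {
    return false; // placeholder: a real source would inspect the filter here
  }

  @Override
  public Scan build() {
    throw new UnsupportedOperationException("illustrative stub");
  }
}
```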
**SupportsOverwrite.java**

```diff
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.connector.write;
 
-import org.apache.spark.sql.sources.AlwaysTrue$;
-import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.v2.AlwaysTrue$;
+import org.apache.spark.sql.sources.v2.FilterV2;
 
 /**
  * Write builder trait for tables that support overwrite by filter.
@@ -36,10 +36,10 @@ public interface SupportsOverwrite extends WriteBuilder, SupportsTruncate {
   * @param filters filters used to match data to overwrite
   * @return this write builder for method chaining
   */
-  WriteBuilder overwrite(Filter[] filters);
+  WriteBuilder overwrite(FilterV2[] filters);
 
   @Override
   default WriteBuilder truncate() {
-    return overwrite(new Filter[] { AlwaysTrue$.MODULE$ });
+    return overwrite(new FilterV2[] { AlwaysTrue$.MODULE$ });
   }
 }
```
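Note how little an implementer has to write: providing `overwrite` is enough, and `truncate()` falls out of the default above as an overwrite with a single always-true filter. A minimal sketch (class and field names are placeholders, and it assumes `WriteBuilder`'s build methods have throwing defaults in this API version):

```java
import org.apache.spark.sql.connector.write.SupportsOverwrite;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.sources.v2.FilterV2;

class ExampleWriteBuilder implements SupportsOverwrite {
  private FilterV2[] overwriteFilters = new FilterV2[0];

  @Override
  public WriteBuilder overwrite(FilterV2[] filters) {
    // Rows matching ALL of these filters are replaced by the incoming data;
    // a truncate() call arrives here as a single always-true filter.
    this.overwriteFilters = filters;
    return this; // enables method chaining, per the contract above
  }
}
```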
**FilterV2.java** (new file)

```diff
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+
+@Experimental
+public abstract class FilterV2 {
```

> **Review comment (collaborator)** on the class name: The package is already v2. Do we need to add v2?

> **Review comment:** Do we want to use `Filter`, or should we use `Predicate` for expressions that evaluate to a boolean?

```diff
+  /**
+   * Returns list of columns that are referenced by this filter.
+   */
+  public abstract NamedReference[] references();
+
+  protected NamedReference[] findReferences(Object valve) {
```

> **Review comment (collaborator):** valve -> value?

```diff
+    if (valve instanceof FilterV2) {
+      return ((FilterV2) valve).references();
+    } else {
+      return new NamedReference[0];
+    }
+  }
+}
```
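To make the abstract contract concrete, here is an illustrative subclass; the PR's real v2 filters are the Scala case classes added to filters.scala below, so the class name and Java shape here are assumptions:

```java
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.sources.v2.FilterV2;

// Illustrative equality filter over a (possibly nested) column reference.
public final class ExampleEqualTo extends FilterV2 {
  private final NamedReference ref;
  private final Object value;

  public ExampleEqualTo(NamedReference ref, Object value) {
    this.ref = ref;
    this.value = value;
  }

  @Override
  public NamedReference[] references() {
    // This filter references its own column plus whatever the value
    // references, mirroring findReferences in the base class.
    NamedReference[] nested = findReferences(value);
    NamedReference[] all = new NamedReference[nested.length + 1];
    all[0] = ref;
    System.arraycopy(nested, 0, all, 1, nested.length);
    return all;
  }
}
```

Unlike v1's string-based `references`, a `NamedReference` can point at a nested field, which is why the v1 doc below is narrowed to "top-level columns".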
**filters.scala**

```diff
@@ -18,6 +18,9 @@
 package org.apache.spark.sql.sources
 
 import org.apache.spark.annotation.{Evolving, Stable}
+import org.apache.spark.sql.connector.expressions.FieldReference
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.sources.v2.FilterV2
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines all the filters that we can push down to the data sources.
@@ -31,7 +34,7 @@ import org.apache.spark.annotation.{Evolving, Stable}
 @Stable
 abstract class Filter {
   /**
-   * List of columns that are referenced by this filter.
+   * List of top-level columns that are referenced by this filter.
    * @since 2.1.0
    */
   def references: Array[String]
@@ -40,6 +43,17 @@ abstract class Filter {
     case f: Filter => f.references
     case _ => Array.empty
   }
+
+  private[sql] def toV2: FilterV2
+
+  private[sql] def attToRef(attribute: String): NamedReference = {
+    FieldReference(Seq(attribute))
+  }
+
+  private[sql] def toV2Value(value: Any): Any = value match {
+    case f: Filter => f.toV2
+    case _ => value
+  }
 }
 
 /**
@@ -51,6 +65,8 @@ abstract class Filter {
 @Stable
 case class EqualTo(attribute: String, value: Any) extends Filter {
   override def references: Array[String] = Array(attribute) ++ findReferences(value)
+
+  override private[sql] def toV2 = v2.EqualTo(attToRef(attribute), toV2Value(value))
 }
 
 /**
@@ -63,6 +79,8 @@ case class EqualTo(attribute: String, value: Any) extends Filter {
 @Stable
 case class EqualNullSafe(attribute: String, value: Any) extends Filter {
   override def references: Array[String] = Array(attribute) ++ findReferences(value)
+
+  override private[sql] def toV2 = v2.EqualNullSafe(attToRef(attribute), toV2Value(value))
 }
 
 /**
@@ -74,6 +92,8 @@ case class EqualNullSafe(attribute: String, value: Any) extends Filter {
 @Stable
 case class GreaterThan(attribute: String, value: Any) extends Filter {
   override def references: Array[String] = Array(attribute) ++ findReferences(value)
+
+  override private[sql] def toV2 = v2.GreaterThan(attToRef(attribute), toV2Value(value))
 }
 
 /**
@@ -85,6 +105,9 @@ case class GreaterThan(attribute: String, value: Any) extends Filter {
 @Stable
 case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter {
   override def references: Array[String] = Array(attribute) ++ findReferences(value)
+
+  override private[sql] def toV2 =
+    v2.GreaterThanOrEqual(attToRef(attribute), toV2Value(value))
 }
 
 /**
@@ -96,6 +119,8 @@ case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter {
 @Stable
 case class LessThan(attribute: String, value: Any) extends Filter {
   override def references: Array[String] = Array(attribute) ++ findReferences(value)
+
+  override private[sql] def toV2 = v2.LessThan(attToRef(attribute), toV2Value(value))
 }
 
 /**
@@ -107,6 +132,9 @@ case class LessThan(attribute: String, value: Any) extends Filter {
 @Stable
 case class LessThanOrEqual(attribute: String, value: Any) extends Filter {
   override def references: Array[String] = Array(attribute) ++ findReferences(value)
+
+  override private[sql] def toV2 =
+    v2.LessThanOrEqual(attToRef(attribute), toV2Value(value))
 }
 
 /**
@@ -134,6 +162,8 @@ case class In(attribute: String, values: Array[Any]) extends Filter {
   }
 
   override def references: Array[String] = Array(attribute) ++ values.flatMap(findReferences)
+
+  override private[sql] def toV2 = v2.In(attToRef(attribute), values.map(toV2Value))
 }
 
 /**
@@ -144,6 +174,8 @@ case class In(attribute: String, values: Array[Any]) extends Filter {
 @Stable
 case class IsNull(attribute: String) extends Filter {
   override def references: Array[String] = Array(attribute)
+
+  override private[sql] def toV2 = v2.IsNull(attToRef(attribute))
 }
 
 /**
@@ -154,6 +186,8 @@ case class IsNull(attribute: String) extends Filter {
 @Stable
 case class IsNotNull(attribute: String) extends Filter {
   override def references: Array[String] = Array(attribute)
+
+  override private[sql] def toV2 = v2.IsNotNull(attToRef(attribute))
 }
 
 /**
@@ -164,6 +198,8 @@ case class IsNotNull(attribute: String) extends Filter {
 @Stable
 case class And(left: Filter, right: Filter) extends Filter {
   override def references: Array[String] = left.references ++ right.references
+
+  override private[sql] def toV2 = v2.And(left.toV2, right.toV2)
 }
 
 /**
@@ -174,6 +210,8 @@ case class And(left: Filter, right: Filter) extends Filter {
 @Stable
 case class Or(left: Filter, right: Filter) extends Filter {
   override def references: Array[String] = left.references ++ right.references
+
+  override private[sql] def toV2 = v2.Or(left.toV2, right.toV2)
 }
 
 /**
@@ -184,6 +222,8 @@ case class Or(left: Filter, right: Filter) extends Filter {
 @Stable
 case class Not(child: Filter) extends Filter {
   override def references: Array[String] = child.references
+
+  override private[sql] def toV2 = v2.Not(child.toV2)
 }
 
 /**
@@ -195,6 +235,8 @@ case class Not(child: Filter) extends Filter {
 @Stable
 case class StringStartsWith(attribute: String, value: String) extends Filter {
   override def references: Array[String] = Array(attribute)
+
+  override private[sql] def toV2 = v2.StringStartsWith(attToRef(attribute), value)
 }
 
 /**
@@ -206,6 +248,8 @@ case class StringStartsWith(attribute: String, value: String) extends Filter {
 @Stable
 case class StringEndsWith(attribute: String, value: String) extends Filter {
   override def references: Array[String] = Array(attribute)
+
+  override private[sql] def toV2 = v2.StringEndsWith(attToRef(attribute), value)
 }
 
 /**
@@ -217,6 +261,8 @@ case class StringEndsWith(attribute: String, value: String) extends Filter {
 @Stable
 case class StringContains(attribute: String, value: String) extends Filter {
   override def references: Array[String] = Array(attribute)
+
+  override private[sql] def toV2 = v2.StringContains(attToRef(attribute), value)
 }
 
 /**
@@ -225,6 +271,8 @@
 @Evolving
 case class AlwaysTrue() extends Filter {
   override def references: Array[String] = Array.empty
+
+  override private[sql] def toV2 = v2.AlwaysTrue
 }
 
 @Evolving
@@ -237,6 +285,8 @@ object AlwaysTrue extends AlwaysTrue {
 @Evolving
 case class AlwaysFalse() extends Filter {
   override def references: Array[String] = Array.empty
+
+  override private[sql] def toV2 = v2.AlwaysFalse
 }
 
 @Evolving
```