
Commit 4686c27
[SPARK-45073][PS][CONNECT] Replace LastNonNull with Last(ignoreNulls=True)
### What changes were proposed in this pull request?

Replace `LastNonNull` with `Last(ignoreNulls=True)`.

### Why are the changes needed?

#36127 introduced a PS-dedicated expression `LastNonNull`, which was not actually needed and can be replaced with the built-in `Last`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42808 from zhengruifeng/del_last_not_none.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
1 parent 66fb225 commit 4686c27
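
A quick way to see the equivalence this commit relies on is to forward-fill with the built-in `last` window function. A minimal PySpark sketch (the sample data and column names are invented for illustration; they are not part of the commit):

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Invented sample data mirroring the docstring example of the removed
# LastNonNull expression: null, 1, 2, 3, null, 4, 5, null.
df = spark.createDataFrame(
    [(0, None), (1, 1), (2, 2), (3, 3), (4, None), (5, 4), (6, 5), (7, None)],
    ["idx", "value"],
)

# Growing frame: every row from the partition start up to the current row.
w = Window.orderBy("idx").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# last(..., ignorenulls=True) keeps the last non-null value seen so far,
# yielding null, 1, 2, 3, 3, 4, 5, 5 -- the same output LastNonNull produced.
df.withColumn("filled", F.last("value", ignorenulls=True).over(w)).show()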

File tree: 5 files changed (+1, -58 lines)


connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala (-4)

@@ -1905,10 +1905,6 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
         val ignoreNA = extractBoolean(children(2), "ignoreNA")
         Some(EWM(children(0), alpha, ignoreNA))
 
-      case "last_non_null" if fun.getArgumentsCount == 1 =>
-        val children = fun.getArgumentsList.asScala.map(transformExpression)
-        Some(LastNonNull(children(0)))
-
       case "null_index" if fun.getArgumentsCount == 1 =>
         val children = fun.getArgumentsList.asScala.map(transformExpression)
         Some(NullIndex(children(0)))

python/pyspark/pandas/series.py (+1, -1)

@@ -2257,7 +2257,7 @@ def _interpolate(
             return self._psdf.copy()._psser_for(self._column_label)
 
         scol = self.spark.column
-        last_non_null = SF.last_non_null(scol)
+        last_non_null = F.last(scol, True)
         null_index = SF.null_index(scol)
 
         Window = get_window_class()
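
For context, the user-facing entry point for this hunk is `Series.interpolate`, which `_interpolate` backs. A short usage sketch (the sample values are invented); since `F.last(scol, True)` computes the same thing `SF.last_non_null(scol)` did, the result should be identical before and after this commit:

import pyspark.pandas as ps

# Invented sample series; interpolate() internally builds the
# last_non_null window column that this hunk now derives from F.last.
psser = ps.Series([None, 1.0, 2.0, 3.0, None, 4.0, 5.0, None])
print(psser.interpolate())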

python/pyspark/pandas/spark/functions.py (-14)

@@ -159,20 +159,6 @@ def ewm(col: Column, alpha: float, ignore_na: bool) -> Column:
     return Column(sc._jvm.PythonSQLUtils.ewm(col._jc, alpha, ignore_na))
 
 
-def last_non_null(col: Column) -> Column:
-    if is_remote():
-        from pyspark.sql.connect.functions import _invoke_function_over_columns
-
-        return _invoke_function_over_columns(  # type: ignore[return-value]
-            "last_non_null",
-            col,  # type: ignore[arg-type]
-        )
-
-    else:
-        sc = SparkContext._active_spark_context
-        return Column(sc._jvm.PythonSQLUtils.lastNonNull(col._jc))
-
-
 def null_index(col: Column) -> Column:
     if is_remote():
         from pyspark.sql.connect.functions import _invoke_function_over_columns

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala (-37)

@@ -1152,43 +1152,6 @@ case class EWM(input: Expression, alpha: Double, ignoreNA: Boolean)
 }
 
 
-/**
- * Keep the last non-null value seen if any. This expression is dedicated only for
- * Pandas API on Spark.
- * For example,
- *  Input: null, 1, 2, 3, null, 4, 5, null
- *  Output: null, 1, 2, 3, 3, 4, 5, 5
- */
-case class LastNonNull(input: Expression)
-  extends AggregateWindowFunction with UnaryLike[Expression] {
-
-  override def dataType: DataType = input.dataType
-
-  private lazy val last = AttributeReference("last", dataType, nullable = true)()
-
-  override def aggBufferAttributes: Seq[AttributeReference] = last :: Nil
-
-  override lazy val initialValues: Seq[Expression] = Seq(Literal.create(null, dataType))
-
-  override lazy val updateExpressions: Seq[Expression] = {
-    Seq(
-      /* last = */ If(IsNull(input), last, input)
-    )
-  }
-
-  override lazy val evaluateExpression: Expression = last
-
-  override def prettyName: String = "last_non_null"
-
-  override def sql: String = s"$prettyName(${input.sql})"
-
-  override def child: Expression = input
-
-  override protected def withNewChildInternal(newChild: Expression): LastNonNull =
-    copy(input = newChild)
-}
-
-
 /**
  * Return the indices for consecutive null values, for non-null values, it returns 0.
  * This expression is dedicated only for Pandas API on Spark.
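
The Input/Output example in the removed docstring can also be reproduced at the SQL level with the built-in `last(expr, isIgnoreNull)` aggregate used as a window function. A sketch (invented inline data, run from PySpark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# last(value, true) over a growing frame gives null, 1, 2, 3, 3, 4, 5, 5 --
# matching the example from the removed LastNonNull docstring.
spark.sql("""
    SELECT idx, value,
           last(value, true) OVER (
             ORDER BY idx
             ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS filled
    FROM VALUES (0, NULL), (1, 1), (2, 2), (3, 3),
                (4, NULL), (5, 4), (6, 5), (7, NULL) AS t(idx, value)
""").show()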

sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala (-2)

@@ -145,8 +145,6 @@ private[sql] object PythonSQLUtils extends Logging {
   def ewm(e: Column, alpha: Double, ignoreNA: Boolean): Column =
     Column(EWM(e.expr, alpha, ignoreNA))
 
-  def lastNonNull(e: Column): Column = Column(LastNonNull(e.expr))
-
   def nullIndex(e: Column): Column = Column(NullIndex(e.expr))
 
   def makeInterval(unit: String, e: Column): Column = {
