Skip to content

Commit 32d170f

Browse files
panbingkunzhengruifeng
authored andcommitted
[SPARK-44331][CONNECT][PYTHON] Add bitmap functions to Scala and Python
### What changes were proposed in this pull request? Add following functions: - bitmap_bucket_number - bitmap_bit_position - bitmap_construct_agg - bitmap_count - bitmap_or_agg to: - Scala API - Python API - Spark Connect Scala Client - Spark Connect Python Client ### Why are the changes needed? for parity ### Does this PR introduce _any_ user-facing change? Yes, new functions. ### How was this patch tested? - Add New UT. Closes #41902 from panbingkun/SPARK-44331. Authored-by: panbingkun <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
1 parent 5e31f4d commit 32d170f

24 files changed

+465
-3
lines changed

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala

+45
Original file line numberDiff line numberDiff line change
@@ -3575,6 +3575,51 @@ object functions {
35753575
*/
35763576
def random(): Column = Column.fn("random")
35773577

3578+
/**
3579+
* Returns the bit position for the given input column.
3580+
*
3581+
* @group misc_funcs
3582+
* @since 3.5.0
3583+
*/
3584+
def bitmap_bit_position(col: Column): Column =
3585+
Column.fn("bitmap_bit_position", col)
3586+
3587+
/**
3588+
* Returns the bucket number for the given input column.
3589+
*
3590+
* @group misc_funcs
3591+
* @since 3.5.0
3592+
*/
3593+
def bitmap_bucket_number(col: Column): Column =
3594+
Column.fn("bitmap_bucket_number", col)
3595+
3596+
/**
3597+
* Returns a bitmap with the positions of the bits set from all the values from the input
3598+
* column. The input column will most likely be bitmap_bit_position().
3599+
*
3600+
* @group misc_funcs
3601+
* @since 3.5.0
3602+
*/
3603+
def bitmap_construct_agg(col: Column): Column =
3604+
Column.fn("bitmap_construct_agg", col)
3605+
3606+
/**
3607+
* Returns the number of set bits in the input bitmap.
3608+
*
3609+
* @group misc_funcs
3610+
* @since 3.5.0
3611+
*/
3612+
def bitmap_count(col: Column): Column = Column.fn("bitmap_count", col)
3613+
3614+
/**
3615+
* Returns a bitmap that is the bitwise OR of all of the bitmaps from the input column. The
3616+
* input column should be bitmaps created from bitmap_construct_agg().
3617+
*
3618+
* @group misc_funcs
3619+
* @since 3.5.0
3620+
*/
3621+
def bitmap_or_agg(col: Column): Column = Column.fn("bitmap_or_agg", col)
3622+
35783623
//////////////////////////////////////////////////////////////////////////////////////////////
35793624
// String functions
35803625
//////////////////////////////////////////////////////////////////////////////////////////////

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala

+20
Original file line numberDiff line numberDiff line change
@@ -1979,6 +1979,26 @@ class PlanGenerationTestSuite
19791979
fn.row_number().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
19801980
}
19811981

1982+
functionTest("bitmap_bucket_number") {
1983+
fn.bitmap_bit_position(fn.col("id"))
1984+
}
1985+
1986+
functionTest("bitmap_bit_position") {
1987+
fn.bitmap_bit_position(fn.col("id"))
1988+
}
1989+
1990+
functionTest("bitmap_construct_agg") {
1991+
fn.bitmap_construct_agg(fn.col("id"))
1992+
}
1993+
1994+
test("function bitmap_count") {
1995+
binary.select(fn.bitmap_count(fn.col("bytes")))
1996+
}
1997+
1998+
test("function bitmap_or_agg") {
1999+
binary.select(fn.bitmap_or_agg(fn.col("bytes")))
2000+
}
2001+
19822002
private def temporalFunctionTest(name: String)(f: => Column): Unit = {
19832003
test("function " + name) {
19842004
temporals.select(f)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.BitmapExpressionUtils, LongType, bitmapBitPosition, id#0L, LongType, true, false, true) AS bitmap_bit_position(id)#0L]
2+
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.BitmapExpressionUtils, LongType, bitmapBitPosition, id#0L, LongType, true, false, true) AS bitmap_bit_position(id)#0L]
2+
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [bitmap_construct_agg(id#0L, 0, 0) AS bitmap_construct_agg(id)#0]
2+
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.BitmapExpressionUtils, LongType, bitmapCount, bytes#0, BinaryType, true, false, true) AS bitmap_count(bytes)#0L]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [bitmap_or_agg(bytes#0, 0, 0) AS bitmap_or_agg(bytes)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"common": {
3+
"planId": "1"
4+
},
5+
"project": {
6+
"input": {
7+
"common": {
8+
"planId": "0"
9+
},
10+
"localRelation": {
11+
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
12+
}
13+
},
14+
"expressions": [{
15+
"unresolvedFunction": {
16+
"functionName": "bitmap_bit_position",
17+
"arguments": [{
18+
"unresolvedAttribute": {
19+
"unparsedIdentifier": "id"
20+
}
21+
}]
22+
}
23+
}]
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"common": {
3+
"planId": "1"
4+
},
5+
"project": {
6+
"input": {
7+
"common": {
8+
"planId": "0"
9+
},
10+
"localRelation": {
11+
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
12+
}
13+
},
14+
"expressions": [{
15+
"unresolvedFunction": {
16+
"functionName": "bitmap_bit_position",
17+
"arguments": [{
18+
"unresolvedAttribute": {
19+
"unparsedIdentifier": "id"
20+
}
21+
}]
22+
}
23+
}]
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"common": {
3+
"planId": "1"
4+
},
5+
"project": {
6+
"input": {
7+
"common": {
8+
"planId": "0"
9+
},
10+
"localRelation": {
11+
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
12+
}
13+
},
14+
"expressions": [{
15+
"unresolvedFunction": {
16+
"functionName": "bitmap_construct_agg",
17+
"arguments": [{
18+
"unresolvedAttribute": {
19+
"unparsedIdentifier": "id"
20+
}
21+
}]
22+
}
23+
}]
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"common": {
3+
"planId": "1"
4+
},
5+
"project": {
6+
"input": {
7+
"common": {
8+
"planId": "0"
9+
},
10+
"localRelation": {
11+
"schema": "struct\u003cid:bigint,bytes:binary\u003e"
12+
}
13+
},
14+
"expressions": [{
15+
"unresolvedFunction": {
16+
"functionName": "bitmap_count",
17+
"arguments": [{
18+
"unresolvedAttribute": {
19+
"unparsedIdentifier": "bytes"
20+
}
21+
}]
22+
}
23+
}]
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"common": {
3+
"planId": "1"
4+
},
5+
"project": {
6+
"input": {
7+
"common": {
8+
"planId": "0"
9+
},
10+
"localRelation": {
11+
"schema": "struct\u003cid:bigint,bytes:binary\u003e"
12+
}
13+
},
14+
"expressions": [{
15+
"unresolvedFunction": {
16+
"functionName": "bitmap_or_agg",
17+
"arguments": [{
18+
"unresolvedAttribute": {
19+
"unparsedIdentifier": "bytes"
20+
}
21+
}]
22+
}
23+
}]
24+
}
25+
}

python/docs/source/reference/pyspark.sql/functions.rst

+5
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,11 @@ Misc Functions
477477

478478
aes_decrypt
479479
aes_encrypt
480+
bitmap_bit_position
481+
bitmap_bucket_number
482+
bitmap_construct_agg
483+
bitmap_count
484+
bitmap_or_agg
480485
current_catalog
481486
current_database
482487
current_schema

python/pyspark/sql/connect/functions.py

+35
Original file line numberDiff line numberDiff line change
@@ -3788,6 +3788,41 @@ def random(
37883788
random.__doc__ = pysparkfuncs.random.__doc__
37893789

37903790

3791+
def bitmap_bit_position(col: "ColumnOrName") -> Column:
3792+
return _invoke_function_over_columns("bitmap_bit_position", col)
3793+
3794+
3795+
bitmap_bit_position.__doc__ = pysparkfuncs.bitmap_bit_position.__doc__
3796+
3797+
3798+
def bitmap_bucket_number(col: "ColumnOrName") -> Column:
3799+
return _invoke_function_over_columns("bitmap_bucket_number", col)
3800+
3801+
3802+
bitmap_bucket_number.__doc__ = pysparkfuncs.bitmap_bucket_number.__doc__
3803+
3804+
3805+
def bitmap_construct_agg(col: "ColumnOrName") -> Column:
3806+
return _invoke_function_over_columns("bitmap_construct_agg", col)
3807+
3808+
3809+
bitmap_construct_agg.__doc__ = pysparkfuncs.bitmap_construct_agg.__doc__
3810+
3811+
3812+
def bitmap_count(col: "ColumnOrName") -> Column:
3813+
return _invoke_function_over_columns("bitmap_count", col)
3814+
3815+
3816+
bitmap_count.__doc__ = pysparkfuncs.bitmap_count.__doc__
3817+
3818+
3819+
def bitmap_or_agg(col: "ColumnOrName") -> Column:
3820+
return _invoke_function_over_columns("bitmap_or_agg", col)
3821+
3822+
3823+
bitmap_or_agg.__doc__ = pysparkfuncs.bitmap_or_agg.__doc__
3824+
3825+
37913826
# User Defined Function
37923827

37933828

0 commit comments

Comments
 (0)