Skip to content

Commit ea6bacf

Browse files
panbingkun, zhengruifeng
authored and committed
[SPARK-44329][CONNECT][PYTHON] Add hll_sketch_agg, hll_union_agg, to_varchar, try_aes_decrypt to Scala and Python
### What changes were proposed in this pull request? Add following functions: - hll_sketch_agg - hll_union_agg - to_varchar - try_aes_decrypt to: - Scala API - Python API - Spark Connect Scala Client - Spark Connect Python Client ### Why are the changes needed? for parity ### Does this PR introduce _any_ user-facing change? Yes, new functions. ### How was this patch tested? - Add New UT. Closes #41907 from panbingkun/SPARK-44329. Authored-by: panbingkun <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
1 parent 32d170f commit ea6bacf

File tree

54 files changed

+1058
-21
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1058
-21
lines changed

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala

+115
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,16 @@ object functions {
607607
def grouping_id(colName: String, colNames: String*): Column = {
  // Turn every supplied name into a Column, then hand off to the Column-varargs call.
  val allColumns = (colName +: colNames).map(name => Column(name))
  grouping_id(allColumns: _*)
}
609609

610+
/**
 * Aggregate function: returns the updatable binary representation of the Datasketches
 * HllSketch, where the sketch is configured with the given `lgConfigK` argument.
 *
 * @group agg_funcs
 * @since 3.5.0
 */
def hll_sketch_agg(e: Column, lgConfigK: Column): Column = {
  Column.fn("hll_sketch_agg", e, lgConfigK)
}
619+
610620
/**
611621
* Aggregate function: returns the updatable binary representation of the Datasketches HllSketch
612622
* configured with lgConfigK arg.
@@ -646,6 +656,18 @@ object functions {
646656
*/
647657
def hll_sketch_agg(columnName: String): Column = {
  // Resolve the name to a Column and reuse the single-Column overload.
  val column = Column(columnName)
  hll_sketch_agg(column)
}
648658

659+
/**
 * Aggregate function: merges previously created Datasketches HllSketch instances through a
 * Datasketches Union instance and returns the updatable binary representation of the
 * resulting HllSketch. Throws an exception if sketches have different lgConfigK values and
 * `allowDifferentLgConfigK` is set to false.
 *
 * @group agg_funcs
 * @since 3.5.0
 */
def hll_union_agg(e: Column, allowDifferentLgConfigK: Column): Column = {
  Column.fn("hll_union_agg", e, allowDifferentLgConfigK)
}
670+
649671
/**
650672
* Aggregate function: returns the updatable binary representation of the Datasketches
651673
* HllSketch, generated by merging previously created Datasketches HllSketch instances via a
@@ -3491,6 +3513,72 @@ object functions {
34913513
def aes_decrypt(input: Column, key: Column): Column =
  // Must resolve to the `aes_decrypt` function: dispatching to "aes_encrypt" here would
  // encrypt `input` with `key` instead of decrypting it.
  Column.fn("aes_decrypt", input, key)
34933515

3516+
/**
 * Variant of `aes_decrypt` that performs the same operation but yields a NULL value, rather
 * than raising an error, when the decryption cannot be performed.
 *
 * @param input
 *   The binary value to decrypt.
 * @param key
 *   The passphrase to use to decrypt the data.
 * @param mode
 *   The block cipher mode that should be used to decrypt messages. Valid modes: ECB, GCM,
 *   CBC.
 * @param padding
 *   How to pad messages whose length is not a multiple of the block size. Valid values: PKCS,
 *   NONE, DEFAULT. The DEFAULT padding means PKCS for ECB, NONE for GCM and PKCS for CBC.
 * @param aad
 *   Optional additional authenticated data. Only supported for GCM mode. This can be any
 *   free-form input and must be provided for both encryption and decryption.
 *
 * @group misc_funcs
 * @since 3.5.0
 */
def try_aes_decrypt(
    input: Column,
    key: Column,
    mode: Column,
    padding: Column,
    aad: Column): Column = {
  Column.fn("try_aes_decrypt", input, key, mode, padding, aad)
}
3545+
3546+
/**
 * Returns a decrypted value of `input`, or NULL instead of raising an error if the
 * decryption cannot be performed. This overload omits the `aad` argument.
 *
 * @see
 *   `org.apache.spark.sql.functions.try_aes_decrypt(Column, Column, Column, Column, Column)`
 *
 * @group misc_funcs
 * @since 3.5.0
 */
def try_aes_decrypt(input: Column, key: Column, mode: Column, padding: Column): Column = {
  Column.fn("try_aes_decrypt", input, key, mode, padding)
}
3557+
3558+
/**
 * Returns a decrypted value of `input`, or NULL instead of raising an error if the
 * decryption cannot be performed. This overload omits the `padding` and `aad` arguments.
 *
 * @see
 *   `org.apache.spark.sql.functions.try_aes_decrypt(Column, Column, Column, Column, Column)`
 *
 * @group misc_funcs
 * @since 3.5.0
 */
def try_aes_decrypt(input: Column, key: Column, mode: Column): Column = {
  Column.fn("try_aes_decrypt", input, key, mode)
}
3569+
3570+
/**
 * Returns a decrypted value of `input`, or NULL instead of raising an error if the
 * decryption cannot be performed. This overload takes only the value and the key.
 *
 * @see
 *   `org.apache.spark.sql.functions.try_aes_decrypt(Column, Column, Column, Column, Column)`
 *
 * @group misc_funcs
 * @since 3.5.0
 */
def try_aes_decrypt(input: Column, key: Column): Column = {
  Column.fn("try_aes_decrypt", input, key)
}
3581+
34943582
/**
34953583
* Returns a sha1 hash value as a hex string of the `col`.
34963584
*
@@ -4192,6 +4280,33 @@ object functions {
41924280
*/
41934281
def to_char(e: Column, format: Column): Column = {
  // Both arguments are forwarded unchanged to the `to_char` function.
  Column.fn("to_char", e, format)
}
41944282

4283+
/**
 * Converts `e` to a string according to `format`. Throws an exception if the conversion
 * fails.
 *
 * @param e
 *   A numeric column to be converted.
 * @param format
 *   The format string. It can consist of the following characters, case insensitive:
 *   <ul>
 *   <li>'0' or '9': Specifies an expected digit between 0 and 9. A sequence of 0 or 9 in the
 *   format string matches a sequence of digits in the input value, generating a result string
 *   of the same length as the corresponding sequence in the format string. The result string
 *   is left-padded with zeros if the 0/9 sequence comprises more digits than the matching
 *   part of the decimal value, starts with 0, and is before the decimal point. Otherwise, it
 *   is padded with spaces.</li>
 *   <li>'.' or 'D': Specifies the position of the decimal point (optional, only allowed
 *   once).</li>
 *   <li>',' or 'G': Specifies the position of the grouping (thousands) separator (,). There
 *   must be a 0 or 9 to the left and right of each grouping separator.</li>
 *   <li>'$': Specifies the location of the $ currency sign. This character may only be
 *   specified once.</li>
 *   <li>'S' or 'MI': Specifies the position of a '-' or '+' sign (optional, only allowed once
 *   at the beginning or end of the format string). Note that 'S' prints '+' for positive
 *   values but 'MI' prints a space.</li>
 *   <li>'PR': Only allowed at the end of the format string; specifies that the result string
 *   will be wrapped by angle brackets if the input value is negative.</li>
 *   </ul>
 *
 * @group string_funcs
 * @since 3.5.0
 */
def to_varchar(e: Column, format: Column): Column = {
  Column.fn("to_varchar", e, format)
}
4309+
41954310
/**
41964311
* Convert string 'e' to a number based on the string format 'format'. Throws an exception if
41974312
* the conversion fails.

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala

+60
Original file line numberDiff line numberDiff line change
@@ -2550,6 +2550,10 @@ class PlanGenerationTestSuite
25502550
fn.to_char(fn.col("b"), lit("$99.99"))
25512551
}
25522552

2553+
// Exercises `fn.to_varchar` on column "b" with a currency-style format literal.
functionTest("to_varchar") {
  val value = fn.col("b")
  val pattern = lit("$99.99")
  fn.to_varchar(value, pattern)
}
2556+
25532557
// Exercises `fn.to_number` on column "g" with a currency-style format literal.
functionTest("to_number") {
  val value = fn.col("g")
  val pattern = lit("$99.99")
  fn.to_number(value, pattern)
}
@@ -2821,6 +2825,22 @@ class PlanGenerationTestSuite
28212825
fn.aes_decrypt(fn.col("g"), fn.col("g"))
28222826
}
28232827

2828+
// One case per `try_aes_decrypt` overload, from five arguments down to two. Column "g" is
// passed for every argument.
functionTest("try_aes_decrypt with mode padding aad") {
  val g = fn.col("g")
  fn.try_aes_decrypt(g, g, g, g, g)
}

functionTest("try_aes_decrypt with mode padding") {
  val g = fn.col("g")
  fn.try_aes_decrypt(g, g, g, g)
}

functionTest("try_aes_decrypt with mode") {
  val g = fn.col("g")
  fn.try_aes_decrypt(g, g, g)
}

functionTest("try_aes_decrypt") {
  val g = fn.col("g")
  fn.try_aes_decrypt(g, g)
}
2843+
28242844
// Exercises `fn.sha` on column "g".
functionTest("sha") {
  val value = fn.col("g")
  fn.sha(value)
}
@@ -2853,6 +2873,46 @@ class PlanGenerationTestSuite
28532873
fn.random(lit(1))
28542874
}
28552875

2876+
// One case per `hll_sketch_agg` overload, each selected from the `binary` dataset over its
// "bytes" column.
test("hll_sketch_agg with column lgConfigK") {
  val sketch = fn.hll_sketch_agg(fn.col("bytes"), lit(0))
  binary.select(sketch)
}

test("hll_sketch_agg with column lgConfigK_int") {
  val sketch = fn.hll_sketch_agg(fn.col("bytes"), 0)
  binary.select(sketch)
}

test("hll_sketch_agg with columnName lgConfigK_int") {
  val sketch = fn.hll_sketch_agg("bytes", 0)
  binary.select(sketch)
}

test("hll_sketch_agg") {
  val sketch = fn.hll_sketch_agg(fn.col("bytes"))
  binary.select(sketch)
}

test("hll_sketch_agg with columnName") {
  val sketch = fn.hll_sketch_agg("bytes")
  binary.select(sketch)
}
2895+
2896+
// One case per `hll_union_agg` overload, each selected from the `binary` dataset over its
// "bytes" column.
test("hll_union_agg with column allowDifferentLgConfigK") {
  val merged = fn.hll_union_agg(fn.col("bytes"), lit(false))
  binary.select(merged)
}

test("hll_union_agg with column allowDifferentLgConfigK_boolean") {
  val merged = fn.hll_union_agg(fn.col("bytes"), false)
  binary.select(merged)
}

test("hll_union_agg with columnName allowDifferentLgConfigK_boolean") {
  val merged = fn.hll_union_agg("bytes", false)
  binary.select(merged)
}

test("hll_union_agg") {
  val merged = fn.hll_union_agg(fn.col("bytes"))
  binary.select(merged)
}

test("hll_union_agg with columnName") {
  val merged = fn.hll_union_agg("bytes")
  binary.select(merged)
}
2915+
28562916
test("groupby agg") {
28572917
simple
28582918
.groupBy(Column("id"))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Project [to_char(cast(b#0 as decimal(30,15)), $99.99) AS to_char(b, $99.99)#0]
2+
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Project [tryeval(staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesDecrypt, cast(g#0 as binary), cast(g#0 as binary), GCM, DEFAULT, cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, true, true, true)) AS try_aes_decrypt(g, g, GCM, DEFAULT, )#0]
2+
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Project [tryeval(staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesDecrypt, cast(g#0 as binary), cast(g#0 as binary), g#0, DEFAULT, cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, true, true, true)) AS try_aes_decrypt(g, g, g, DEFAULT, )#0]
2+
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Project [tryeval(staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesDecrypt, cast(g#0 as binary), cast(g#0 as binary), g#0, g#0, cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, true, true, true)) AS try_aes_decrypt(g, g, g, g, )#0]
2+
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Project [tryeval(staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesDecrypt, cast(g#0 as binary), cast(g#0 as binary), g#0, g#0, cast(g#0 as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, true, true, true)) AS try_aes_decrypt(g, g, g, g, g)#0]
2+
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [hll_sketch_agg(bytes#0, 12, 0, 0) AS hll_sketch_agg(bytes, 12)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [hll_sketch_agg(bytes#0, 12, 0, 0) AS hll_sketch_agg(bytes, 12)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [hll_sketch_agg(bytes#0, 0, 0, 0) AS hll_sketch_agg(bytes, 0)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [hll_sketch_agg(bytes#0, 0, 0, 0) AS hll_sketch_agg(bytes, 0)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [hll_sketch_agg(bytes#0, 0, 0, 0) AS hll_sketch_agg(bytes, 0)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [hll_union_agg(bytes#0, false, 0, 0) AS hll_union_agg(bytes, false)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [hll_union_agg(bytes#0, false, 0, 0) AS hll_union_agg(bytes, false)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [hll_union_agg(bytes#0, false, 0, 0) AS hll_union_agg(bytes, false)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [hll_union_agg(bytes#0, false, 0, 0) AS hll_union_agg(bytes, false)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Aggregate [hll_union_agg(bytes#0, false, 0, 0) AS hll_union_agg(bytes, false)#0]
2+
+- LocalRelation <empty>, [id#0L, bytes#0]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
{
2+
"common": {
3+
"planId": "1"
4+
},
5+
"project": {
6+
"input": {
7+
"common": {
8+
"planId": "0"
9+
},
10+
"localRelation": {
11+
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
12+
}
13+
},
14+
"expressions": [{
15+
"unresolvedFunction": {
16+
"functionName": "to_varchar",
17+
"arguments": [{
18+
"unresolvedAttribute": {
19+
"unparsedIdentifier": "b"
20+
}
21+
}, {
22+
"literal": {
23+
"string": "$99.99"
24+
}
25+
}]
26+
}
27+
}]
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
{
2+
"common": {
3+
"planId": "1"
4+
},
5+
"project": {
6+
"input": {
7+
"common": {
8+
"planId": "0"
9+
},
10+
"localRelation": {
11+
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
12+
}
13+
},
14+
"expressions": [{
15+
"unresolvedFunction": {
16+
"functionName": "try_aes_decrypt",
17+
"arguments": [{
18+
"unresolvedAttribute": {
19+
"unparsedIdentifier": "g"
20+
}
21+
}, {
22+
"unresolvedAttribute": {
23+
"unparsedIdentifier": "g"
24+
}
25+
}]
26+
}
27+
}]
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"common": {
3+
"planId": "1"
4+
},
5+
"project": {
6+
"input": {
7+
"common": {
8+
"planId": "0"
9+
},
10+
"localRelation": {
11+
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
12+
}
13+
},
14+
"expressions": [{
15+
"unresolvedFunction": {
16+
"functionName": "try_aes_decrypt",
17+
"arguments": [{
18+
"unresolvedAttribute": {
19+
"unparsedIdentifier": "g"
20+
}
21+
}, {
22+
"unresolvedAttribute": {
23+
"unparsedIdentifier": "g"
24+
}
25+
}, {
26+
"unresolvedAttribute": {
27+
"unparsedIdentifier": "g"
28+
}
29+
}]
30+
}
31+
}]
32+
}
33+
}

0 commit comments

Comments
 (0)