Skip to content

Commit 5d6d624

Browse files
Merge pull request #2164 from taozhi8833998/feat-tumble-named-params-flink
feat: support named params for tumble function in flinksql
2 parents 71a0ffc + bb00a8d commit 5d6d624

File tree

3 files changed

+72
-10
lines changed

3 files changed

+72
-10
lines changed

pegjs/flinksql.pegjs

+28-5
Original file line numberDiff line numberDiff line change
@@ -1890,6 +1890,8 @@ table_join
18901890
};
18911891
}
18921892

1893+
tumble_args
1894+
= n:('DATA'i __ IMPLIES_ARROW) __ KW_TABLE __ d:table_name
18931895
//NOTE that, the table assigned to `var` shouldn't write in `table_join`
18941896
table_base
18951897
= KW_DUAL {
@@ -1919,16 +1921,36 @@ table_base
19191921
as: alias
19201922
};
19211923
}
1922-
/ KW_TABLE __ LPAREN __ KW_TUMBLE __ LPAREN __ KW_TABLE __ d:table_name __ COMMA __ 'DESCRIPTOR'i __ LPAREN __ t:column_ref __ RPAREN __ COMMA __ s:interval_expr __ RPAREN __ RPAREN __ alias:alias_clause? {
1923-
return {
1924+
/ KW_TABLE __ LPAREN __ KW_TUMBLE __ LPAREN __ dn:('DATA'i __ IMPLIES_ARROW)? __ KW_TABLE __ d:table_name __ COMMA __ tn:('TIMECOL'i __ IMPLIES_ARROW)? __ 'DESCRIPTOR'i __ LPAREN __ t:column_ref __ RPAREN __ COMMA __ sn:('SIZE'i __ IMPLIES_ARROW)? __ s:interval_expr o:(__ COMMA __ ('OFFSET'i __ IMPLIES_ARROW)? __ interval_expr)? __ RPAREN __ RPAREN __ alias:alias_clause? {
1925+
const result = {
19241926
expr: {
19251927
type: 'tumble',
1926-
data: d,
1927-
timecol: t,
1928-
size: s
1928+
data: {
1929+
name: dn && dn[0],
1930+
symbol: dn && dn[2],
1931+
expr: d
1932+
},
1933+
timecol: {
1934+
name: tn && tn[0],
1935+
symbol: tn && tn[2],
1936+
expr: t,
1937+
},
1938+
size: {
1939+
name: sn && sn[0],
1940+
symbol: sn && sn[2],
1941+
expr: s,
1942+
},
19291943
},
19301944
as: alias
19311945
}
1946+
if (o) {
1947+
result.expr.offset = {
1948+
name: o[3] && o[3][0],
1949+
symbol: o[3] && o[3][2],
1950+
expr: o[5],
1951+
}
1952+
}
1953+
return result
19321954
}
19331955

19341956
join_op
@@ -3562,6 +3584,7 @@ SINGLE_ARROW = '->'
35623584
DOUBLE_ARROW = '->>'
35633585
WELL_ARROW = '#>'
35643586
DOUBLE_WELL_ARROW = '#>>'
3587+
IMPLIES_ARROW = '=>'
35653588

35663589
OPERATOR_CONCATENATION = '||'
35673590
OPERATOR_AND = '&&'

src/tables.js

+12-4
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,20 @@ function tableHintToSQL(tableHintExpr) {
6464
return result.filter(hasVal).join(' ')
6565
}
6666

67+
function tableTumbleArgsToSQL(param, expr) {
68+
const { name, symbol } = param
69+
return [toUpper(name), symbol, expr].filter(hasVal).join(' ')
70+
}
6771
function tableTumbleToSQL(tumble) {
6872
if (!tumble) return ''
69-
const { data: tableInfo, timecol, size } = tumble
70-
const fullTableName = [identifierToSql(tableInfo.db), identifierToSql(tableInfo.table)].filter(hasVal).join('.')
71-
const result = ['TABLE(TUMBLE(TABLE', fullTableName, `DESCRIPTOR(${columnRefToSQL(timecol)})`, `${intervalToSQL(size)}))`]
72-
return result.filter(hasVal).join(' ')
73+
const { data: tableInfo, timecol, offset, size } = tumble
74+
const fullTableName = [identifierToSql(tableInfo.expr.db), identifierToSql(tableInfo.expr.schema), identifierToSql(tableInfo.expr.table)].filter(hasVal).join('.')
75+
const timeColSQL = `DESCRIPTOR(${columnRefToSQL(timecol.expr)})`
76+
const result = [`TABLE(TUMBLE(TABLE ${tableTumbleArgsToSQL(tableInfo, fullTableName)}`, tableTumbleArgsToSQL(timecol, timeColSQL)]
77+
const sizeSQL = tableTumbleArgsToSQL(size, intervalToSQL(size.expr))
78+
if (offset && offset.expr) result.push(sizeSQL, `${tableTumbleArgsToSQL(offset, intervalToSQL(offset.expr))}))`)
79+
else result.push(`${sizeSQL}))`)
80+
return result.filter(hasVal).join(', ')
7381
}
7482

7583
function temporalTableOptionToSQL(stmt) {

test/flink.spec.js

+32-1
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,38 @@ describe('Flink', () => {
376376
INTERVAL '60' SECONDS
377377
)
378378
)`,
379-
"SELECT `window_start`, `window_end`, `http_status`, COUNT(*) AS `count_http_status` FROM TABLE(TUMBLE(TABLE `parsed_logs` DESCRIPTOR(`_operationTs`) INTERVAL '60' SECONDS))"
379+
"SELECT `window_start`, `window_end`, `http_status`, COUNT(*) AS `count_http_status` FROM TABLE(TUMBLE(TABLE `parsed_logs`, DESCRIPTOR(`_operationTs`), INTERVAL '60' SECONDS))"
380+
]
381+
},
382+
{
383+
title: 'tumble table with offset',
384+
sql: [
385+
`SELECT
386+
window_start,
387+
window_end,
388+
http_status,
389+
count(*) as count_http_status
390+
FROM
391+
TABLE (
392+
TUMBLE (
393+
TABLE parsed_logs,
394+
DESCRIPTOR (_operationTs),
395+
INTERVAL '60' SECONDS,
396+
INTERVAL '10' MINUTES
397+
)
398+
)`,
399+
"SELECT `window_start`, `window_end`, `http_status`, COUNT(*) AS `count_http_status` FROM TABLE(TUMBLE(TABLE `parsed_logs`, DESCRIPTOR(`_operationTs`), INTERVAL '60' SECONDS, INTERVAL '10' MINUTES))"
400+
]
401+
},
402+
{
403+
title: 'tumble table with named params',
404+
sql: [
405+
`SELECT * FROM TABLE(
406+
TUMBLE(
407+
DATA => TABLE Bid,
408+
TIMECOL => DESCRIPTOR(bidtime),
409+
SIZE => INTERVAL '10' MINUTES))`,
410+
"SELECT * FROM TABLE(TUMBLE(TABLE DATA => `Bid`, TIMECOL => DESCRIPTOR(`bidtime`), SIZE => INTERVAL '10' MINUTES))"
380411
]
381412
},
382413
{

0 commit comments

Comments
 (0)