Skip to content

Commit e27ea77

Browse files
committed
mtrlz/sql: support coalesce function
1 parent af44135 commit e27ea77

File tree

6 files changed

+73
-13
lines changed

6 files changed

+73
-13
lines changed

src/materialize/dataflow/func.rs

+20
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,26 @@ impl UnaryFunc {
395395
}
396396
}
397397

398+
pub fn coalesce(datums: Vec<Datum>) -> Datum {
399+
datums
400+
.into_iter()
401+
.find(|d| !d.is_null())
402+
.unwrap_or(Datum::Null)
403+
}
404+
405+
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
406+
pub enum VariadicFunc {
407+
Coalesce,
408+
}
409+
410+
impl VariadicFunc {
411+
pub fn func(self) -> fn(Vec<Datum>) -> Datum {
412+
match self {
413+
VariadicFunc::Coalesce => coalesce,
414+
}
415+
}
416+
}
417+
398418
// TODO(jamii) be careful about overflow in sum/avg
399419
// see https://timely.zulipchat.com/#narrow/stream/186635-engineering/topic/additional.20work/near/163507435
400420

src/materialize/dataflow/render.rs

+4
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,10 @@ fn eval_expr(expr: &Expr, datum: &Datum) -> Datum {
466466
let datum2 = eval_expr(expr2, datum);
467467
(func.func())(datum1, datum2)
468468
}
469+
Expr::CallVariadic { func, exprs } => {
470+
let datums = exprs.iter().map(|e| eval_expr(e, datum)).collect();
471+
(func.func())(datums)
472+
}
469473
Expr::If { cond, then, els } => match eval_expr(cond, datum) {
470474
Datum::True => eval_expr(then, datum),
471475
Datum::False | Datum::Null => eval_expr(els, datum),

src/materialize/dataflow/types.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use serde::{Deserialize, Serialize};
77
use url::Url;
88

9-
use super::func::{AggregateFunc, BinaryFunc, UnaryFunc};
9+
use super::func::{AggregateFunc, BinaryFunc, UnaryFunc, VariadicFunc};
1010
use crate::repr::{Datum, Type};
1111

1212
/// System-wide update type.
@@ -231,11 +231,11 @@ pub enum Expr {
231231
then: Box<Expr>,
232232
els: Box<Expr>,
233233
},
234-
// /// A function call that takes an arbitrary number of arguments.
235-
// CallMany {
236-
// fn: fn(Vec<Datum>) -> Datum,
237-
// exprs: Vec<Expr>,
238-
// }
234+
/// A function call that takes an arbitrary number of arguments.
235+
CallVariadic {
236+
func: VariadicFunc,
237+
exprs: Vec<Expr>,
238+
},
239239
}
240240

241241
impl Expr {

src/materialize/sql/mod.rs

+38-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::iter::FromIterator;
2222
use std::net::{SocketAddr, ToSocketAddrs};
2323
use url::Url;
2424

25-
use crate::dataflow::func::{AggregateFunc, BinaryFunc, UnaryFunc};
25+
use crate::dataflow::func::{AggregateFunc, BinaryFunc, UnaryFunc, VariadicFunc};
2626
use crate::dataflow::{
2727
Aggregate, Dataflow, Expr, KafkaSinkConnector, KafkaSourceConnector, LocalSourceConnector,
2828
Plan, Sink, SinkConnector, Source, SourceConnector, View,
@@ -1100,7 +1100,7 @@ impl Planner {
11001100
}
11011101
let (i, typ) = plan.resolve_func(name);
11021102
let expr = Expr::Column(i, Box::new(Expr::Ambient));
1103-
return Ok((expr, typ.clone()))
1103+
return Ok((expr, typ.clone()));
11041104
}
11051105

11061106
match ident.as_str() {
@@ -1116,11 +1116,45 @@ impl Planner {
11161116
FType::Float64 => UnaryFunc::AbsFloat64,
11171117
_ => bail!("abs does not accept arguments of type {:?}", typ),
11181118
};
1119-
Ok((Expr::CallUnary {
1119+
let expr = Expr::CallUnary {
11201120
func,
11211121
expr: Box::new(expr),
1122-
}, typ))
1122+
};
1123+
Ok((expr, typ))
11231124
}
1125+
1126+
"coalesce" => {
1127+
if args.is_empty() {
1128+
bail!("coalesce requires at least one argument");
1129+
}
1130+
let mut exprs = Vec::new();
1131+
let mut result_type: Option<Type> = None;
1132+
for arg in args {
1133+
let (expr, typ) = self.plan_expr(ctx, arg, plan)?;
1134+
match &result_type {
1135+
Some(result_type) => {
1136+
if result_type.ftype != typ.ftype {
1137+
bail!(
1138+
"COALESCE does not have uniform argument type: {:?} vs {:?}",
1139+
result_type.ftype,
1140+
typ.ftype
1141+
)
1142+
}
1143+
}
1144+
None => result_type = Some(typ),
1145+
}
1146+
exprs.push(expr);
1147+
}
1148+
// args is known to be non-empty, and therefore result_type must
1149+
// be non-None after the loop above.
1150+
let result_type = result_type.unwrap();
1151+
let expr = Expr::CallVariadic {
1152+
func: VariadicFunc::Coalesce,
1153+
exprs,
1154+
};
1155+
Ok((expr, result_type))
1156+
}
1157+
11241158
_ => bail!("unsupported function: {}", ident),
11251159
}
11261160
}

src/ore/collections.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
88
/// Extension methods for collections.
99
pub trait CollectionExt<T>
10-
where T: IntoIterator
10+
where
11+
T: IntoIterator,
1112
{
1213
/// Consumes the collection and returns its first element.
1314
///
@@ -21,7 +22,8 @@ pub trait CollectionExt<T>
2122
}
2223

2324
impl<T> CollectionExt<T> for T
24-
where T: IntoIterator
25+
where
26+
T: IntoIterator,
2527
{
2628
fn into_element(self) -> T::Item {
2729
self.into_iter().next().unwrap()

src/testdrive/action/kafka.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use serde_json::Value as JsonValue;
2424

2525
use crate::action::{Action, State};
2626
use crate::parser::BuiltinCommand;
27-
use ore::future::StreamExt;
2827
use ore::collections::CollectionExt;
28+
use ore::future::StreamExt;
2929

3030
pub struct IngestAction {
3131
topic: String,

0 commit comments

Comments
 (0)