Skip to content

Commit af44135

Browse files
committed
mtrlz/sql: support abs function
1 parent fd578aa commit af44135

File tree

8 files changed

+98
-38
lines changed

8 files changed

+98
-38
lines changed

src/materialize/dataflow/func.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,34 @@ pub fn neg_float64(a: Datum) -> Datum {
223223
Datum::from(-a.unwrap_float64())
224224
}
225225

226+
pub fn abs_int32(a: Datum) -> Datum {
227+
if a.is_null() {
228+
return Datum::Null;
229+
}
230+
Datum::from(a.unwrap_int32().abs())
231+
}
232+
233+
pub fn abs_int64(a: Datum) -> Datum {
234+
if a.is_null() {
235+
return Datum::Null;
236+
}
237+
Datum::from(a.unwrap_int64().abs())
238+
}
239+
240+
pub fn abs_float32(a: Datum) -> Datum {
241+
if a.is_null() {
242+
return Datum::Null;
243+
}
244+
Datum::from(a.unwrap_float32().abs())
245+
}
246+
247+
pub fn abs_float64(a: Datum) -> Datum {
248+
if a.is_null() {
249+
return Datum::Null;
250+
}
251+
Datum::from(a.unwrap_float64().abs())
252+
}
253+
226254
pub fn eq(a: Datum, b: Datum) -> Datum {
227255
if a.is_null() || b.is_null() {
228256
return Datum::Null;
@@ -344,6 +372,10 @@ pub enum UnaryFunc {
344372
NegInt64,
345373
NegFloat32,
346374
NegFloat64,
375+
AbsInt32,
376+
AbsInt64,
377+
AbsFloat32,
378+
AbsFloat64,
347379
}
348380

349381
impl UnaryFunc {
@@ -355,6 +387,10 @@ impl UnaryFunc {
355387
UnaryFunc::NegInt64 => neg_int64,
356388
UnaryFunc::NegFloat32 => neg_float32,
357389
UnaryFunc::NegFloat64 => neg_float64,
390+
UnaryFunc::AbsInt32 => abs_int32,
391+
UnaryFunc::AbsInt64 => abs_int64,
392+
UnaryFunc::AbsFloat32 => abs_float32,
393+
UnaryFunc::AbsFloat64 => abs_float64,
358394
}
359395
}
360396
}

src/materialize/interchange/avro.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::collections::HashMap;
1212
use url::Url;
1313

1414
use crate::repr::{Datum, FType, Type};
15-
use ore::vec::VecExt;
15+
use ore::collections::CollectionExt;
1616

1717
/// Converts an Apache Avro schema into a [`repr::Type`].
1818
pub fn parse_schema(schema: &str) -> Result<Type, Error> {

src/materialize/sql/mod.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::dataflow::{
3030
use crate::glue::*;
3131
use crate::interchange::avro;
3232
use crate::repr::{Datum, FType, Type};
33-
use ore::vec::VecExt;
33+
use ore::collections::CollectionExt;
3434
use plan::SQLPlan;
3535
use store::{DataflowStore, RemoveMode};
3636

@@ -1086,22 +1086,42 @@ impl Planner {
10861086
&self,
10871087
ctx: &ExprContext,
10881088
name: &'a SQLObjectName,
1089-
_args: &'a [ASTNode],
1089+
args: &'a [ASTNode],
10901090
_over: &'a Option<SQLWindowSpec>,
10911091
_all: bool,
10921092
_distinct: bool,
10931093
plan: &SQLPlan,
10941094
) -> Result<(Expr, Type), failure::Error> {
10951095
let ident = name.to_string().to_lowercase();
1096+
10961097
if AggregateFunc::is_aggregate_func(&ident) {
10971098
if !ctx.allow_aggregates {
10981099
bail!("aggregate functions are not allowed in {}", ctx.scope);
10991100
}
11001101
let (i, typ) = plan.resolve_func(name);
11011102
let expr = Expr::Column(i, Box::new(Expr::Ambient));
1102-
Ok((expr, typ.clone()))
1103-
} else {
1104-
bail!("Unsupported function: {}", ident)
1103+
return Ok((expr, typ.clone()))
1104+
}
1105+
1106+
match ident.as_str() {
1107+
"abs" => {
1108+
if args.len() != 1 {
1109+
bail!("abs expects one argument, got {}", args.len());
1110+
}
1111+
let (expr, typ) = self.plan_expr(ctx, args.into_element(), plan)?;
1112+
let func = match typ.ftype {
1113+
FType::Int32 => UnaryFunc::AbsInt32,
1114+
FType::Int64 => UnaryFunc::AbsInt64,
1115+
FType::Float32 => UnaryFunc::AbsFloat32,
1116+
FType::Float64 => UnaryFunc::AbsFloat64,
1117+
_ => bail!("abs does not accept arguments of type {:?}", typ),
1118+
};
1119+
Ok((Expr::CallUnary {
1120+
func,
1121+
expr: Box::new(expr),
1122+
}, typ))
1123+
}
1124+
_ => bail!("unsupported function: {}", ident),
11051125
}
11061126
}
11071127

src/ore/collections.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2019 Materialize, Inc. All rights reserved.
2+
//
3+
// This file is part of Materialize. Materialize may not be used or
4+
// distributed without the express permission of Materialize, Inc.
5+
6+
//! Collection utilities.
7+
8+
/// Extension methods for collections.
9+
pub trait CollectionExt<T>
10+
where T: IntoIterator
11+
{
12+
/// Consumes the collection and returns its first element.
13+
///
14+
/// This method panics if the collection does not have at least one element.
15+
fn into_element(self) -> T::Item;
16+
17+
/// Consumes the collection and returns its last element.
18+
///
19+
/// This method panics if the collection does not have at least one element.
20+
fn into_last(self) -> T::Item;
21+
}
22+
23+
impl<T> CollectionExt<T> for T
24+
where T: IntoIterator
25+
{
26+
fn into_element(self) -> T::Item {
27+
self.into_iter().next().unwrap()
28+
}
29+
30+
fn into_last(self) -> T::Item {
31+
self.into_iter().last().unwrap()
32+
}
33+
}

src/ore/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111
1212
#![forbid(missing_docs)]
1313

14+
pub mod collections;
1415
pub mod future;
1516
pub mod log;
1617
pub mod netio;
1718
pub mod option;
1819
pub mod sync;
19-
pub mod vec;
2020

2121
/// Logs a message to stderr and crashes the process.
2222
///

src/ore/vec.rs

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/testdrive/action/kafka.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use serde_json::Value as JsonValue;
2525
use crate::action::{Action, State};
2626
use crate::parser::BuiltinCommand;
2727
use ore::future::StreamExt;
28-
use ore::vec::VecExt;
28+
use ore::collections::CollectionExt;
2929

3030
pub struct IngestAction {
3131
topic: String,

src/testdrive/action/sql.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::time::Duration;
1313

1414
use crate::action::{Action, State};
1515
use crate::parser::{FailSqlCommand, SqlCommand};
16-
use ore::vec::VecExt;
16+
use ore::collections::CollectionExt;
1717

1818
pub struct SqlAction {
1919
cmd: SqlCommand,

0 commit comments

Comments
 (0)