Skip to content

Commit acb3165

Browse files
authored
Merge pull request #36 from VirtusLab/new_named_columns
Redesign handling of named columns * Separate general Column supertype from data type specific Col[T] * Remove column names from members of view-like refinements * Rely on tuples instead of varargs in user facing APIs of methods like select, agg, groupBy * Assign names to columns via implicit conversions
2 parents dd486a8 + fd2b51e commit acb3165

19 files changed

+314
-205
lines changed

src/main/CollectColumns.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package org.virtuslab.iskra
2+
3+
import scala.compiletime.error
4+
5+
import org.virtuslab.iskra.types.DataType
6+
7+
// TODO should it be covariant or not?
8+
trait CollectColumns[-C]:
9+
type CollectedColumns <: Tuple
10+
def underlyingColumns(c: C): Seq[UntypedColumn]
11+
12+
// Using `given ... with { ... }` syntax might sometimes break pattern match on `CollectColumns[...] { type CollectedColumns = cc }`
13+
14+
object CollectColumns:
15+
given collectNamedColumn[N <: Name, T <: DataType]: CollectColumns[NamedColumn[N, T]] with
16+
type CollectedColumns = (N := T) *: EmptyTuple
17+
def underlyingColumns(c: NamedColumn[N, T]) = Seq(c.untyped)
18+
19+
given collectColumnsWithSchema[S <: Tuple]: CollectColumns[ColumnsWithSchema[S]] with
20+
type CollectedColumns = S
21+
def underlyingColumns(c: ColumnsWithSchema[S]) = c.underlyingColumns
22+
23+
given collectEmptyTuple[S]: CollectColumns[EmptyTuple] with
24+
type CollectedColumns = EmptyTuple
25+
def underlyingColumns(c: EmptyTuple) = Seq.empty
26+
27+
given collectCons[H, T <: Tuple](using collectHead: CollectColumns[H], collectTail: CollectColumns[T]): (CollectColumns[H *: T] { type CollectedColumns = Tuple.Concat[collectHead.CollectedColumns, collectTail.CollectedColumns] }) =
28+
new CollectColumns[H *: T]:
29+
type CollectedColumns = Tuple.Concat[collectHead.CollectedColumns, collectTail.CollectedColumns]
30+
def underlyingColumns(c: H *: T) = collectHead.underlyingColumns(c.head) ++ collectTail.underlyingColumns(c.tail)
31+
32+
33+
// TODO Customize error message for different operations with an explanation
34+
class CannotCollectColumns(typeName: String)
35+
extends Exception(s"Could not find an instance of CollectColumns for ${typeName}")

src/main/Column.scala

Lines changed: 70 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,94 @@
11
package org.virtuslab.iskra
22

3+
import scala.language.implicitConversions
4+
35
import scala.quoted.*
46

57
import org.apache.spark.sql.{Column => UntypedColumn}
68
import types.DataType
9+
import MacroHelpers.TupleSubtype
10+
11+
class Column(val untyped: UntypedColumn):
12+
inline def name(using v: ValueOf[Name]): Name = v.value
13+
14+
object Column:
15+
implicit transparent inline def columnToNamedColumn(inline col: Col[?]): NamedColumn[?, ?] =
16+
${ columnToNamedColumnImpl('col) }
17+
18+
private def columnToNamedColumnImpl(col: Expr[Col[?]])(using Quotes): Expr[NamedColumn[?, ?]] =
19+
import quotes.reflect.*
20+
col match
21+
case '{ ($v: StructuralSchemaView).selectDynamic($nm: Name).$asInstanceOf$[Col[tp]] } =>
22+
nm.asTerm.tpe.asType match
23+
case '[Name.Subtype[n]] =>
24+
'{ NamedColumn[n, tp](${ col }.untyped.as(${ nm })) }
25+
case '{ $c: Col[tp] } =>
26+
col.asTerm match
27+
case Inlined(_, _, Ident(name)) =>
28+
ConstantType(StringConstant(name)).asType match
29+
case '[Name.Subtype[n]] =>
30+
val alias = Literal(StringConstant(name)).asExprOf[Name]
31+
'{ NamedColumn[n, tp](${ col }.untyped.as(${ alias })) }
32+
33+
extension [T <: DataType](col: Col[T])
34+
inline def as[N <: Name](name: N): NamedColumn[N, T] =
35+
NamedColumn[N, T](col.untyped.as(name))
36+
inline def alias[N <: Name](name: N): NamedColumn[N, T] =
37+
NamedColumn[N, T](col.untyped.as(name))
38+
39+
extension [T1 <: DataType](col1: Col[T1])
40+
inline def +[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Plus[T1, T2]): Col[op.Out] = op(col1, col2)
41+
inline def -[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Minus[T1, T2]): Col[op.Out] = op(col1, col2)
42+
inline def *[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Mult[T1, T2]): Col[op.Out] = op(col1, col2)
43+
inline def /[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Div[T1, T2]): Col[op.Out] = op(col1, col2)
44+
inline def ++[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.PlusPlus[T1, T2]): Col[op.Out] = op(col1, col2)
45+
inline def <[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Lt[T1, T2]): Col[op.Out] = op(col1, col2)
46+
inline def <=[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Le[T1, T2]): Col[op.Out] = op(col1, col2)
47+
inline def >[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Gt[T1, T2]): Col[op.Out] = op(col1, col2)
48+
inline def >=[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Ge[T1, T2]): Col[op.Out] = op(col1, col2)
49+
inline def ===[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Eq[T1, T2]): Col[op.Out] = op(col1, col2)
50+
inline def =!=[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Ne[T1, T2]): Col[op.Out] = op(col1, col2)
51+
inline def &&[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.And[T1, T2]): Col[op.Out] = op(col1, col2)
52+
inline def ||[T2 <: DataType](col2: Col[T2])(using op: ColumnOp.Or[T1, T2]): Col[op.Out] = op(col1, col2)
53+
54+
55+
class Col[+T <: DataType](untyped: UntypedColumn) extends Column(untyped)
756

8-
sealed trait NamedColumns[Schema](val underlyingColumns: Seq[UntypedColumn])
957

1058
object Columns:
11-
transparent inline def apply(inline columns: NamedColumns[?]*): NamedColumns[?] = ${ applyImpl('columns) }
59+
transparent inline def apply[C <: NamedColumns](columns: C): ColumnsWithSchema[?] = ${ applyImpl('columns) }
1260

13-
private def applyImpl(columns: Expr[Seq[NamedColumns[?]]])(using Quotes): Expr[NamedColumns[?]] =
61+
private def applyImpl[C : Type](columns: Expr[C])(using Quotes): Expr[ColumnsWithSchema[?]] =
1462
import quotes.reflect.*
1563

16-
val columnValuesWithTypes = columns match
17-
case Varargs(colExprs) =>
18-
colExprs.map { arg =>
19-
arg match
20-
case '{ $value: NamedColumns[schema] } => ('{ ${ value }.underlyingColumns }, Type.of[schema])
21-
}
64+
Expr.summon[CollectColumns[C]] match
65+
case Some(collectColumns) =>
66+
collectColumns match
67+
case '{ $cc: CollectColumns[?] { type CollectedColumns = collectedColumns } } =>
68+
Type.of[collectedColumns] match
69+
case '[TupleSubtype[collectedCols]] =>
70+
'{
71+
val cols = ${ cc }.underlyingColumns(${ columns })
72+
ColumnsWithSchema[collectedCols](cols)
73+
}
74+
case None =>
75+
throw CollectColumns.CannotCollectColumns(Type.show[C])
2276

23-
val columnsValues = columnValuesWithTypes.map(_._1)
24-
val columnsTypes = columnValuesWithTypes.map(_._2)
2577

26-
val schemaTpe = FrameSchema.schemaTypeFromColumnsTypes(columnsTypes)
78+
trait NamedColumnOrColumnsLike
2779

28-
schemaTpe match
29-
case '[s] =>
30-
'{
31-
val cols = ${ Expr.ofSeq(columnsValues) }.flatten
32-
new NamedColumns[s](cols) {}
33-
}
80+
type NamedColumns = Repeated[NamedColumnOrColumnsLike]
3481

35-
class Column[+T <: DataType](val untyped: UntypedColumn):
82+
class NamedColumn[N <: Name, T <: DataType](val untyped: UntypedColumn)
83+
extends NamedColumnOrColumnsLike
3684

37-
inline def name(using v: ValueOf[Name]): Name = v.value
85+
class ColumnsWithSchema[Schema <: Tuple](val underlyingColumns: Seq[UntypedColumn]) extends NamedColumnOrColumnsLike
3886

39-
object Column:
40-
extension [T <: DataType](col: Column[T])
41-
inline def as[N <: Name](name: N)(using v: ValueOf[N]): LabeledColumn[N, T] =
42-
LabeledColumn[N, T](col.untyped.as(v.value))
43-
inline def alias[N <: Name](name: N)(using v: ValueOf[N]): LabeledColumn[N, T] =
44-
LabeledColumn[N, T](col.untyped.as(v.value))
45-
46-
extension [T1 <: DataType](col1: Column[T1])
47-
inline def +[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Plus[T1, T2]): Column[op.Out] = op(col1, col2)
48-
inline def -[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Minus[T1, T2]): Column[op.Out] = op(col1, col2)
49-
inline def *[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Mult[T1, T2]): Column[op.Out] = op(col1, col2)
50-
inline def /[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Div[T1, T2]): Column[op.Out] = op(col1, col2)
51-
inline def ++[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.PlusPlus[T1, T2]): Column[op.Out] = op(col1, col2)
52-
inline def <[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Lt[T1, T2]): Column[op.Out] = op(col1, col2)
53-
inline def <=[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Le[T1, T2]): Column[op.Out] = op(col1, col2)
54-
inline def >[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Gt[T1, T2]): Column[op.Out] = op(col1, col2)
55-
inline def >=[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Ge[T1, T2]): Column[op.Out] = op(col1, col2)
56-
inline def ===[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Eq[T1, T2]): Column[op.Out] = op(col1, col2)
57-
inline def =!=[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Ne[T1, T2]): Column[op.Out] = op(col1, col2)
58-
inline def &&[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.And[T1, T2]): Column[op.Out] = op(col1, col2)
59-
inline def ||[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Or[T1, T2]): Column[op.Out] = op(col1, col2)
6087

6188
@annotation.showAsInfix
62-
class :=[L <: LabeledColumn.Label, T <: DataType](untyped: UntypedColumn)
63-
extends Column[T](untyped)
64-
with NamedColumns[(L := T) *: EmptyTuple](Seq(untyped))
89+
trait :=[L <: ColumnLabel, T <: DataType]
6590

6691
@annotation.showAsInfix
6792
trait /[+Prefix <: Name, +Suffix <: Name]
6893

69-
type LabeledColumn[L <: LabeledColumn.Label, T <: DataType] = :=[L, T]
70-
71-
object LabeledColumn:
72-
type Label = Name | (Name / Name)
73-
def apply[L <: LabeledColumn.Label, T <: DataType](untyped: UntypedColumn) = new :=[L, T](untyped)
94+
type ColumnLabel = Name | (Name / Name)

src/main/ColumnOp.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package org.virtuslab.iskra
33
import scala.quoted.*
44
import org.apache.spark.sql
55
import org.apache.spark.sql.functions.concat
6-
import org.virtuslab.iskra.{Column as Col}
6+
import org.virtuslab.iskra.Col
77
import org.virtuslab.iskra.UntypedOps.typed
88
import org.virtuslab.iskra.types.*
99
import DataType.*

src/main/FrameSchema.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ object FrameSchema:
1414
case TupleSubtype[s2] => S1 *: s2
1515
case _ => S1 *: S2 *: EmptyTuple
1616

17-
type NullableLabeledColumn[T] = T match
17+
type NullableLabeledDataType[T] = T match
1818
case label := tpe => label := DataType.Nullable[tpe]
1919

2020
type NullableSchema[T] = T match
21-
case TupleSubtype[s] => Tuple.Map[s, NullableLabeledColumn]
22-
case _ => NullableLabeledColumn[T]
21+
case TupleSubtype[s] => Tuple.Map[s, NullableLabeledDataType]
22+
case _ => NullableLabeledDataType[T]
2323

2424
def reownType[Owner <: Name : Type](schema: Type[?])(using Quotes): Type[?] =
2525
schema match

src/main/Grouping.scala

Lines changed: 40 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,45 +13,38 @@ object GroupBy:
1313

1414
given groupByOps: {} with
1515
extension [View <: SchemaView](groupBy: GroupBy[View])
16-
transparent inline def apply(inline groupingColumns: View ?=> NamedColumns[?]*) = ${ applyImpl[View]('groupBy, 'groupingColumns) }
16+
transparent inline def apply[C <: NamedColumns](groupingColumns: View ?=> C) = ${ applyImpl[View, C]('groupBy, 'groupingColumns) }
1717

18-
def groupByImpl[S : Type](df: Expr[StructDataFrame[S]])(using Quotes): Expr[GroupBy[?]] =
18+
private def groupByImpl[S : Type](df: Expr[StructDataFrame[S]])(using Quotes): Expr[GroupBy[?]] =
1919
import quotes.reflect.asTerm
2020
val viewExpr = StructSchemaView.schemaViewExpr[StructDataFrame[S]]
2121
viewExpr.asTerm.tpe.asType match
2222
case '[SchemaView.Subtype[v]] =>
2323
'{ GroupBy[v](${ viewExpr }.asInstanceOf[v], ${ df }.untyped) }
2424

25-
def applyImpl[View <: SchemaView : Type](groupBy: Expr[GroupBy[View]], groupingColumns: Expr[Seq[View ?=> NamedColumns[?]]])(using Quotes): Expr[GroupedDataFrame[View]] =
25+
private def applyImpl[View <: SchemaView : Type, C : Type](groupBy: Expr[GroupBy[View]], groupingColumns: Expr[View ?=> C])(using Quotes): Expr[GroupedDataFrame[View]] =
2626
import quotes.reflect.*
2727

28-
val columnValuesWithTypes = groupingColumns match
29-
case Varargs(colExprs) =>
30-
colExprs.map { arg =>
31-
val reduced = Term.betaReduce('{$arg(using ${ groupBy }.view)}.asTerm).get
32-
reduced.asExpr match
33-
case '{ $value: NamedColumns[schema] } => ('{ ${ value }.underlyingColumns }, Type.of[schema])
34-
}
35-
36-
val columnsValues = columnValuesWithTypes.map(_._1)
37-
val columnsTypes = columnValuesWithTypes.map(_._2)
38-
39-
val groupedSchemaTpe = FrameSchema.schemaTypeFromColumnsTypes(columnsTypes)
40-
groupedSchemaTpe match
41-
case '[TupleSubtype[groupingKeys]] =>
42-
val groupedViewExpr = StructSchemaView.schemaViewExpr[StructDataFrame[groupingKeys]]
43-
44-
groupedViewExpr.asTerm.tpe.asType match
45-
case '[SchemaView.Subtype[groupedView]] =>
46-
'{
47-
val groupingCols = ${ Expr.ofSeq(columnsValues) }.flatten
48-
new GroupedDataFrame[View]:
49-
type GroupingKeys = groupingKeys
50-
type GroupedView = groupedView
51-
def underlying = ${ groupBy }.underlying.groupBy(groupingCols*)
52-
def fullView = ${ groupBy }.view
53-
def groupedView = ${ groupedViewExpr }.asInstanceOf[GroupedView]
54-
}
28+
Expr.summon[CollectColumns[C]] match
29+
case Some(collectColumns) =>
30+
collectColumns match
31+
case '{ $cc: CollectColumns[?] { type CollectedColumns = collectedColumns } } =>
32+
Type.of[collectedColumns] match
33+
case '[TupleSubtype[collectedCols]] =>
34+
val groupedViewExpr = StructSchemaView.schemaViewExpr[StructDataFrame[collectedCols]]
35+
groupedViewExpr.asTerm.tpe.asType match
36+
case '[SchemaView.Subtype[groupedView]] =>
37+
'{
38+
val groupingCols = ${ cc }.underlyingColumns(${ groupingColumns }(using ${ groupBy }.view))
39+
new GroupedDataFrame[View]:
40+
type GroupingKeys = collectedCols
41+
type GroupedView = groupedView
42+
def underlying = ${ groupBy }.underlying.groupBy(groupingCols*)
43+
def fullView = ${ groupBy }.view
44+
def groupedView = ${ groupedViewExpr }.asInstanceOf[GroupedView]
45+
}
46+
case None =>
47+
throw CollectColumns.CannotCollectColumns(Type.show[C])
5548

5649
// TODO: Rename to RelationalGroupedDataset and handle other aggregations: cube, rollup (and pivot?)
5750
trait GroupedDataFrame[FullView <: SchemaView]:
@@ -66,13 +59,12 @@ trait GroupedDataFrame[FullView <: SchemaView]:
6659
object GroupedDataFrame:
6760
given groupedDataFrameOps: {} with
6861
extension [FullView <: SchemaView, GroupKeys <: Tuple, GroupView <: SchemaView](gdf: GroupedDataFrame[FullView]{ type GroupedView = GroupView; type GroupingKeys = GroupKeys })
69-
transparent inline def agg(inline columns: (Agg { type View = FullView }, GroupView) ?=> NamedColumns[?]*): StructDataFrame[?] =
70-
${ aggImpl[FullView, GroupKeys, GroupView]('gdf, 'columns) }
62+
transparent inline def agg[C <: NamedColumns](columns: (Agg { type View = FullView }, GroupView) ?=> C): StructDataFrame[?] =
63+
${ aggImpl[FullView, GroupKeys, GroupView, C]('gdf, 'columns) }
7164

72-
73-
def aggImpl[FullView <: SchemaView : Type, GroupingKeys <: Tuple : Type, GroupView <: SchemaView : Type](
65+
private def aggImpl[FullView <: SchemaView : Type, GroupingKeys <: Tuple : Type, GroupView <: SchemaView : Type, C : Type](
7466
gdf: Expr[GroupedDataFrame[FullView] { type GroupedView = GroupView }],
75-
columns: Expr[Seq[(Agg { type View = FullView }, GroupView) ?=> NamedColumns[?]]]
67+
columns: Expr[(Agg { type View = FullView }, GroupView) ?=> C]
7668
)(using Quotes): Expr[StructDataFrame[?]] =
7769
import quotes.reflect.*
7870

@@ -82,27 +74,19 @@ object GroupedDataFrame:
8274
val view = ${ gdf }.fullView
8375
}
8476

85-
val columnValuesWithTypes = columns match
86-
case Varargs(colExprs) =>
87-
colExprs.map { arg =>
88-
val reduced = Term.betaReduce('{$arg(using ${ aggWrapper }, ${ gdf }.groupedView)}.asTerm).get
89-
reduced.asExpr match
90-
case '{ $value: NamedColumns[schema] } => ('{ ${ value }.underlyingColumns }, Type.of[schema])
91-
}
92-
93-
val columnsValues = columnValuesWithTypes.map(_._1)
94-
val columnsTypes = columnValuesWithTypes.map(_._2)
95-
96-
val schemaTpe = FrameSchema.schemaTypeFromColumnsTypes(columnsTypes)
97-
schemaTpe match
98-
case '[s] =>
99-
'{
100-
// TODO assert cols is not empty
101-
val cols = ${ Expr.ofSeq(columnsValues) }.flatten
102-
StructDataFrame[FrameSchema.Merge[GroupingKeys, s]](
103-
${ gdf }.underlying.agg(cols.head, cols.tail*)
104-
)
105-
}
77+
Expr.summon[CollectColumns[C]] match
78+
case Some(collectColumns) =>
79+
collectColumns match
80+
case '{ $cc: CollectColumns[?] { type CollectedColumns = collectedColumns } } =>
81+
'{
82+
// TODO assert cols is not empty
83+
val cols = ${ cc }.underlyingColumns(${ columns }(using ${ aggWrapper }, ${ gdf }.groupedView))
84+
StructDataFrame[FrameSchema.Merge[GroupingKeys, collectedColumns]](
85+
${ gdf }.underlying.agg(cols.head, cols.tail*)
86+
)
87+
}
88+
case None =>
89+
throw CollectColumns.CannotCollectColumns(Type.show[C])
10690

10791
trait Agg:
10892
type View <: SchemaView

src/main/JoinOnCondition.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ object JoinOnCondition:
7373
import quotes.reflect.*
7474

7575
'{ ${ condition }(using ${ joiningView }) } match
76-
case '{ $cond: Column[BooleanOptType] } =>
76+
case '{ $cond: Col[BooleanOptType] } =>
7777
'{
7878
val joined = ${ join }.left.join(${ join }.right, ${ cond }.untyped, JoinType.typeName[T])
7979
StructDataFrame[JoinedSchema](joined)

src/main/Repeated.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package org.virtuslab.iskra
2+
3+
type Repeated[A] =
4+
A
5+
| (A, A)
6+
| (A, A, A)
7+
| (A, A, A, A)
8+
| (A, A, A, A, A)
9+
| (A, A, A, A, A, A)
10+
| (A, A, A, A, A, A, A)
11+
| (A, A, A, A, A, A, A, A)
12+
| (A, A, A, A, A, A, A, A, A)
13+
| (A, A, A, A, A, A, A, A, A, A)
14+
| (A, A, A, A, A, A, A, A, A, A, A)
15+
| (A, A, A, A, A, A, A, A, A, A, A, A)
16+
| (A, A, A, A, A, A, A, A, A, A, A, A, A)
17+
| (A, A, A, A, A, A, A, A, A, A, A, A, A, A)
18+
| (A, A, A, A, A, A, A, A, A, A, A, A, A, A, A)
19+
| (A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A)
20+
| (A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A)
21+
| (A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A)
22+
| (A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A)
23+
| (A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A)
24+
| (A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A)
25+
| (A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A) // 22 is maximal arity

0 commit comments

Comments
 (0)