Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ lazy val `pipeline-sdk` =
ZIO.core,
ZIO.schema,
ZIO.schemaDerivation,
Scalaland.chimney
Scalaland.chimney,
"com.chuusai" %% "shapeless" % "2.3.10"
)
)
.dependsOn(`pipeline-dsl-macros`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ai.nikin.pipeline
package model

import io.scalaland.chimney.dsl._
import shapeless.{::, HList, HNil}

package object dsl {

Expand All @@ -23,9 +24,20 @@ package object dsl {

lazy final val schema: zio.schema.Schema[DATA] = s

def &[T <: Product](l: Lake[T]): CombinedLake[T :: DATA :: HNil] =
CombinedLake[T :: DATA :: HNil](Set(this, l))

def toUntyped: UntypedLake = this.transformInto[UntypedLake]
}

case class CombinedLake[L <: HList](lakes: Set[Lake[_]])
extends Vertex[CombinedLake[L]](s"combined-${lakes.map(_.name).mkString("&")}") {
final override type IN = L
final override type OUT = L

def &[T <: Product](l: Lake[T]): CombinedLake[T :: L] = CombinedLake[T :: L](lakes + l)
}

sealed abstract class Transformation[_IN, _OUT](n: String)
extends Vertex[Transformation[_IN, _OUT]](n) {
final override type IN = _IN
Expand Down
14 changes: 12 additions & 2 deletions pipeline-sdk/src/main/scala/ai/nikin/pipeline/sdk/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package object sdk {
import scalax.collection.GraphEdge.DiEdge
import scalax.collection.immutable.Graph
import scalax.collection.GraphPredef._
import shapeless.HList

type PipelineDef = Graph[Vertex[_], DiEdge]

Expand All @@ -28,7 +29,10 @@ package object sdk {
def >>>[
V <: VertexTO[SELF, V]
](next: V)(implicit @unused ev: CanMakeEdge[SELF, V]): PipelineBuilder[V] =
new PipelineBuilder(next, graph ++ Set(v ~> next))
v match {
case cb: CombinedLake[_] => new PipelineBuilder(next, graph ++ cb.lakes.map(_ ~> next))
case _ => new PipelineBuilder(next, graph ++ Set(v ~> next))
}
}

type VertexTO[FROM <: Vertex[FROM], TO <: Vertex[TO] { type IN = FROM#OUT }] =
Expand All @@ -40,6 +44,12 @@ package object sdk {
implicit def lakeToTransform[DATA <: Product, OUT]: CanMakeEdge[Lake[DATA], Transformation[DATA, OUT]] =
CanMakeEdge[Lake[DATA], Transformation[DATA, OUT]]()

implicit def combinedLakeToTransform[L <: HList, OUT]: CanMakeEdge[CombinedLake[L], Transformation[L, OUT]] =
CanMakeEdge[CombinedLake[L], Transformation[L, OUT]]()

implicit def transformToCombinedLake[DATA <: HList, IN]: CanMakeEdge[Transformation[IN, DATA], CombinedLake[DATA]] =
CanMakeEdge[Transformation[IN, DATA], CombinedLake[DATA]]()

def aggregation[IN <: Product, OUT <: Product](name: String, f: AggregationFunction)(implicit
inTypeTag: WeakTypeTag[IN],
outTypeTag: WeakTypeTag[OUT]
Expand All @@ -49,7 +59,7 @@ package object sdk {
typeTag: WeakTypeTag[DATA]
): Lake[DATA] = Lake(name, extractFQN(typeTag))

private[sdk] def extractFQN[T](typeTag: WeakTypeTag[T]): String = typeTag.tpe.typeSymbol.fullName
private[sdk] def extractFQN[T](typeTag: WeakTypeTag[T]): String = typeTag.tpe.toString

implicit def schemaGen[T]: ZSchema[T] = macro zio.schema.DeriveSchema.genImpl[T]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package ai.nikin.pipeline.sdk

import ai.nikin.pipeline.model.dsl._
import AggregationFunction.{Avg, Sum}
import ai.nikin.pipeline.sdk.schemas.{RecordA, RecordB}
import ai.nikin.pipeline.sdk.schemas.{RecordA, RecordB, RecordC}
import shapeless._

class SdkSpec extends TestUtils {
test("SDK - aggregation to lake") {
Expand Down Expand Up @@ -30,8 +31,9 @@ class SdkSpec extends TestUtils {

test("SDK - lake to aggregation to lake") {
val pipeline =
lake[RecordA]("lA") >>> aggregation[RecordA, RecordB]("tAB", Avg("col1", "col2")) >>>
lake[RecordB]("lB")
(lake[RecordA]("lA") & lake[RecordB]("lB") & lake[RecordC]("lC")) >>>
aggregation[RecordC :: RecordB :: RecordA :: HNil, RecordC :: RecordB :: HNil]("tAB", Avg("col1", "col2")) >>>
(lake[RecordB]("lB") & lake[RecordC]("lC"))

println(pipeline.graph)
}
Expand Down