From a148ad896e7fadc08fbc6c61b761e146ace1574f Mon Sep 17 00:00:00 2001 From: Fran Bermejo Date: Mon, 16 Jan 2023 18:01:28 +0100 Subject: [PATCH] POC: Fan-in fan-out on top of Scala Graph --- build.sbt | 3 ++- .../ai/nikin/pipeline/model/dsl/package.scala | 12 ++++++++++++ .../main/scala/ai/nikin/pipeline/sdk/package.scala | 14 ++++++++++++-- .../test/scala/ai/nikin/pipeline/sdk/SdkSpec.scala | 8 +++++--- 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/build.sbt b/build.sbt index 898cd77..ccde2f8 100644 --- a/build.sbt +++ b/build.sbt @@ -63,7 +63,8 @@ lazy val `pipeline-sdk` = ZIO.core, ZIO.schema, ZIO.schemaDerivation, - Scalaland.chimney + Scalaland.chimney, + "com.chuusai" %% "shapeless" % "2.3.10" ) ) .dependsOn(`pipeline-dsl-macros`) diff --git a/pipeline-sdk/src/main/scala/ai/nikin/pipeline/model/dsl/package.scala b/pipeline-sdk/src/main/scala/ai/nikin/pipeline/model/dsl/package.scala index 5807a41..139fbd6 100644 --- a/pipeline-sdk/src/main/scala/ai/nikin/pipeline/model/dsl/package.scala +++ b/pipeline-sdk/src/main/scala/ai/nikin/pipeline/model/dsl/package.scala @@ -2,6 +2,7 @@ package ai.nikin.pipeline package model import io.scalaland.chimney.dsl._ +import shapeless.{::, HList, HNil} package object dsl { @@ -23,9 +24,20 @@ package object dsl { lazy final val schema: zio.schema.Schema[DATA] = s + def &[T <: Product](l: Lake[T]): CombinedLake[T :: DATA :: HNil] = + CombinedLake[T :: DATA :: HNil](Set(this, l)) + def toUntyped: UntypedLake = this.transformInto[UntypedLake] } + case class CombinedLake[L <: HList](lakes: Set[Lake[_]]) + extends Vertex[CombinedLake[L]](s"combined-${lakes.map(_.name).mkString("&")}") { + final override type IN = L + final override type OUT = L + + def &[T <: Product](l: Lake[T]): CombinedLake[T :: L] = CombinedLake[T :: L](lakes + l) + } + sealed abstract class Transformation[_IN, _OUT](n: String) extends Vertex[Transformation[_IN, _OUT]](n) { final override type IN = _IN diff --git a/pipeline-sdk/src/main/scala/ai/nikin/pipeline/sdk/package.scala b/pipeline-sdk/src/main/scala/ai/nikin/pipeline/sdk/package.scala index 02abaa8..ce55e4c 100644 --- a/pipeline-sdk/src/main/scala/ai/nikin/pipeline/sdk/package.scala +++ b/pipeline-sdk/src/main/scala/ai/nikin/pipeline/sdk/package.scala @@ -13,6 +13,7 @@ package object sdk { import scalax.collection.GraphEdge.DiEdge import scalax.collection.immutable.Graph import scalax.collection.GraphPredef._ + import shapeless.HList type PipelineDef = Graph[Vertex[_], DiEdge] @@ -28,7 +29,10 @@ package object sdk { def >>>[ V <: VertexTO[SELF, V] ](next: V)(implicit @unused ev: CanMakeEdge[SELF, V]): PipelineBuilder[V] = - new PipelineBuilder(next, graph ++ Set(v ~> next)) + v match { + case cb: CombinedLake[_] => new PipelineBuilder(next, graph ++ cb.lakes.map(_ ~> next)) + case _ => new PipelineBuilder(next, graph ++ Set(v ~> next)) + } } type VertexTO[FROM <: Vertex[FROM], TO <: Vertex[TO] { type IN = FROM#OUT }] = @@ -40,6 +44,12 @@ package object sdk { implicit def lakeToTransform[DATA <: Product, OUT]: CanMakeEdge[Lake[DATA], Transformation[DATA, OUT]] = CanMakeEdge[Lake[DATA], Transformation[DATA, OUT]]() + implicit def combinedLakeToTransform[L <: HList, OUT]: CanMakeEdge[CombinedLake[L], Transformation[L, OUT]] = + CanMakeEdge[CombinedLake[L], Transformation[L, OUT]]() + + implicit def transformToCombinedLake[DATA <: HList, IN]: CanMakeEdge[Transformation[IN, DATA], CombinedLake[DATA]] = + CanMakeEdge[Transformation[IN, DATA], CombinedLake[DATA]]() + def aggregation[IN <: Product, OUT <: Product](name: String, f: AggregationFunction)(implicit inTypeTag: WeakTypeTag[IN], outTypeTag: WeakTypeTag[OUT] @@ -49,7 +59,7 @@ package object sdk { typeTag: WeakTypeTag[DATA] ): Lake[DATA] = Lake(name, extractFQN(typeTag)) - private[sdk] def extractFQN[T](typeTag: WeakTypeTag[T]): String = typeTag.tpe.typeSymbol.fullName + private[sdk] def extractFQN[T](typeTag: WeakTypeTag[T]): String = typeTag.tpe.toString implicit def schemaGen[T]: ZSchema[T] = macro zio.schema.DeriveSchema.genImpl[T] diff --git a/pipeline-sdk/src/test/scala/ai/nikin/pipeline/sdk/SdkSpec.scala b/pipeline-sdk/src/test/scala/ai/nikin/pipeline/sdk/SdkSpec.scala index ed1ffdd..407a3c1 100644 --- a/pipeline-sdk/src/test/scala/ai/nikin/pipeline/sdk/SdkSpec.scala +++ b/pipeline-sdk/src/test/scala/ai/nikin/pipeline/sdk/SdkSpec.scala @@ -2,7 +2,8 @@ package ai.nikin.pipeline.sdk import ai.nikin.pipeline.model.dsl._ import AggregationFunction.{Avg, Sum} -import ai.nikin.pipeline.sdk.schemas.{RecordA, RecordB} +import ai.nikin.pipeline.sdk.schemas.{RecordA, RecordB, RecordC} +import shapeless._ class SdkSpec extends TestUtils { test("SDK - aggregation to lake") { @@ -30,8 +31,9 @@ class SdkSpec extends TestUtils { test("SDK - lake to aggregation to lake") { val pipeline = - lake[RecordA]("lA") >>> aggregation[RecordA, RecordB]("tAB", Avg("col1", "col2")) >>> - lake[RecordB]("lB") + (lake[RecordA]("lA") & lake[RecordB]("lB") & lake[RecordC]("lC")) >>> + aggregation[RecordC :: RecordB :: RecordA :: HNil, RecordC :: RecordB :: HNil]("tAB", Avg("col1", "col2")) >>> + (lake[RecordB]("lB") & lake[RecordC]("lC")) println(pipeline.graph) }