Note that the Composite Flow (from Sink and Source) shown above can be built with the Flow.fromSinkAndSource function:
```scala
def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
  fromSinkAndSourceMat(sink, source)(Keep.none)
```
```scala
/**
 * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
 * Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages.
 */
object CoupledTerminationFlow {
  @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "2.5.2")
  def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] =
    Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both)
}
```
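As a minimal sketch of the uncoupled variant, the example below wraps a `Sink.ignore` and a three-element `Source` into one Flow. The object name `FromSinkAndSourceSketch`, the method `run` and the sample data are illustrative assumptions, not part of the Akka API. Because `Flow.fromSinkAndSource` does not connect the two sides, the Flow's output should depend only on the wrapped Source, whatever is fed into the wrapped Sink.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only
object FromSinkAndSourceSketch {
  // Input elements end up in Sink.ignore; output elements come from the wrapped Source
  val flow: Flow[Int, String, NotUsed] =
    Flow.fromSinkAndSource(Sink.ignore, Source(List("a", "b", "c")))

  def run(): immutable.Seq[String] = {
    implicit val sys: ActorSystem = ActorSystem("fromSinkAndSourceSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 3).via(flow).runWith(Sink.seq), 3.seconds)
    finally Await.result(sys.terminate(), 3.seconds)
  }
}
```

Calling `FromSinkAndSourceSketch.run()` is expected to return the three strings from the wrapped Source; the integers sent into the Flow are simply discarded.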
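The Composite BidiFlow in the diagram above shows that a composite Graph can be quite complex on the inside while exposing only a few simple input and output ports to the outside. The ports between the internal components must be connected correctly according to the processing logic; whatever ports remain unconnected become the interface ports exposed to the outside. This mechanism supports hierarchical, modular composition, as illustrated below: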
```scala
val nestedSource =
  Source.single(0) // an atomic source
    .map(_ + 1) // an atomic processing stage
    .named("nestedSource") // wraps up the current Source and gives it a name

val nestedFlow =
  Flow[Int].filter(_ != 0) // an atomic processing stage
    .map(_ - 2) // another atomic processing stage
    .named("nestedFlow") // wraps up the Flow, and gives it a name

val nestedSink =
  nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
    .named("nestedSink") // wrap it up

// Create a RunnableGraph
val runnableGraph = nestedSource.to(nestedSink)
```
```scala
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.collection.immutable

object GraphModules {
  def someProcess[I, O]: I => O = i => i.asInstanceOf[O]

  case class TwoThreeShape[I, I2, O, O2, O3](
      in1: Inlet[I],
      in2: Inlet[I2],
      out1: Outlet[O],
      out2: Outlet[O2],
      out3: Outlet[O3]) extends Shape {

    override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil

    override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil

    override def deepCopy(): Shape = TwoThreeShape(
      in1.carbonCopy(),
      in2.carbonCopy(),
      out1.carbonCopy(),
      out2.carbonCopy(),
      out3.carbonCopy()
    )
  }

  // a functional module with 2 inputs and 3 outputs
  def TwoThreeGraph[I, I2, O, O2, O3] = GraphDSL.create() { implicit builder =>
    val balancer = builder.add(Balance[I](2))
    val flow = builder.add(Flow[I2].map(someProcess[I2, O2]))

    TwoThreeShape(balancer.in, flow.in, balancer.out(0), balancer.out(1), flow.out)
  }

  val closedGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val inp1 = builder.add(Source(List(1, 2, 3))).out
    val inp2 = builder.add(Source(List(10, 20, 30))).out
    val merge = builder.add(Merge[Int](2))
    val mod23 = builder.add(TwoThreeGraph[Int, Int, Int, Int, Int])

    inp1 ~> mod23.in1
    inp2 ~> mod23.in2
    mod23.out1 ~> merge.in(0)
    mod23.out2 ~> merge.in(1)
    mod23.out3 ~> Sink.foreach(println)
    merge ~> Sink.foreach(println)
    ClosedShape
  }
}

object TailorGraph extends App {
  import GraphModules._

  implicit val sys = ActorSystem("streamSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer()

  RunnableGraph.fromGraph(closedGraph).run()

  scala.io.StdIn.readLine()
  sys.terminate()
}
```
```scala
def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit =
  b.addEdge(importAndGetPort(b), b.add(to).in)
def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit =
  b.addEdge(importAndGetPort(b), to.in)
...
def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit =
  b.addEdge(importAndGetPort(b), to)
```
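This is a relatively complex data-processing scheme that even includes a feedback loop in the data flow. It is hard to imagine how such a flow would be implemented with a purely functional stream library such as scalaz-stream, and there may be no solution at all. With the Akka GraphDSL, however, the graph can be composed in a form that closely mirrors the diagram: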
```scala
import GraphDSL.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  val A: Outlet[Int]                  = builder.add(Source.single(0)).out
  val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
  val C: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
  val D: FlowShape[Int, Int]          = builder.add(Flow[Int].map(_ + 1))
  val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
  val F: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
  val G: Inlet[Any]                   = builder.add(Sink.foreach(println)).in

                C     <~      F
  A  ~>  B  ~>  C     ~>      F
         B  ~>  D  ~>  E  ~>  F
                       E  ~>  G

  ClosedShape
})
```
```scala
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  val B = builder.add(Broadcast[Int](2))
  val C = builder.add(Merge[Int](2))
  val E = builder.add(Balance[Int](2))
  val F = builder.add(Merge[Int](2))

  Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)
  C.in(0) <~ F.out

  B.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)
  E.out(1) ~> Sink.foreach(println)
  ClosedShape
})
```
```scala
val partial = GraphDSL.create() { implicit builder =>
  val B = builder.add(Broadcast[Int](2))
  val C = builder.add(Merge[Int](2))
  val E = builder.add(Balance[Int](2))
  val F = builder.add(Merge[Int](2))

                                   C  <~  F
  B  ~>                            C  ~>  F
  B  ~>  Flow[Int].map(_ + 1)  ~>  E  ~>  F

  FlowShape(B.in, E.out(1))
}.named("partial")
```
```scala
// Convert the partial graph of FlowShape to a Flow to get
// access to the fluid DSL (for example to be able to call .filter())
val flow = Flow.fromGraph(partial)

// Simple way to create a graph backed Source
val source = Source.fromGraph(GraphDSL.create() { implicit builder =>
  val merge = builder.add(Merge[Int](2))
  Source.single(0) ~> merge
  Source(List(2, 3, 4)) ~> merge

  // Exposing exactly one output port
  SourceShape(merge.out)
})

// Building a Sink with a nested Flow, using the fluid DSL
val sink = {
  val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow")
  nestedFlow.to(Sink.head)
}

// Putting all together
val closed = source.via(flow.filter(_ > 1)).to(sink)
```
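The materialized result of a graph is selected with viaMat and toMat. The shorthand forms via and to keep, by default, the materialized value produced by the left-hand side of the stream graph.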
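The difference between the shorthand and the explicit forms can be sketched as follows. This is a minimal illustration under assumed names (`KeepSketch`, `doubler`, `sumSink`, `run` are hypothetical); it relies only on `via`, `to`, `viaMat`, `toMat` and the `Keep` combiners from the Akka Streams scaladsl.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only
object KeepSketch {
  val source: Source[Int, NotUsed] = Source(1 to 3)
  val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // via/to keep the left-hand value, so the Future produced by sumSink is dropped
  val keepsLeft: RunnableGraph[NotUsed] = source.via(doubler).to(sumSink)

  // viaMat/toMat take an explicit combiner; Keep.right selects the Sink's Future
  val keepsRight: RunnableGraph[Future[Int]] =
    source.viaMat(doubler)(Keep.left).toMat(sumSink)(Keep.right)

  def run(): Int = {
    implicit val sys: ActorSystem = ActorSystem("keepSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(keepsRight.run(), 3.seconds) // sums 2 + 4 + 6
    finally Await.result(sys.terminate(), 3.seconds)
  }
}
```

Running `keepsLeft` would still execute the stream, but the caller gets back only `NotUsed`; `keepsRight` is the variant to use when the folded sum is needed.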