Tags:
In the previous post we introduced Source, whose type has the shape Process[F[_],O]. A Source produces its data stream through the await function, whose signature is:
def await[F[_], A, O](req: F[A])(rcv: A => Process[F, O]): Process[F, O]
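A Transducer that turns one stream of elements into another is represented by Process1, which reads from a single input:
type Process1[-I,+O] = Process[Env[I,Any]#Is, O]
Its basic input functions, await1 and receive1, are built on await: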
/** The `Process1` which awaits a single input, emits it, then halts normally. */
def await1[I]: Process1[I, I] =
  receive1(emit)

def receive1[I, O](rcv: I => Process1[I, O]): Process1[I, O] =
  await(Get[I])(rcv)
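To see what await1 and receive1 do without the scalaz-stream machinery, here is a minimal sketch that uses only the Scala standard library. It assumes a toy encoding in which a transducer either emits a value, waits for one input, or halts. The names P1, Emit, Await, Halt, echo and run are hypothetical and are not the library's API.

```scala
import scala.annotation.tailrec

// Toy model of a single-input transducer. Hypothetical names; NOT scalaz-stream's encoding.
object ToyProcess1 {
  sealed trait P1[I, O]
  final case class Emit[I, O](head: O, tail: P1[I, O]) extends P1[I, O] // output one value, then continue
  final case class Await[I, O](rcv: I => P1[I, O]) extends P1[I, O]      // wait for one input
  final case class Halt[I, O]() extends P1[I, O]                          // finished

  // Counterpart of receive1: wait for one input and continue with rcv.
  def receive1[I, O](rcv: I => P1[I, O]): P1[I, O] = Await[I, O](rcv)

  // Counterpart of await1: wait for one input, emit it, then halt.
  def await1[I]: P1[I, I] = receive1[I, I](i => Emit(i, Halt[I, I]()))

  // Counterpart of await1.repeat: echo every input.
  // The recursive call sits inside the lambda, so it only runs when an input arrives.
  def echo[I]: P1[I, I] = receive1[I, I](i => Emit(i, echo[I]))

  // Drive a transducer with a finite list of inputs and collect what it emits.
  def run[I, O](p: P1[I, O], in: List[I]): List[O] = {
    @tailrec
    def go(cur: P1[I, O], rest: List[I], acc: List[O]): List[O] = cur match {
      case Emit(o, next) => go(next, rest, o :: acc)
      case Halt()        => acc.reverse
      case Await(rcv) => rest match {
        case i :: tail => go(rcv(i), tail, acc)
        case Nil       => acc.reverse // input exhausted
      }
    }
    go(p, in, Nil)
  }
}
```

Here `run(await1[Int], List(1, 2, 3))` yields `List(1)` and `run(echo[Int], List(1, 2, 3))` yields `List(1, 2, 3)`, which mirrors the difference between await1 and await1.repeat.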
With await1 and receive1 we can write two simple transducers. One multiplies each element by n and the other appends "!" to it:
import Process._

def multiplyBy(n: Int): Process1[Int,Int] =
  await1[Int].flatMap { i => emit(i * n) }.repeat
    //> multiplyBy: (n: Int)scalaz.stream.Process1[Int,Int]

def addPosfix: Process1[Int,String] =
  receive1[Int,String]{ i => emit(i.toString + "!") }.repeat
    //> addPosfix: => scalaz.stream.Process1[Int,String]
They are connected to a Source with pipe, for which |> is the operator form:
(range(11,16).toSource pipe multiplyBy(5) |> addPosfix).runLog.run
  //> res0: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)
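The pipe/|> composition used above can be illustrated with the same kind of toy model. This is again a hypothetical, standard-library-only encoding, and pipe, multiplyBy, addSuffix and run here are illustrative stand-ins, not scalaz-stream code. The key idea in this sketch is that the downstream transducer drives, and the upstream one is asked for an element only when downstream awaits.

```scala
// Toy transducer model (hypothetical; not scalaz-stream's encoding).
object ToyPipe {
  sealed trait P1[I, O]
  final case class Emit[I, O](head: O, tail: P1[I, O]) extends P1[I, O]
  final case class Await[I, O](rcv: I => P1[I, O]) extends P1[I, O]
  final case class Halt[I, O]() extends P1[I, O]

  // Splice `up` into `down`: downstream drives, upstream is consulted only when downstream awaits.
  def pipe[A, B, C](up: P1[A, B], down: P1[B, C]): P1[A, C] = down match {
    case Emit(c, next) => Emit(c, pipe(up, next))   // downstream output leaves the pipeline
    case Halt()        => Halt[A, C]()              // downstream done: the pipeline is done
    case Await(rcv) => up match {
      case Emit(b, upNext) => pipe(upNext, rcv(b))  // hand one upstream value to downstream
      case Halt()          => Halt[A, C]()          // upstream exhausted
      case Await(upRcv)    => Await[A, C](a => pipe(upRcv(a), down)) // both wait: ask the source
    }
  }

  // Stand-ins for the article's multiplyBy and addPosfix.
  def multiplyBy(n: Int): P1[Int, Int] = Await[Int, Int](i => Emit(i * n, multiplyBy(n)))
  def addSuffix: P1[Int, String] = Await[Int, String](i => Emit(i.toString + "!", addSuffix))

  // Feed a finite list of inputs and collect the outputs.
  def run[I, O](p: P1[I, O], in: List[I]): List[O] = p match {
    case Emit(o, next) => o :: run(next, in)
    case Halt()        => Nil
    case Await(rcv) => in match {
      case i :: rest => run(rcv(i), rest)
      case Nil       => Nil
    }
  }
}
```

Running `pipe(multiplyBy(5), addSuffix)` over `(11 to 15).toList` gives `List("55!", "60!", "65!", "70!", "75!")`. Neither transducer knows about the other or about the source; only pipe connects them.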
We can also lift an ordinary function into a Process1:
import process1._

(range(11,16).toSource |> lift {(i: Int) => i * 5} |> addPosfix).runLog.run
  //> res1: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)
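The same result can, of course, be obtained more directly by calling flatMap on the Source itself:
range(11,16).toSource
  .flatMap { i => emit(i * 5) }
  .flatMap { i => emit(i.toString + "!") }
  .runLog.run
  //> res1: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)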
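Although this reaches the same result more directly, note that the Source has now become a special-purpose version. The element transformations attached to it can no longer be separated from it. pipe, on the other hand, is a Process composition function that connects a Source to a Transducer, or one Transducer to another. This keeps both Sources and Transducers single-purpose, reusable function components.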
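Lifting an ordinary function, as in the lift example above, amounts to wrapping it in an await loop. Here is a minimal sketch in the same toy style. P1, Emit, Await, lift and run are hypothetical names, not the library's definitions.

```scala
// Toy transducer model (hypothetical; not scalaz-stream's encoding).
object ToyLift {
  sealed trait P1[I, O]
  final case class Emit[I, O](head: O, tail: P1[I, O]) extends P1[I, O]
  final case class Await[I, O](rcv: I => P1[I, O]) extends P1[I, O]

  // Turn a plain function into a transducer that applies it to every input.
  def lift[I, O](f: I => O): P1[I, O] = Await[I, O](i => Emit(f(i), lift(f)))

  // Feed a finite list of inputs and collect the outputs.
  def run[I, O](p: P1[I, O], in: List[I]): List[O] = (p, in) match {
    case (Emit(o, next), _)      => o :: run(next, in)
    case (Await(rcv), i :: rest) => run(rcv(i), rest)
    case (Await(_), Nil)         => Nil
  }
}
```

The lifted transducer is an ordinary value that can be run against any input. That is the independence from the source that the flatMap version gives up.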
Once a data source is connected, we can transform the elements it emits. These transducer functions all live in the process1 object:
import process1._

(range(1,6).toSource pipe take(2)).runLog.run
  //> res2: Vector[Int] = Vector(1, 2)

(range(1,10).toSource |> filter {_ % 2 == 0 } |> collect {
  case 4 => "the number four"
  case 5 => "the number five"
  case 6 => "the number six"
  case 100 => "the number one hundred"
}).runLog.run
  //> res3: Vector[String] = Vector(the number four, the number six)
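The take, filter and collect transducers can be sketched in the same hypothetical toy model. It uses the standard library only, and these are not scalaz-stream's implementations. take counts down and halts, while filter and collect simply skip inputs they do not want.

```scala
// Toy transducer model (hypothetical; not scalaz-stream's encoding).
object ToyFilters {
  sealed trait P1[I, O]
  final case class Emit[I, O](head: O, tail: P1[I, O]) extends P1[I, O]
  final case class Await[I, O](rcv: I => P1[I, O]) extends P1[I, O]
  final case class Halt[I, O]() extends P1[I, O]

  // Pass the first n inputs through, then halt.
  def take[I](n: Int): P1[I, I] =
    if (n <= 0) Halt[I, I]()
    else Await[I, I](i => Emit(i, take[I](n - 1)))

  // Pass through only the inputs that satisfy p.
  def filter[I](p: I => Boolean): P1[I, I] =
    Await[I, I](i => if (p(i)) Emit(i, filter(p)) else filter(p))

  // Emit pf(i) for inputs where pf is defined; drop the rest.
  def collect[I, O](pf: PartialFunction[I, O]): P1[I, O] =
    Await[I, O](i => if (pf.isDefinedAt(i)) Emit(pf(i), collect(pf)) else collect(pf))

  // Feed a finite list of inputs and collect the outputs.
  def run[I, O](p: P1[I, O], in: List[I]): List[O] = p match {
    case Emit(o, next) => o :: run(next, in)
    case Halt()        => Nil
    case Await(rcv) => in match {
      case i :: rest => run(rcv(i), rest)
      case Nil       => Nil
    }
  }
}
```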
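Most of the familiar functions we apply to a Scala standard-library List have Process1 counterparts, for example fold, foldMap, sum and scan: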
(range(1,6).toSource |> fold(Nil:List[Int]){ (b,a) => a :: b }).runLog.run
  //> res5: Vector[List[Int]] = Vector(List(5, 4, 3, 2, 1))
(range(1,6).toSource |> foldMap { List(_) }).runLog.run
  //> res6: Vector[List[Int]] = Vector(List(1, 2, 3, 4, 5))
(range(1,6).toSource |> foldMap { identity }).runLog.run
  //> res7: Vector[Int] = Vector(15)
(range(1,6).toSource |> sum).runLog.run
  //> res8: Vector[Int] = Vector(15)
(range(1,6).toSource |> scan(0){(a,b) => a + b}).runLog.run
  //> res9: Vector[Int] = Vector(0, 1, 3, 6, 10, 15)
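Unlike take or filter, fold (and sum, which is a fold) can only emit once the input has ended. A sketch therefore needs a way to react to end-of-input. The toy model below (hypothetical names, standard library only) adds an onEnd continuation to Await for that purpose. The real library has its own mechanism for this, which is not modeled here. scan, by contrast, emits its running value after every input.

```scala
// Toy transducer model (hypothetical; not scalaz-stream's encoding).
object ToyFolds {
  sealed trait P1[I, O]
  final case class Emit[I, O](head: O, tail: P1[I, O]) extends P1[I, O]
  // onEnd is consulted when the input runs out; it must not return another Await.
  final case class Await[I, O](rcv: I => P1[I, O], onEnd: () => P1[I, O]) extends P1[I, O]
  final case class Halt[I, O]() extends P1[I, O]

  // Accumulate every input; emit the single result only at end of input.
  def fold[I, B](z: B)(f: (B, I) => B): P1[I, B] =
    Await[I, B](i => fold(f(z, i))(f), () => Emit(z, Halt[I, B]()))

  // sum is just a fold with +.
  def sum: P1[Int, Int] = fold[Int, Int](0)(_ + _)

  // Emit the seed, then the running accumulator after each input.
  def scan[I, B](z: B)(f: (B, I) => B): P1[I, B] =
    Emit(z, Await[I, B](i => scan(f(z, i))(f), () => Halt[I, B]()))

  // Feed a finite list of inputs and collect the outputs.
  def run[I, O](p: P1[I, O], in: List[I]): List[O] = p match {
    case Emit(o, next) => o :: run(next, in)
    case Halt()        => Nil
    case Await(rcv, onEnd) => in match {
      case i :: rest => run(rcv(i), rest)
      case Nil       => run(onEnd(), Nil)
    }
  }
}
```

With inputs 1 to 5, the fold that prepends each element yields `List(List(5, 4, 3, 2, 1))`, sum yields `List(15)`, and `scan(0)(_ + _)` yields `List(0, 1, 3, 6, 10, 15)`, matching res5, res8 and res9.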
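We can also use feed to insert a sequence of ready-made elements into a Process1. They are processed ahead of whatever the Source emits: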
(range(1,6).toSource |> feed(6 to 10)(lift(identity))).runLog.run
  //> res10: Vector[Int] = Vector(6, 7, 8, 9, 10, 1, 2, 3, 4, 5)
(range(1,6).toSource |> feed(6 to 10)(lift(identity)) |> foldMap {identity}).runLog.run
  //> res11: Vector[Int] = Vector(55)
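One way to understand feed is that it pushes the given elements into the transducer before the upstream is ever read, while keeping any outputs produced along the way. Here is a sketch in the toy model. The names are hypothetical, and this is not the library's implementation.

```scala
// Toy transducer model (hypothetical; not scalaz-stream's encoding).
object ToyFeed {
  sealed trait P1[I, O]
  final case class Emit[I, O](head: O, tail: P1[I, O]) extends P1[I, O]
  final case class Await[I, O](rcv: I => P1[I, O]) extends P1[I, O]
  final case class Halt[I, O]() extends P1[I, O]

  def lift[I, O](f: I => O): P1[I, O] = Await[I, O](i => Emit(f(i), lift(f)))

  // Push `in` into p up front; outputs produced along the way become leading Emits.
  def feed[I, O](in: List[I])(p: P1[I, O]): P1[I, O] = (in, p) match {
    case (_, Emit(o, next))      => Emit(o, feed(in)(next))
    case (i :: rest, Await(rcv)) => feed(rest)(rcv(i))
    case (Nil, _)                => p            // everything fed: continue with upstream input
    case (_, Halt())             => Halt[I, O]() // p finished early: leftover elements are dropped
  }

  // Feed a finite list of inputs and collect the outputs.
  def run[I, O](p: P1[I, O], in: List[I]): List[O] = p match {
    case Emit(o, next) => o :: run(next, in)
    case Halt()        => Nil
    case Await(rcv) => in match {
      case i :: rest => run(rcv(i), rest)
      case Nil       => Nil
    }
  }
}
```

Feeding 6 to 10 into `lift(identity)` and then running it over 1 to 5 gives `List(6, 7, 8, 9, 10, 1, 2, 3, 4, 5)`, the same order as res10.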
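Process1 has a single input. A Tee is a Transducer with two inputs, a left one and a right one:
/**
 * A stream transducer that can read from one of two inputs,
 * the 'left' (of type `I`) or the 'right' (of type `I2`).
 * `Process1[I,O] <: Tee[I,I2,O]`.
 */
type Tee[-I,-I2,+O] = Process[Env[I,I2]#T, O]
Reading from a specific side is done with receiveL and receiveR: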
/**
 * Awaits to receive input from Left side,
 * than if that request terminates with `End` or is terminated abnormally
 * runs the supplied `continue` or `cleanup`.
 * Otherwise `rcv` is run to produce next state.
 *
 * If you don't need `continue` or `cleanup` use rather `awaitL.flatMap`
 */
def receiveL[I, I2, O](rcv: I => Tee[I, I2, O]): Tee[I, I2, O] =
  await[Env[I, I2]#T, I, O](L)(rcv)

/**
 * Awaits to receive input from Right side,
 * than if that request terminates with `End` or is terminated abnormally
 * runs the supplied continue.
 * Otherwise `rcv` is run to produce next state.
 *
 * If you don't need `continue` or `cleanup` use rather `awaitR.flatMap`
 */
def receiveR[I, I2, O](rcv: I2 => Tee[I, I2, O]): Tee[I, I2, O] =
  await[Env[I, I2]#T, I2, O](R)(rcv)
The Get, L and R requests used above are defined through Env:
case class Env[-I, -I2]() {
  sealed trait Y[-X] {
    def tag: Int
    def fold[R](l: => R, r: => R, both: => R): R
  }
  sealed trait T[-X] extends Y[X]
  sealed trait Is[-X] extends T[X]
  case object Left extends Is[I] {
    def tag = 0
    def fold[R](l: => R, r: => R, both: => R): R = l
  }
  case object Right extends T[I2] {
    def tag = 1
    def fold[R](l: => R, r: => R, both: => R): R = r
  }
  case object Both extends Y[ReceiveY[I, I2]] {
    def tag = 2
    def fold[R](l: => R, r: => R, both: => R): R = both
  }
}

private val Left_ = Env[Any, Any]().Left
private val Right_ = Env[Any, Any]().Right
private val Both_ = Env[Any, Any]().Both

def Get[I]: Env[I, Any]#Is[I] = Left_
def L[I]: Env[I, Any]#Is[I] = Left_
def R[I2]: Env[Any, I2]#T[I2] = Right_
def Both[I, I2]: Env[I, I2]#Y[ReceiveY[I, I2]] = Both_
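L[I], R[I2] and Get[I] do no real work. They exist only to guide the compiler's type inference, and Get and L even return the very same Left_ object. What makes tee deterministic is that we state explicitly, with receiveL or receiveR, which side the next element is read from. As one would expect, the main use of tee is to combine the elements emitted by two sources. This combining is done through the tee method on Process: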
/**
 * Use a `Tee` to interleave or combine the outputs of `this` and
 * `p2`. This can be used for zipping, interleaving, and so forth.
 * Nothing requires that the `Tee` read elements from each
 * `Process` in lockstep. It could read fifty elements from one
 * side, then two elements from the other, then combine or
 * interleave these values in some way, etc.
 *
 * If at any point the `Tee` awaits on a side that has halted,
 * we gracefully kill off the other side, then halt.
 *
 * If at any point `t` terminates with cause `c`, both sides are killed, and
 * the resulting `Process` terminates with `c`.
 */
final def tee[F2[x] >: F[x], O2, O3](p2: Process[F2, O2])(t: Tee[O, O2, O3]): Process[F2, O3]
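The deterministic, Tee-driven reading described in this doc comment can be sketched with a two-input variant of the toy model. Tee, AwaitL, AwaitR, runTee and interleave are hypothetical stand-ins that use only the standard library. runTee plays the role of tee: the Tee alone decides which side is read next, and awaiting an exhausted side ends everything. This interleave reads one element from each side before emitting both, which reproduces the interleave output in the article's examples.

```scala
// Toy two-input transducer (hypothetical; not scalaz-stream's encoding).
object ToyTee {
  sealed trait Tee[A, B, O]
  final case class Emit[A, B, O](head: O, tail: Tee[A, B, O]) extends Tee[A, B, O]
  final case class AwaitL[A, B, O](rcv: A => Tee[A, B, O]) extends Tee[A, B, O] // like receiveL
  final case class AwaitR[A, B, O](rcv: B => Tee[A, B, O]) extends Tee[A, B, O] // like receiveR
  final case class Halt[A, B, O]() extends Tee[A, B, O]

  // Plays the role of tee: the Tee decides which side to read;
  // awaiting an exhausted side halts the whole thing.
  def runTee[A, B, O](t: Tee[A, B, O], left: List[A], right: List[B]): List[O] = t match {
    case Emit(o, next) => o :: runTee(next, left, right)
    case Halt()        => Nil
    case AwaitL(rcv) => left match {
      case a :: rest => runTee(rcv(a), rest, right)
      case Nil       => Nil
    }
    case AwaitR(rcv) => right match {
      case b :: rest => runTee(rcv(b), left, rest)
      case Nil       => Nil
    }
  }

  // Read one element from the left, one from the right, emit both, repeat.
  def interleave[I]: Tee[I, I, I] =
    AwaitL[I, I, I](l => AwaitR[I, I, I](r => Emit(l, Emit(r, interleave[I]))))
}
```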
Here are some commonly used tee-based methods on Process:
/** Alternate emitting elements from `this` and `p2`, starting with `this`. */
def interleave[F2[x] >: F[x], O2 >: O](p2: Process[F2, O2]): Process[F2, O2] =
  this.tee(p2)(scalaz.stream.tee.interleave[O2])

/** Call `tee` with the `zipWith` `Tee[O,O2,O3]` defined in `tee.scala`. */
def zipWith[F2[x] >: F[x], O2, O3](p2: Process[F2, O2])(f: (O, O2) => O3): Process[F2, O3] =
  this.tee(p2)(scalaz.stream.tee.zipWith(f))

/** Call `tee` with the `zip` `Tee[O,O2,O3]` defined in `tee.scala`. */
def zip[F2[x] >: F[x], O2](p2: Process[F2, O2]): Process[F2, (O, O2)] =
  this.tee(p2)(scalaz.stream.tee.zip)

/**
 * When `condition` is `true`, lets through any values in `this` process, otherwise blocks
 * until `condition` becomes true again. Note that the `condition` is checked before
 * each and every read from `this`, so `condition` should return very quickly or be
 * continuous to avoid holding up the output `Process`. Use `condition.forwardFill` to
 * convert an infrequent discrete `Process` to a continuous one for use with this
 * function.
 */
def when[F2[x] >: F[x], O2 >: O](condition: Process[F2, Boolean]): Process[F2, O2] =
  condition.tee(this)(scalaz.stream.tee.when)

/**
 * Halts this `Process` as soon as `condition` becomes `true`. Note that `condition`
 * is checked before each and every read from `this`, so `condition` should return
 * very quickly or be continuous to avoid holding up the output `Process`. Use
 * `condition.forwardFill` to convert an infrequent discrete `Process` to a
 * continuous one for use with this function.
 */
def until[F2[x] >: F[x], O2 >: O](condition: Process[F2, Boolean]): Process[F2, O2] =
  condition.tee(this)(scalaz.stream.tee.until)
The Tees they rely on are implemented in the tee object as follows:
/** A `Tee` which ignores all input from left. */
def passR[I2]: Tee[Any, I2, I2] = awaitR[I2].repeat

/** A `Tee` which ignores all input from the right. */
def passL[I]: Tee[I, Any, I] = awaitL[I].repeat

/** Echoes the right branch until the left branch becomes `true`, then halts. */
def until[I]: Tee[Boolean, I, I] =
  awaitL[Boolean].flatMap(kill => if (kill) halt else awaitR[I] ++ until)

/** Echoes the right branch when the left branch is `true`. */
def when[I]: Tee[Boolean, I, I] =
  awaitL[Boolean].flatMap(ok => if (ok) awaitR[I] ++ when else when)

/** Defined as `zipWith((_,_))` */
def zip[I, I2]: Tee[I, I2, (I, I2)] = zipWith((_, _))

/** Defined as `zipWith((arg,f) => f(arg)` */
def zipApply[I,I2]: Tee[I, I => I2, I2] = zipWith((arg,f) => f(arg))

/** A version of `zip` that pads the shorter stream with values. */
def zipAll[I, I2](padI: I, padI2: I2): Tee[I, I2, (I, I2)] =
  zipWithAll(padI, padI2)((_, _))
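Following the definitions above, until, when, zipWith and passL translate almost line for line into a self-contained toy two-input model. The names are hypothetical, and this is standard-library code, not the library's. With the same inputs as the article, the sketch gives the results shown in res14, res15, res16 and res18.

```scala
// Toy two-input transducer (hypothetical; not scalaz-stream's encoding).
object ToyTeeOps {
  sealed trait Tee[A, B, O]
  final case class Emit[A, B, O](head: O, tail: Tee[A, B, O]) extends Tee[A, B, O]
  final case class AwaitL[A, B, O](rcv: A => Tee[A, B, O]) extends Tee[A, B, O]
  final case class AwaitR[A, B, O](rcv: B => Tee[A, B, O]) extends Tee[A, B, O]
  final case class Halt[A, B, O]() extends Tee[A, B, O]

  // The Tee picks the side to read; awaiting an exhausted side ends the run.
  def runTee[A, B, O](t: Tee[A, B, O], left: List[A], right: List[B]): List[O] = t match {
    case Emit(o, next) => o :: runTee(next, left, right)
    case Halt()        => Nil
    case AwaitL(rcv) => left match {
      case a :: rest => runTee(rcv(a), rest, right)
      case Nil       => Nil
    }
    case AwaitR(rcv) => right match {
      case b :: rest => runTee(rcv(b), left, rest)
      case Nil       => Nil
    }
  }

  // Echo the right side until the left side says true, then halt.
  def until[I]: Tee[Boolean, I, I] =
    AwaitL[Boolean, I, I](kill =>
      if (kill) Halt[Boolean, I, I]()
      else AwaitR[Boolean, I, I](i => Emit(i, until[I])))

  // Echo one right element for every true on the left; on false, read nothing from the right.
  def when[I]: Tee[Boolean, I, I] =
    AwaitL[Boolean, I, I](ok =>
      if (ok) AwaitR[Boolean, I, I](i => Emit(i, when[I]))
      else when[I])

  // Read one from each side and combine them with f.
  def zipWith[A, B, C](f: (A, B) => C): Tee[A, B, C] =
    AwaitL[A, B, C](a => AwaitR[A, B, C](b => Emit(f(a, b), zipWith(f))))

  def zip[A, B]: Tee[A, B, (A, B)] = zipWith[A, B, (A, B)]((_, _))

  // Echo the left side, ignoring the right.
  def passL[A, B]: Tee[A, B, A] = AwaitL[A, B, A](a => Emit(a, passL[A, B]))
}
```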
Trying them out:
import tee._

val source = range(1,6).toSource
  //> source : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End),Vector(<function1>))
val seq = emitAll(Seq("a","b","c"))
  //> seq : scalaz.stream.Process0[String] = Emit(List(a, b, c))
val signalw = Process(true,true,false,true)
  //> signalw : scalaz.stream.Process0[Boolean] = Emit(WrappedArray(true, true, false, true))
val signalu = Process(false,true,false,true)
  //> signalu : scalaz.stream.Process0[Boolean] = Emit(WrappedArray(false, true,false, true))

source.tee(seq)(interleave).runLog.run
  //> res12: Vector[Any] = Vector(1, a, 2, b, 3, c)
(source interleave seq).runLog.run
  //> res13: Vector[Any] = Vector(1, a, 2, b, 3, c)
signalu.tee(source)(until).runLog.run
  //> res14: Vector[Int] = Vector(1)
signalw.tee(source)(when).runLog.run
  //> res15: Vector[Int] = Vector(1, 2, 3)
source.tee(seq)(passL).runLog.run
  //> res16: Vector[Int] = Vector(1, 2, 3, 4, 5)
source.tee(seq)(passR).runLog.run
  //> res17: Vector[String] = Vector(a, b, c)
(source zip seq).runLog.run
  //> res18: Vector[(Int, String)] = Vector((1,a), (2,b), (3,c))
(seq zip source).runLog.run
  //> res19: Vector[(String, Int)] = Vector((a,1), (b,2), (c,3))
(source.zipWith(seq){(a,b) => a.toString + b}).runLog.run
  //> res20: Vector[String] = Vector(1a, 2b, 3c)
As with Process1, we can also feed a sequence of elements into a Tee. This time we use feedL and feedR:
/** Feed a sequence of inputs to the left side of a `Tee`. */
def feedL[I, I2, O](i: Seq[I])(p: Tee[I, I2, O]): Tee[I, I2, O] = {...}

/** Feed a sequence of inputs to the right side of a `Tee`. */
def feedR[I, I2, O](i: Seq[I2])(p: Tee[I, I2, O]): Tee[I, I2, O] = {...}
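For example, we can feed 1, 2, 3 into the left side of id[Int], which is a Process1 and therefore also a Tee. The fed elements come out before anything read from the left input:
val ltee = tee.feedL(Seq(1,2,3))(id[Int])
  //> ltee : scalaz.stream.Tee[Int,Any,Int] = Append(Emit(Vector(1, 2)),Vector(<function1>))
halt.tee[Task,Int,Int](halt)(ltee).runLog.run
  //> res21: Vector[Int] = Vector(1, 2, 3)
source.tee[Task,Int,Int](halt)(ltee).runLog.run
  //> res22: Vector[Int] = Vector(1, 2, 3, 1, 2, 3, 4, 5)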
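feedL can also be sketched in the toy two-input model. The names are hypothetical, it uses the standard library only, and it is not the library's implementation. The buffered left elements are consumed first. If the Tee asks for the right side in the meantime, this sketch keeps the buffer until the Tee comes back to the left. idL stands in for id used as a Tee.

```scala
// Toy two-input transducer (hypothetical; not scalaz-stream's encoding).
object ToyFeedL {
  sealed trait Tee[A, B, O]
  final case class Emit[A, B, O](head: O, tail: Tee[A, B, O]) extends Tee[A, B, O]
  final case class AwaitL[A, B, O](rcv: A => Tee[A, B, O]) extends Tee[A, B, O]
  final case class AwaitR[A, B, O](rcv: B => Tee[A, B, O]) extends Tee[A, B, O]
  final case class Halt[A, B, O]() extends Tee[A, B, O]

  // The Tee picks the side to read; awaiting an exhausted side ends the run.
  def runTee[A, B, O](t: Tee[A, B, O], left: List[A], right: List[B]): List[O] = t match {
    case Emit(o, next) => o :: runTee(next, left, right)
    case Halt()        => Nil
    case AwaitL(rcv) => left match {
      case a :: rest => runTee(rcv(a), rest, right)
      case Nil       => Nil
    }
    case AwaitR(rcv) => right match {
      case b :: rest => runTee(rcv(b), left, rest)
      case Nil       => Nil
    }
  }

  // Echo the left input (plays the role of id[Int] used as a Tee).
  def idL[A, B]: Tee[A, B, A] = AwaitL[A, B, A](a => Emit(a, idL[A, B]))

  // Serve the Tee's left reads from `in` first; afterwards it reads its real left input.
  def feedL[A, B, O](in: List[A])(t: Tee[A, B, O]): Tee[A, B, O] = (in, t) match {
    case (_, Emit(o, next))       => Emit(o, feedL(in)(next))
    case (a :: rest, AwaitL(rcv)) => feedL(rest)(rcv(a))
    case (_ :: _, AwaitR(rcv))    => AwaitR[A, B, O](b => feedL(in)(rcv(b))) // keep the buffer
    case _                        => t // buffer empty, or the Tee has halted
  }
}
```

With empty inputs the fed Tee yields `List(1, 2, 3)`, and with a left input of 1 to 5 it yields `List(1, 2, 3, 1, 2, 3, 4, 5)`, matching res21 and res22.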
tee reads its two inputs in an order fixed by the Tee. A wye, by contrast, may read non-deterministically from whichever side has data available. Note that wye runs on Task and requires an implicit Strategy:
/**
 * Like `tee`, but we allow the `Wye` to read non-deterministically
 * from both sides at once.
 *
 * If `y` is in the state of awaiting `Both`, this implementation
 * will continue feeding `y` from either left or right side,
 * until either it halts or _both_ sides halt.
 *
 * If `y` is in the state of awaiting `L`, and the left
 * input has halted, we halt. Likewise for the right side.
 *
 * For as long as `y` permits it, this implementation will _always_
 * feed it any leading `Emit` elements from either side before issuing
 * new `F` requests. More sophisticated chunking and fairness
 * policies do not belong here, but should be built into the `Wye`
 * and/or its inputs.
 *
 * The strategy passed in must be stack-safe, otherwise this implementation
 * will throw SOE. Preferably use one of the `Strategys.Executor(es)` based strategies
 */
final def wye[O2, O3](p2: Process[Task, O2])(y: Wye[O, O2, O3])(implicit S: Strategy): Process[Task, O3] =
  scalaz.stream.wye[O, O2, O3](self, p2)(y)(S)
The wye object provides ready-made Wyes such as dynamic, merge and either:
/**
 * After each input, dynamically determine whether to read from the left, right, or both,
 * for the subsequent input, using the provided functions `f` and `g`. The returned
 * `Wye` begins by reading from the left side and is left-biased--if a read of both branches
 * returns a `These(x,y)`, it uses the signal generated by `f` for its next step.
 */
def dynamic[I,I2](f: I => wye.Request, g: I2 => wye.Request): Wye[I,I2,ReceiveY[I,I2]] = {
  import scalaz.stream.wye.Request._
  def go(signal: wye.Request): Wye[I,I2,ReceiveY[I,I2]] = signal match {
    case L => receiveL { i => emit(ReceiveL(i)) ++ go(f(i)) }
    case R => receiveR { i2 => emit(ReceiveR(i2)) ++ go(g(i2)) }
    case Both => receiveBoth {
      case t@ReceiveL(i) => emit(t) ++ go(f(i))
      case t@ReceiveR(i2) => emit(t) ++ go(g(i2))
      case HaltOne(rsn) => Halt(rsn)
    }
  }
  go(L)
}

/**
 * Non-deterministic interleave of both inputs. Emits values whenever either
 * of the inputs is available.
 *
 * Will terminate once both sides terminate.
 */
def merge[I]: Wye[I,I,I] =
  receiveBoth {
    case ReceiveL(i) => emit(i) ++ merge
    case ReceiveR(i) => emit(i) ++ merge
    case HaltL(End) => awaitR.repeat
    case HaltR(End) => awaitL.repeat
    case HaltOne(rsn) => Halt(rsn)
  }

/**
 * Nondeterminstic interleave of both inputs. Emits values whenever either
 * of the inputs is available.
 */
def either[I,I2]: Wye[I,I2,I \/ I2] =
  receiveBoth {
    case ReceiveL(i) => emit(left(i)) ++ either
    case ReceiveR(i) => emit(right(i)) ++ either
    case HaltL(End) => awaitR[I2].map(right).repeat
    case HaltR(End) => awaitL[I].map(left).repeat
    case h@HaltOne(rsn) => Halt(rsn)
  }
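Real non-determinism needs concurrency, but the behavior of merge and either can be sketched deterministically by replaying a simulated arrival order. In the toy model below, the timeline lists Left and Right elements in the order they become available, and AwaitBoth takes whichever arrived first. The names are hypothetical, it uses the standard library only, and it is not scalaz-stream's wye. Per-side termination signals such as HaltL and HaltR are not modeled; the run simply ends when the timeline is used up.

```scala
// Toy model of a Wye that only reads "from whichever side is ready" (hypothetical).
object ToyMerge {
  sealed trait Wye[A, B, O]
  final case class Emit[A, B, O](head: O, tail: Wye[A, B, O]) extends Wye[A, B, O]
  final case class AwaitBoth[A, B, O](rcv: Either[A, B] => Wye[A, B, O]) extends Wye[A, B, O]
  final case class Halt[A, B, O]() extends Wye[A, B, O]

  // Replay a simulated arrival order: Left(a) / Right(b) in the order they become available.
  def runWye[A, B, O](w: Wye[A, B, O], timeline: List[Either[A, B]]): List[O] = w match {
    case Emit(o, next) => o :: runWye(next, timeline)
    case Halt()        => Nil
    case AwaitBoth(rcv) => timeline match {
      case e :: rest => runWye(rcv(e), rest)
      case Nil       => Nil
    }
  }

  // Emit whatever arrives, from either side.
  def merge[I]: Wye[I, I, I] =
    AwaitBoth[I, I, I](e => Emit(e.fold(l => l, r => r), merge[I]))

  // Like merge, but remember which side each element came from.
  def either[A, B]: Wye[A, B, Either[A, B]] =
    AwaitBoth[A, B, Either[A, B]](e => Emit(e, either[A, B]))
}
```

Feeding the same Wye a different arrival order produces a different output. This is why the either and merge results of a real wye can vary between runs.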
Because wye is non-deterministic, the order in which the two sides are interleaved can vary from run to run. The outputs below show one possible order:
import wye._

source.wye(seq)(either).runLog.run
  //> res23: Vector[scalaz.\/[Int,String]] = Vector(-\/(1), \/-(a), \/-(b), \/-(c), -\/(2), -\/(3), -\/(4), -\/(5))
(source either seq).runLog.run
  //> res24: Vector[scalaz.\/[Int,String]] = Vector(-\/(1), \/-(a), \/-(b), \/-(c), -\/(2), -\/(3), -\/(4), -\/(5))
source.wye(seq)(merge).runLog.run
  //> res25: Vector[Any] = Vector(1, a, b, c, 2, 3, 4, 5)
(source merge seq).runLog.run
  //> res26: Vector[Any] = Vector(1, a, b, c, 2, 3, 4, 5)
With dynamic, each element decides which side is read next. w alternates strictly between left and right. fw keeps reading from the left until it sees a multiple of 3, then reads one element from the right:
val w = dynamic((r:Int) => Request.R, (l:String) => Request.L)
  //> w : scalaz.stream.Wye[Int,String,scalaz.stream.ReceiveY[Int,String]] = Await(Left,<function1>,<function1>)
source.wye(seq)(w).runLog.run
  //> res27: Vector[scalaz.stream.ReceiveY[Int,String]] = Vector(ReceiveL(1), ReceiveR(a), ReceiveL(2), ReceiveR(b), ReceiveL(3), ReceiveR(c), ReceiveL(4))
val fw = dynamic((r: Int) => if (r % 3 == 0) { Request.R } else {Request.L}, (l:String) => Request.L)
  //> fw : scalaz.stream.Wye[Int,String,scalaz.stream.ReceiveY[Int,String]] = Await(Left,<function1>,<function1>)
source.wye(seq)(fw).runLog.run
  //> res28: Vector[scalaz.stream.ReceiveY[Int,String]] = Vector(ReceiveL(1), ReceiveL(2), ReceiveL(3), ReceiveR(a), ReceiveL(4), ReceiveL(5))
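dynamic can be sketched in a toy Wye model that also supports reading a specific side. The names are hypothetical, it uses the standard library only, and it is not the library's code. Side plays the role of wye.Request. When the Wye awaits one side, elements that arrived from the other side stay queued in the timeline. With the same inputs as the article, the sketch reproduces res27 and res28.

```scala
// Toy Wye model with left / right / either-side reads (hypothetical; not scalaz-stream's encoding).
object ToyDynamic {
  sealed trait Side
  case object L extends Side
  case object R extends Side
  case object Both extends Side

  sealed trait Wye[A, B, O]
  final case class Emit[A, B, O](head: O, tail: Wye[A, B, O]) extends Wye[A, B, O]
  final case class AwaitL[A, B, O](rcv: A => Wye[A, B, O]) extends Wye[A, B, O]
  final case class AwaitR[A, B, O](rcv: B => Wye[A, B, O]) extends Wye[A, B, O]
  final case class AwaitBoth[A, B, O](rcv: Either[A, B] => Wye[A, B, O]) extends Wye[A, B, O]

  // Replay a simulated arrival order; reading one side leaves the other side's elements queued.
  // Awaiting a side with nothing left ends the run.
  def runWye[A, B, O](w: Wye[A, B, O], timeline: List[Either[A, B]]): List[O] = w match {
    case Emit(o, next) => o :: runWye(next, timeline)
    case AwaitBoth(rcv) => timeline match {
      case e :: rest => runWye(rcv(e), rest)
      case Nil       => Nil
    }
    case AwaitL(rcv) =>
      val (skipped, rest) = timeline.span(_.isRight)
      rest match {
        case Left(a) :: tail => runWye(rcv(a), skipped ++ tail)
        case _               => Nil
      }
    case AwaitR(rcv) =>
      val (skipped, rest) = timeline.span(_.isLeft)
      rest match {
        case Right(b) :: tail => runWye(rcv(b), skipped ++ tail)
        case _                => Nil
      }
  }

  // After each element, ask f (left) or g (right) which side to read next; start on the left.
  def dynamic[A, B](f: A => Side, g: B => Side): Wye[A, B, Either[A, B]] = {
    def go(s: Side): Wye[A, B, Either[A, B]] = s match {
      case L    => AwaitL[A, B, Either[A, B]](a => Emit[A, B, Either[A, B]](Left(a), go(f(a))))
      case R    => AwaitR[A, B, Either[A, B]](b => Emit[A, B, Either[A, B]](Right(b), go(g(b))))
      case Both => AwaitBoth[A, B, Either[A, B]](e => Emit[A, B, Either[A, B]](e, go(e.fold(f, g))))
    }
    go(L)
  }
}
```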
A Wye can be fed with wye.feedL in the same way as a Tee:
val lwye = wye.feedL(Seq(1,2,3))(id[Int])
  //> lwye : scalaz.stream.Wye[Int,Any,Int] = Append(Emit(Vector(1, 2)),Vector(<function1>))
halt.wye(halt)(lwye).runLog.run
  //> res29: Vector[Int] = Vector(1, 2, 3)
source.wye(halt)(lwye).runLog.run
  //> res30: Vector[Int] = Vector(1, 2, 3, 1, 2, 3, 4, 5)
Scalaz(47)- scalaz-stream: A Closer Look - Transducer: Process1-tee-wye
Original article: http://blog.csdn.net/tiger_xc/article/details/51930574