public interface POConverter<IN, OUT, T extends PhysicalOperator> { RDD<OUT> convert(List<RDD<IN>> rdd, T physicalOperator) throws IOException; }抽象类POConvertor提供了convert方法,输入参数中的List<RDD>是本次物理操作的前驱们产生的RDDs,可以认为是会依赖的父RDDs。
走的都是NewHadoopRDD路线。
Load方面是通过POLoad获得文件路径,pigContext获得必要配置信息,然后交由SparkContext调用newAPIHadoopFile来获得NewHadoopRDD,最后把Tuple2<Text, Tuple>的RDD map成只剩value的RDD<Tuple>。
Store方面是先把最近的前驱rdd转会成Key为空Text的Tuple2<Text, Tuple>,然后映射为PairRDDFunctions,借助pigContext生成POStore操作,最后调用RDD的saveAsNewAPIHadoopFile存到HDFS上。
ForEach里实现一个Iterator[T] => Iterator[T]的方法,把foreach转化为rdd.mapPartitions()方法。
Iterator[T]=> Iterator[T]方法的实现,会依赖原本的POForEach来获得nextTuple和进行一些别的操作,来实现一个新的Iterator。
对于hadoop backend的executionengine里的抽象类PhysicalOperator来说,
setInput()和attachInput()方法是放入带处理的tuple数据,
getNextTuple()的时候触发processTuple(),处理对象就是内部的Input Tuple。
所以ForEach操作实现Iterator的时候,在readNext()方法里掺入了以上设置Input数据的操作,在返回前调用getNextTuple()返回处理后的结果。
POFilter也是通过setInput()和attachInput()以及getNextTuple()来返回处理结果。
所以在实现为RDD操作的时候,把以上步骤包装成一个FilterFunction,传入rdd.filter(Function)处理。
POLimit同POFilter是完全一样的。
现在RDD已经直接具备distinct(numPartitions: Int)方法了。
这里的distinct实现同rdd里的distinct逻辑是完全一样的。
第一步:把类型为Tuple的rdd映射成为Tuple2<Tuple, Object>,其中value部分是null的;
第二步:进行rdd.reduceByKey(merge_function, parallelism)操作,merge_function对两个value部分的Object不做任何处理,也就是按key reduce且不对value部分处理;
第三步:对第二步的结果进行rdd.map(function, ClassTag)处理,function为得到Tuple2<Tuple, Object>里的._1,即key值:Tuple。
Union是一次求并过程,直接new UnionRDD<Tuple>返回。
由于UnionRDD处理的是Seq<RDD>,所以使用JavaConversions.asScalaBuffer(List<RDD<Tuple>>)进行一下转换再传入。
Sort过程:
第一步:把Tuple类型的RDD转成Tuple2<Tuple, Object>类型,Object为空
第二步:根据第一步结果,new OrderedRDDFunctions<Tuple, Object,Tuple2<Tuple, Object>>
,其sortByKey方法产出一个排过序的RDD<Tuple2<Tuple, Object>>。OrderedRDDFunctions里的Key类型必须是可排序的,比较器复用的是POSort的mComparator。sortByKey结果返回的是ShuffleRDD,其Partitioner是RangePartitioner,排序之后,每个Partition里存放的都是一个范围内的排过序的值。
第三步:调用rdd.mapPartition(function, xx, xx),function作用为把Iterator<Tuple2<Tuple,Object>>吐成Iterator<Tuple>,即再次取回Key值,此时已有序。
POSplit的处理是直接返回第一个祖先RDD。
LocalRearrange -> Global Rearrange -> Package是一同出现的。
Local rearrange直接依赖
physicalOperator.setInputs(null); physicalOperator.attachInput(t); result = physicalOperator.getNextTuple();
三步得到result。返回的Tuple格式为(index, key, value)。
依赖POLocalRearrange本身内部对input tuple的处理。
待处理的Tuple格式是(index, key, value)。最后结果为(key, { values })
如果父RDD只有一个:
先进行按key进行一次groupBy,得到结果是Tuple2<Object, Seq<Tuple>>
然后做一次map操作,得到(key, { values })形态的RDD,即Tuple<Object, Iterator>
如果父RDD有多个:
让通过rdd的map操作先将Tuple从(index, key, value)转成(key, value)形态,然后把这个rdd集合new成CoGroupRDD,包含一次(Seq) JavaConversions.asScalaBuffer(rddPairs)转化。最后调用CoGroupRDD的map方法,把Tuple2<Object,Seq<Seq<Tuple>>>转化成Tuple<Object, Iterator>,即(key, { values })形态。实际上,CoGroupRDD的map方法内部做的事情,是针对每个Key里的Iterator集合,进行了Iterator之间的合并操作。
Package需要把global rearrange处理后的key, Seq<Tuple>进行group。具体的待处理Tuple结构是这样的:(key, Seq<Tuple>:{(index,key, value without key)})
tuple.get(0)是keyTuple,tuple.get(1)是Iterator<Tuple>,最后返回(key, {values}),即Tuple<Object, Iterator>
Spork: Pig on Spark实现分析,布布扣,bubuko.com
原文地址:http://blog.csdn.net/pelick/article/details/25317899