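Contents:
1. Basic sorting in practice;
2. Secondary sort in practice;
3. Higher-order sorting;
4. The internals of sorting, demystified.
Why cover sorting? Because real applications almost always need some kind of ordering.
With massive data sets, we often have to sort the data first and then pick out the part we want.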
==========Basic sorting============
scala> sc.setLogLevel("WARN")
scala> val x = sc.textFile("/historyserverforSpark/README.md", 3).flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_,1)
x: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:27
scala> x.map(pair=>(pair._2,pair._1)).sortByKey(false).map(pair=>(pair._2,pair._1)).collect
16/02/08 20:34:00 WARN client.AppClient$ClientEndpoint: Connection to Worker2:7077 failed; waiting for master to reconnect...
16/02/08 20:34:01 WARN cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
16/02/08 20:34:01 WARN client.AppClient$ClientEndpoint: Connection to Worker2:7077 failed; waiting for master to reconnect...
res11: Array[(String, Int)] = Array(("",67), (the,21), (to,14), (Spark,13), (for,11), (and,10), (##,8), (a,8), (run,7), (is,6), (can,6), (on,5), (of,5), (in,5), (if,4), (also,4), (you,4), (build,3), (including,3), (Please,3), (use,3), (or,3), (Hadoop,3), (documentation,3), (example,3), (an,3), (You,3), (with,3), (For,2), (This,2), (Hive,2), (To,2), (SparkPi,2), (refer,2), (Interactive,2), (be,2), (./bin/run-example,2), (1000:,2), (tests,2), (examples,2), (at,2), (using,2), (Shell,2), (class,2), (`examples`,2), (set,2), (Hadoop,,2), (cluster,2), (supports,2), (Python,2), (particular,2), (general,2), (locally,2), (following,2), (which,2), (should,2), ([project,2), (do,2), (how,2), (It,2), (Scala,2), (detailed,2), (return,2), (one,2), (Python,,2), (building,2), (that,2), (SQL,2), (guidance...
/**
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
* order of the keys).
*/
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
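reduceByKey(_+_, 1) yields an RDD with a single partition, and an RDD's parallelism is inherited: sortByKey defaults numPartitions to self.partitions.length, so the sorted RDD also has one partition unless a different value is passed.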
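The doc comment above says that each partition ends up holding a sorted range of the keys. The following is a minimal plain-Java sketch of that idea only; it is not Spark's RangePartitioner. The class name, the method names, and the boundary values are hypothetical, and Spark derives its boundaries by sampling the keys rather than taking them as input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: keys are routed to partitions by range, each partition
// is sorted locally, and reading the partitions in order gives a global sort.
class RangePartitionSketch {
    // Partition i holds keys <= bounds[i]; the last partition holds the rest.
    public static int partitionFor(int key, int[] bounds) {
        int p = 0;
        while (p < bounds.length && key > bounds[p]) {
            p++;
        }
        return p;
    }

    // Split keys into bounds.length + 1 range partitions, sort each one,
    // then concatenate them in partition order.
    public static List<Integer> rangeSort(List<Integer> keys, int[] bounds) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i <= bounds.length; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int key : keys) {
            partitions.get(partitionFor(key, bounds)).add(key);
        }
        List<Integer> result = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            Collections.sort(partition);
            result.addAll(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(21, 3, 14, 67, 8, 13, 5);
        int[] bounds = {8, 20}; // hypothetical boundaries chosen by hand
        System.out.println(rangeSort(keys, bounds)); // [3, 5, 8, 13, 14, 21, 67]
    }
}
```

Because no key in a later partition is smaller than a key in an earlier one, sorting inside each partition is enough to make the concatenated output globally ordered.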
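==========Secondary sort in practice============
A secondary sort is a sort that takes two dimensions (columns) into account.
Create HelloSort.txt with the following content: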
2 3
4 1
3 2
4 3
9 7
2 1
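The first column contains duplicates (two 2s and two 4s), so it cannot order those rows by itself; the second column has to break the tie. That is a secondary sort.
Secondary sort is only the entry point. What about sorting on 10, 20, or 100 columns? The approach has to account for those cases too.
A custom key is the way to meet every such case with one unchanging technique!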
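To make the "10 or 100 columns" point concrete, here is a small plain-Java sketch of a key that compares any number of int columns from left to right. MultiColumnKey and its members are hypothetical names introduced for illustration; this is not code from the article, and it runs without Spark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical generalisation of a two-field sort key: it holds any number
// of int columns and compares them lexicographically.
class MultiColumnKey implements Comparable<MultiColumnKey>, java.io.Serializable {
    private final int[] columns;

    public MultiColumnKey(int... columns) {
        this.columns = columns.clone();
    }

    @Override
    public int compareTo(MultiColumnKey other) {
        int shared = Math.min(columns.length, other.columns.length);
        for (int i = 0; i < shared; i++) {
            // The first differing column decides the order.
            int c = Integer.compare(columns[i], other.columns[i]);
            if (c != 0) {
                return c;
            }
        }
        // All shared columns are equal: the key with fewer columns sorts first.
        return Integer.compare(columns.length, other.columns.length);
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof MultiColumnKey
                && Arrays.equals(columns, ((MultiColumnKey) obj).columns);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(columns);
    }

    @Override
    public String toString() {
        return Arrays.toString(columns);
    }

    public static void main(String[] args) {
        List<MultiColumnKey> keys = new ArrayList<>(Arrays.asList(
                new MultiColumnKey(4, 1, 9),
                new MultiColumnKey(2, 3, 5),
                new MultiColumnKey(4, 1, 2),
                new MultiColumnKey(2, 1, 7)));
        Collections.sort(keys);
        System.out.println(keys); // [[2, 1, 7], [2, 3, 5], [4, 1, 2], [4, 1, 9]]
    }
}
```

The comparison logic stays the same whether the key has two columns or a hundred, which is the sense in which one custom key covers every case.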
Java version:
package com.dt.spark.SparkApps.cores;
import java.io.Serializable;
import scala.math.Ordered;
/**
* Custom key for secondary sort
* @author 威
*
*/
public class SecondarySortKey implements Ordered<SecondarySortKey>,Serializable {
//The key fields used for the secondary sort
private int first;
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
private int second;
public SecondarySortKey(int first,int second){
this.first = first;
this.second = second;
}
@Override
public boolean $greater(SecondarySortKey other) {
if(this.first>other.getFirst()){
return true;
}else if(this.first == other.getFirst()&&this.second>other.getSecond()){
return true;
}
return false;
}
@Override
public boolean $greater$eq(SecondarySortKey other) {
if(this.$greater(other)){
return true;
}else if(this.first==other.getFirst()&&this.second==other.getSecond()){
return true;
}
return false;
}
@Override
public boolean $less(SecondarySortKey other) {
if(this.first<other.getFirst()){
return true;
}else if(this.first==other.getFirst()&&this.second<other.getSecond()){
return true;
}
return false;
}
@Override
public boolean $less$eq(SecondarySortKey other) {
if(this.$less(other)){
return true;
}else if(this.first==other.getFirst()&&this.second==other.getSecond()){
return true;
}
return false;
}
@Override
public int compare(SecondarySortKey other) {
//Integer.compare avoids the overflow that subtraction can cause for large values
if(this.first != other.getFirst()){
return Integer.compare(this.first, other.getFirst());
}else{
return Integer.compare(this.second, other.getSecond());
}
}
@Override
public int compareTo(SecondarySortKey other) {
return compare(other);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + first;
result = prime * result + second;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SecondarySortKey other = (SecondarySortKey) obj;
if (first != other.first)
return false;
if (second != other.second)
return false;
return true;
}
}
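One detail worth checking in the compare method of a key like SecondarySortKey: returning the difference of two ints gives the wrong sign when the subtraction overflows, whereas Integer.compare does not. The sketch below demonstrates this in isolation; CompareOverflowDemo and its method names are hypothetical and not part of the article's code.

```java
// Illustration only: contrasts a subtraction-based comparison with
// Integer.compare on values that are far apart.
class CompareOverflowDemo {
    // Comparison by subtraction: correct only while a - b fits in an int.
    public static int bySubtraction(int a, int b) {
        return a - b;
    }

    // Overflow-safe comparison from the standard library.
    public static int bySafeCompare(int a, int b) {
        return Integer.compare(a, b);
    }

    public static void main(String[] args) {
        int big = Integer.MAX_VALUE;
        int negative = -1;
        // big is clearly greater than negative, yet the subtraction wraps
        // around to a negative number.
        System.out.println(bySubtraction(big, negative) > 0); // false
        System.out.println(bySafeCompare(big, negative) > 0); // true
    }
}
```

For small values such as those in HelloSort.txt both forms agree; the difference only shows up near the limits of the int range.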
package com.dt.spark.SparkApps.cores;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
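/**
* Secondary sort
* @author 威
* Step 1: implement the custom sort key using the Ordered and Serializable interfaces
* Step 2: load the file to be sorted and build an RDD of (key, value) pairs
* Step 3: call sortByKey on the custom key to perform the secondary sort
* Step 4: drop the sort key and keep only the sorted result
*/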
public class SecondarySortApp {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);// Underneath, this is actually Scala's SparkContext
JavaRDD<String> lines = sc.textFile("F:/exepro/scala/HelloSort.txt");
JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
@Override
public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
String[] splited = line.split(" ");
SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]),Integer.valueOf(splited[1]));
return new Tuple2<SecondarySortKey,String>(key,line);
}
});
JavaPairRDD<SecondarySortKey, String> sorted = pairs.sortByKey();
//Drop the custom key after sorting and keep only the sorted lines
JavaRDD<String> secondarySorted = sorted.map(new Function<Tuple2<SecondarySortKey,String>, String>() {
@Override
public String call(Tuple2<SecondarySortKey, String> sortedContent) throws Exception {
return sortedContent._2;
}
});
secondarySorted.foreach(new VoidFunction<String>() {
@Override
public void call(String str) throws Exception {
System.out.println(str);
}
});
}
}
Result:
16/02/08 21:37:49 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
2 1
2 3
3 2
4 1
4 3
9 7
16/02/08 21:37:49 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver
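The ordering shown above can be cross-checked without Spark. The following plain-Java sketch sorts the six sample lines by first column and then by second column; SampleDataSortCheck and its members are hypothetical names, and the `ascending` flag is only meant to mirror the boolean argument of sortByKey.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustration only: reproduces the two-column ordering on the sample data
// with a standard-library Comparator instead of a Spark job.
class SampleDataSortCheck {
    // Compare "a b" lines by the first column, then by the second.
    static final Comparator<String> BY_BOTH_COLUMNS =
            Comparator.comparingInt((String line) -> Integer.parseInt(line.split(" ")[0]))
                      .thenComparingInt(line -> Integer.parseInt(line.split(" ")[1]));

    // Return a sorted copy; ascending = false reverses the whole ordering.
    public static List<String> sortLines(List<String> lines, boolean ascending) {
        List<String> copy = new ArrayList<>(lines);
        copy.sort(ascending ? BY_BOTH_COLUMNS : BY_BOTH_COLUMNS.reversed());
        return copy;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("2 3", "4 1", "3 2", "4 3", "9 7", "2 1");
        System.out.println(sortLines(lines, true));  // [2 1, 2 3, 3 2, 4 1, 4 3, 9 7]
        System.out.println(sortLines(lines, false)); // [9 7, 4 3, 4 1, 3 2, 2 3, 2 1]
    }
}
```

The ascending output matches the result printed by the job above; the descending output is what a reversed ordering over the same key would give.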
Scala version:
package com.dt.spark.cores
/**
* Custom key for secondary sort
* Created by 威 on 2016/2/9.
*/
class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable {
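def compare(other:SecondarySortKey): Int ={
//Integer.compare avoids the overflow that subtraction can cause for large values
if(this.first!=other.first){
Integer.compare(this.first, other.first)
}else{
Integer.compare(this.second, other.second)
}
}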
}
package com.dt.spark.cores
import org.apache.spark.{SparkContext, SparkConf}
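/**
* Secondary sort
*
* @author 威
* Step 1: implement the custom sort key using the Ordered and Serializable interfaces
* Step 2: load the file to be sorted and build an RDD of (key, value) pairs
* Step 3: call sortByKey on the custom key to perform the secondary sort
* Step 4: drop the sort key and keep only the sorted result
*/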
object SecondarySortApp {
def main(args: Array[String]) {
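val conf = new SparkConf()//Create the SparkConf object
conf.setAppName("SecondarySortApp")//Set the application name; it is shown in the monitoring UI while the program runs
conf.setMaster("local")//Run locally; no Spark cluster installation is required
val sc = new SparkContext(conf)//Create the SparkContext, passing in the SparkConf instance that carries the runtime parameters and configuration
val lines = sc.textFile("F:/exepro/scala/HelloSort.txt")//Read each line of the file via HadoopRDD and MapPartitionsRDD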
val pairWithSortKey = lines.map(line=>{
val splited = line.split(" ")
(new SecondarySortKey(splited(0).toInt,splited(1).toInt),line)
})
val sorted = pairWithSortKey.sortByKey(false)//false sorts in descending order; the Java version above sorts ascending
val secondarySorted = sorted.map(sortedLine=>sortedLine._2)
secondarySorted.collect().foreach(println)
}
}
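Scala is remarkably concise.
Homework:
1. Implement a secondary sort (or a three-column sort) in Scala, constructing the key through the apply method of an object;
2. Read the source code of RangePartitioner on your own.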
This article comes from the “一枝花傲寒” blog; please do not repost it.
Original URL: http://feiweihy.blog.51cto.com/6389397/1743584