In today's big data era, Hadoop and Spark are two of the hottest terms around. Hadoop is an open-source distributed computing framework from Apache, built on the MapReduce model described in Google's papers; it first took hold in the search field, where it is used to solve computation problems over massive data sets. A MapReduce job in Hadoop has two phases, a Mapper phase and a Reducer phase: a user only needs to implement a map function and a reduce function to get distributed computation, which is remarkably simple. In recent years the newer Spark framework has swept across China with unstoppable momentum, and its core abstraction, the RDD, has drawn plenty of attention with its powerful resilience mechanism. More and more people believe that Spark will one day replace Hadoop, but is that really so? In this post I will use a real telecom workload to lay out my own view.
The data used in the experiment: a telecom traffic log with one record per line and tab-separated fields.
Field descriptions: as the mapper code below shows, field 1 holds the subscriber's phone number (MSISDN), field 8 the uplink traffic, and field 9 the downlink traffic.
Business requirement: for each subscriber, compute the total uplink traffic, the total downlink traffic, and the combined total of the two.
For example: a subscriber whose records add up to 180 of uplink traffic and 180 of downlink traffic should produce the output 180, 180, 360.
First, let's implement this job with Hadoop MapReduce. The code is as follows:
package com.appache.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowCount
{
    public static String path1 = "hdfs://hadoop11:9000/zmy/flowdata.txt";
    public static String path2 = "hdfs://hadoop11:9000/dirout2/";

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://hadoop11:9000/");
        FileSystem fileSystem = FileSystem.get(conf);

        // Remove the output directory if it already exists
        if (fileSystem.exists(new Path(path2)))
        {
            fileSystem.delete(new Path(path2), true);
        }

        // Driver: wire up the input, mapper, reducer and output
        Job job = new Job(conf, "FlowCount");
        job.setJarByClass(FlowCount.class);

        FileInputFormat.setInputPaths(job, new Path(path1));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // The shuffle phase groups the map output by key before it reaches the reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(path2));

        // Submit the job to the cluster and wait for it to finish
        job.waitForCompletion(true);

        // Print the job's output
        FSDataInputStream fr = fileSystem.open(new Path("hdfs://hadoop11:9000/dirout2/part-r-00000"));
        IOUtils.copyBytes(fr, System.out, 1024, true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>
    {
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException
        {
            String line = v1.toString();          // one line of the log
            String[] splited = line.split("\t");  // split it into fields

            // Pick out the fields we need
            String msisdn = splited[1];
            String upFlow = splited[8];
            String downFlow = splited[9];
            long flowsum = Long.parseLong(upFlow) + Long.parseLong(downFlow);

            context.write(new Text(msisdn), new Text(upFlow + "\t" + downFlow + "\t" + String.valueOf(flowsum)));
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text>
    {
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException
        {
            long upFlowSum = 0L;
            long downFlowSum = 0L;
            long flowSum = 0L;

            // Accumulate the up, down and total flow of every record for this MSISDN
            for (Text v2 : v2s)
            {
                String[] splited = v2.toString().split("\t");
                upFlowSum += Long.parseLong(splited[0]);
                downFlowSum += Long.parseLong(splited[1]);
                flowSum += Long.parseLong(splited[2]);
            }

            String data = String.valueOf(upFlowSum) + "\t" + String.valueOf(downFlowSum) + "\t" + String.valueOf(flowSum);
            context.write(k2, new Text(data));
        }
    }
}
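Before packaging and submitting the job, it can be worth sanity-checking the mapper and reducer locally. Below is a minimal sketch using MRUnit, assuming the org.apache.mrunit:mrunit test dependency (hadoop2 classifier) and JUnit are on the test classpath; the input line is a made-up record in which only fields 1, 8 and 9 carry meaningful values:

package com.appache.hadoop;

import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class FlowCountTest
{
    // Hypothetical record: field 1 = MSISDN, field 8 = up flow, field 9 = down flow; the rest are fillers
    private static final String LINE = "f0\t13560439658\tf2\tf3\tf4\tf5\tf6\tf7\t100\t200\tf10";

    @Test
    public void mapperEmitsPerRecordFlows() throws Exception
    {
        MapDriver.newMapDriver(new FlowCount.MyMapper())
            .withInput(new LongWritable(0), new Text(LINE))
            .withOutput(new Text("13560439658"), new Text("100\t200\t300"))
            .runTest();
    }

    @Test
    public void reducerSumsFlowsPerMsisdn() throws Exception
    {
        ReduceDriver.newReduceDriver(new FlowCount.MyReducer())
            .withInput(new Text("13560439658"),
                       Arrays.asList(new Text("100\t200\t300"), new Text("34\t66\t100")))
            .withOutput(new Text("13560439658"), new Text("134\t266\t400"))
            .runTest();
    }
}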
After packaging the code into a jar, run it in cluster mode (no main class is given on the command line, so the jar's manifest is assumed to name FlowCount as the main class; otherwise append com.appache.hadoop.FlowCount after the jar):
[root@hadoop11 mydata]# hadoop jar FlowCount1.jar
Check the result:
[root@hadoop11 mydata]# hadoop fs -cat /dirout2/part-r-00000
13480253104 180 180 360
13502468823 7335 110349 117684
13560439658 2034 5892 7926
13600217502 1080 186852 187932
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13760778710 120 120 240
13823070001 360 180 540
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548
Next, let's implement the same job with Spark. The code is as follows:
package com.appache.spark.app

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FlowDataCount
{
  def main(args: Array[String]): Unit =
  {
    // Set up the Spark configuration and context
    val conf = new SparkConf()
    conf.setAppName("FlowDataCount")
    val sc: SparkContext = new SparkContext(conf)

    val lines: RDD[String] = sc.textFile("hdfs://hadoop11:9000/zmy/flowdata.txt", 1)

    // Take each line of the log, split it into fields and keep only the ones we need
    val rdd2: RDD[(String, (Integer, Integer))] = lines.flatMap(line =>
    {
      val arr: Array[String] = line.split("\t")
      val msisdn: String = arr.apply(1)                    // k2
      val upFlow: Integer = Integer.valueOf(arr.apply(8))
      val downFlow: Integer = Integer.valueOf(arr.apply(9))
      val arr2 = Array((msisdn, (upFlow, downFlow)))
      arr2
    }) // e.g. Array((18330267966,(50,100)), (13403342405,(100,200)))

    // Sum the up and down flow per MSISDN, e.g. ((50,100),(100,200)) => (150,300)
    val rdd3: RDD[(String, (Integer, Integer))] = rdd2.reduceByKey((updown1, updown2) =>
    {
      val upFlowSum = updown1._1 + updown2._1
      val downFlowSum = updown1._2 + updown2._2
      (upFlowSum, downFlowSum)
    })

    // (msisdn, up, down, up + down)
    val rdd4: RDD[(String, Integer, Integer, Int)] = rdd3.map(ele => (ele._1, ele._2._1, ele._2._2, ele._2._1 + ele._2._2))
    rdd4.saveAsTextFile("hdfs://hadoop11:9000/dirout/")

    sc.stop()
  }
}
After packaging the code into a jar, run it in cluster mode in the same way (no --class option is given here, so the jar's manifest is assumed to name the main class; otherwise add --class com.appache.spark.app.FlowDataCount):
[root@hadoop11 mydata]# spark-submit --master spark://hadoop11:7077 FlowCount2.jar
Check the result:
[root@hadoop11 mydata]# hadoop fs -cat /dirout/part-00000
(84138413,4116,1432,5548)
(13925057413,11058,48243,59301)
(13726230503,2481,24681,27162)
(13922314466,3008,3720,6728)
(13826544101,264,0,264)
(13560439658,2034,5892,7926)
(15920133257,3156,2936,6092)
(13823070001,360,180,540)
(13602846565,1938,2910,4848)
(13926435656,132,1512,1644)
(15013685858,3659,3538,7197)
(13600217502,1080,186852,187932)
(15989002119,1938,180,2118)
(13660577991,6960,690,7650)
(13760778710,120,120,240)
(13719199419,240,0,240)
(13480253104,180,180,360)
(13926251106,240,0,240)
(18320173382,9531,2412,11943)
(18211575961,1527,2106,3633)
(13502468823,7335,110349,117684)
Now let's compare the two:
Looking at the results, Spark comes out ahead in both running time and code size. One incidental difference visible in the two listings above: the MapReduce output comes back sorted by key, while the output of Spark's reduceByKey arrives in no particular order. That said, in my view both Hadoop's MapReduce and Spark are, at bottom, implementations of the same map-and-reduce idea. If I absolutely had to rank them, I would split it 6 to 4 in Spark's favor, but both Hadoop and Spark are excellent big data processing frameworks in my book.
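One caveat about the code-size comparison: the MapReduce job above is written in Java while the Spark job is written in Scala, so part of the brevity comes from the language rather than the framework. For a rough but fairer comparison, here is a sketch of the same aggregation against Spark's Java API (the class name FlowDataCountJava and the output directory are made up for illustration; it assumes Java 8 lambdas and the same input path as above). Even in Java, the Spark version remains noticeably shorter than the MapReduce job:

package com.appache.spark.app;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class FlowDataCountJava
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf().setAppName("FlowDataCountJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("hdfs://hadoop11:9000/zmy/flowdata.txt", 1);

        // Emit (msisdn, [upFlow, downFlow]) per record
        JavaPairRDD<String, long[]> perRecord = lines.mapToPair(line -> {
            String[] arr = line.split("\t");
            return new Tuple2<>(arr[1], new long[]{Long.parseLong(arr[8]), Long.parseLong(arr[9])});
        });

        // Sum the up and down flow per MSISDN
        JavaPairRDD<String, long[]> sums = perRecord.reduceByKey(
            (a, b) -> new long[]{a[0] + b[0], a[1] + b[1]});

        // Format each result as: msisdn \t up \t down \t total
        JavaRDD<String> out = sums.map(
            t -> t._1() + "\t" + t._2()[0] + "\t" + t._2()[1] + "\t" + (t._2()[0] + t._2()[1]));

        out.saveAsTextFile("hdfs://hadoop11:9000/dirout_java/"); // hypothetical output directory
        sc.stop();
    }
}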
Original post: http://blog.csdn.net/a2011480169/article/details/53504617