Hadoop vs. Spark: Who Is King in the Big Data Era?


In today's big data era, Hadoop and Spark are two of the hottest buzzwords. Hadoop is an open-source distributed computing framework from Apache, modeled on Google's MapReduce and GFS papers; it grew out of the search domain and is built to process massive data sets. Hadoop's MapReduce splits a job into two stages, a Mapper stage and a Reducer stage, and the user only has to implement a map function and a reduce function to get distributed computation, which is very simple. In recent years the young Spark framework has swept the field with unstoppable momentum, and its core abstraction, the RDD (resilient distributed dataset), has drawn even more attention with its strong resilience mechanism. More and more people believe that Spark will one day replace Hadoop, but is that really the case? In this post I will use a real telecom workload to illustrate my own view.
Data used in the experiment: [sample data screenshot not reproduced here]
Field descriptions: [field description screenshot not reproduced here]
Business requirement: for each user (phone number), compute the total upstream traffic, the total downstream traffic, and the combined total of the two.
For example: [example screenshot not reproduced here]
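The screenshots are not reproduced here, but the layout of the input can be inferred from the parsing code below: each log line is tab-separated, the user's phone number (MSISDN) is in field 1, and the upstream and downstream byte counts are in fields 8 and 9. Borrowing the hypothetical numbers used in the comments of the Spark code further down, two records for the same user, say

    (18330267966, upFlow=50,  downFlow=100)
    (18330267966, upFlow=100, downFlow=200)

should be aggregated into a single output line:

    18330267966    150    300    450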
First, let's implement this requirement with Hadoop MapReduce. The code is as follows:

package com.appache.hadoop;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowCount
{
    public static String path1 = "hdfs://hadoop11:9000/zmy/flowdata.txt";
    public static String path2 = "hdfs://hadoop11:9000/dirout2/";
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        conf.set("fs.default.name","hdfs://hadoop11:9000/");
        FileSystem fileSystem = FileSystem.get(conf);
        if(fileSystem.exists(new Path(path2)))
        {
            fileSystem.delete(new Path(path2), true);
        }       
        Job job = new Job(conf,"FlowCount");
        job.setJarByClass(FlowCount.class);
        // driver configuration: input path, input format, mapper class, and map output key/value types
        FileInputFormat.setInputPaths(job, new Path(path1));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // the shuffle happens between these stages; now configure the reducer and the final output
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(path2));   
        // submit the job to the cluster (JobTracker) and wait for it to finish
        job.waitForCompletion(true);
        // print the job output to the console
        FSDataInputStream fr = fileSystem.open(new Path("hdfs://hadoop11:9000/dirout2/part-r-00000"));
        IOUtils.copyBytes(fr,System.out,1024,true);
    }
    public static class MyMapper   extends Mapper<LongWritable, Text, Text, Text>
    {
        @Override
        protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
        {
            String line = v1.toString();// one line of the log
            String[] splited = line.split("\t");// split the line into its fields
            // pick out the fields we need: phone number, upstream flow, downstream flow
            String msisdn = splited[1];
            String upFlow = splited[8];
            String downFlow = splited[9];
            long flowsum = Long.parseLong(upFlow) + Long.parseLong(downFlow);
            context.write(new Text(msisdn), new Text(upFlow+"\t"+downFlow+"\t"+String.valueOf(flowsum)));
        }
    }
    public static class MyReducer  extends Reducer<Text, Text, Text, Text>
    {
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s,Context context)throws IOException, InterruptedException
        {
           long upFlowSum = 0L;
           long downFlowSum = 0L;
           long FlowSum = 0L;
           for(Text v2:v2s)
           {
               String[] splited = v2.toString().split("\t");
               upFlowSum += Long.parseLong(splited[0]);
               downFlowSum += Long.parseLong(splited[1]);
               FlowSum += Long.parseLong(splited[2]);
           }
           String data = String.valueOf(upFlowSum)+"\t"+String.valueOf(downFlowSum)+"\t"+String.valueOf(FlowSum);
           context.write(k2,new Text(data));
        }
    }
}

After packaging the code into a jar, run it on the cluster:

[root@hadoop11 mydata]# hadoop jar FlowCount1.jar
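If the manifest of the jar does not declare a main class, hadoop jar also lets you name the driver class explicitly on the command line (the class name below is the one defined in the code above):

[root@hadoop11 mydata]# hadoop jar FlowCount1.jar com.appache.hadoop.FlowCount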

Check the result of the run:

[root@hadoop11 mydata]# hadoop fs -cat /dirout2/part-r-00000
13480253104     180     180     360
13502468823     7335    110349  117684
13560439658     2034    5892    7926
13600217502     1080    186852  187932
13602846565     1938    2910    4848
13660577991     6960    690     7650
13719199419     240     0       240
13726230503     2481    24681   27162
13760778710     120     120     240
13823070001     360     180     540
13826544101     264     0       264
13922314466     3008    3720    6728
13925057413     11058   48243   59301
13926251106     240     0       240
13926435656     132     1512    1644
15013685858     3659    3538    7197
15920133257     3156    2936    6092
15989002119     1938    180     2118
18211575961     1527    2106    3633
18320173382     9531    2412    11943
84138413        4116    1432    5548

Next, let's implement the same requirement with Spark. The code is as follows:

package com.appache.spark.app

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FlowDataCount
{
  def main(args: Array[String]): Unit =
  {
    // first create the Spark configuration
    val conf = new SparkConf()
    conf.setAppName("FlowDataCount")
    val sc:SparkContext = new SparkContext(conf)

    val lines:RDD[String] = sc.textFile("hdfs://hadoop11:9000/zmy/flowdata.txt",1)
    val rdd2:RDD[(String,(Integer,Integer))] = lines.flatMap(line =>   // take a log line, split it into fields and keep the ones we need
    {
      val arr:Array[String] = line.split("\t")
      val msisdn:String = arr.apply(1)  // the key: the user's phone number
      val upFlow:Integer = Integer.valueOf(arr.apply(8))
      val downFlow:Integer = Integer.valueOf(arr.apply(9))
      val arr2 = Array((msisdn,(upFlow,downFlow)))
      arr2
    }) // e.g. Array((18330267966,(50,100)), (13403342405,(100,200)))

    val rdd3:RDD[(String,(Integer,Integer))] = rdd2.reduceByKey((updown1,updown2)=>  // e.g. combine (50,100) and (100,200) for one user
    {
      val upFlowSum = updown1._1+updown2._1
      val downFlowSum = updown1._2+updown2._2
      (upFlowSum,downFlowSum) // e.g. (150,300)
    })
    val rdd4:RDD[(String,Integer,Integer,Int)] = rdd3.map(ele=>(ele._1,ele._2._1,ele._2._2,ele._2._1+ele._2._2))
    rdd4.saveAsTextFile("hdfs://hadoop11:9000/dirout/")
    sc.stop()
  }
}
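
As a side note, because each input line produces exactly one (key, value) pair, the flatMap over a one-element Array can be written as a plain map. The following is only a sketch of a more compact variant of the same job (hypothetical object name, byte counts held as Long instead of Integer, same input path, field positions, and tuple-formatted output as above):

package com.appache.spark.app

import org.apache.spark.{SparkConf, SparkContext}

// hypothetical name; performs the same job as FlowDataCount above
object FlowDataCountCompact
{
  def main(args: Array[String]): Unit =
  {
    val sc = new SparkContext(new SparkConf().setAppName("FlowDataCountCompact"))
    sc.textFile("hdfs://hadoop11:9000/zmy/flowdata.txt", 1)
      .map(line => {
        val arr = line.split("\t")
        (arr(1), (arr(8).toLong, arr(9).toLong))            // (msisdn, (upFlow, downFlow))
      })
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))     // sum up/down flow per user
      .map(t => (t._1, t._2._1, t._2._2, t._2._1 + t._2._2)) // (msisdn, up, down, up + down)
      .saveAsTextFile("hdfs://hadoop11:9000/dirout/")
    sc.stop()
  }
}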

After packaging the original code into a jar, we likewise run it on the cluster:

[root@hadoop11 mydata]# spark-submit --master spark://hadoop11:7077 FlowCount2.jar
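Depending on how the jar was packaged, spark-submit may also need the main class to be given explicitly with --class; the class name is the one defined in the Spark code above:

[root@hadoop11 mydata]# spark-submit --master spark://hadoop11:7077 --class com.appache.spark.app.FlowDataCount FlowCount2.jar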

Check the result of the run:

[root@hadoop11 mydata]# hadoop fs -cat /dirout/part-00000
(84138413,4116,1432,5548)
(13925057413,11058,48243,59301)
(13726230503,2481,24681,27162)
(13922314466,3008,3720,6728)
(13826544101,264,0,264)
(13560439658,2034,5892,7926)
(15920133257,3156,2936,6092)
(13823070001,360,180,540)
(13602846565,1938,2910,4848)
(13926435656,132,1512,1644)
(15013685858,3659,3538,7197)
(13600217502,1080,186852,187932)
(15989002119,1938,180,2118)
(13660577991,6960,690,7650)
(13760778710,120,120,240)
(13719199419,240,0,240)
(13480253104,180,180,360)
(13926251106,240,0,240)
(18320173382,9531,2412,11943)
(18211575961,1527,2106,3633)
(13502468823,7335,110349,117684)

Finally, let's compare the two:
[comparison screenshot (running time and amount of code) not reproduced here]
Judging from these results, Spark has a clear edge in both running time and amount of code. Still, in my view both Hadoop's MapReduce and Spark are, at bottom, implementations of the same map-and-reduce idea. If I had to score them, I would split it roughly 6-to-4 in Spark's favor, but both Hadoop and Spark are excellent big data processing frameworks in my book.


Original article: http://blog.csdn.net/a2011480169/article/details/53504617
