
Deploying a Java WordCount Program to Run on a Spark Cluster


package cn.spark.study.core;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * A Java word-count program deployed to run on a Spark cluster.
 * @author Administrator
 */
public class WordCountCluster {
    public static void main(String[] args) {

        // To run on a Spark cluster, only two things need to change:
        // First, remove the setMaster() call from SparkConf; the master is then
        // taken from the spark-submit command instead of being hard-coded.
        // Second, read a real big-data file stored on HDFS instead of a local file.

        // The actual deployment steps (example commands follow the code below):
        // 1. Upload spark.txt to HDFS.
        // 2. Package the Spark project with the Maven plugin configured earlier in pom.xml.
        // 3. Upload the packaged jar to the machine that will submit the job.
        // 4. Write a spark-submit script.
        // 5. Run the spark-submit script to submit the application to the cluster.
        SparkConf conf = new SparkConf()
                .setAppName("WordCountCluster");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("hdfs://hadoop:9000/test/spark.txt");

        // Split each line into words.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts for each word.
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print the results. Note: on a cluster this prints to the executors'
        // stdout, not to the console of the machine that submitted the job.
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println(tuple._1 + ":" + tuple._2);
            }
        });

        sc.close();
    }
}
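Steps 1 through 3 above come down to a few shell commands. The following is a minimal sketch, assuming spark.txt sits in the current directory, the hadoop client is on the PATH, the Maven build writes the jar under target/, and 192.168.1.107 is the machine the job is submitted from (the host and all local paths are illustrative):

# Step 1: upload the input file to HDFS
hadoop fs -mkdir -p /test
hadoop fs -put spark.txt /test/spark.txt

# Step 2: package the project with the Maven plugin configured in pom.xml
mvn clean package

# Step 3: copy the packaged jar to the submitting machine
scp target/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar root@192.168.1.107:/usr/local/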


The spark-submit script:

/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.WordCountCluster \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar

When submitting to the cluster, you also need to add the master URL:

--master spark://192.168.1.107:7077
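Save the script, for example as wordcount.sh (the filename is illustrative), mark it executable, and run it to submit the application:

chmod +x wordcount.sh
./wordcount.sh

Because the foreach runs on the executors, the word counts appear in the executors' stdout logs rather than in the shell that ran spark-submit; in standalone mode they can be found through the application's executor pages linked from the master web UI.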


Original source: http://www.cnblogs.com/thinkpad/p/5109269.html
