Configuration file: pom.xml
<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
</properties>
<repositories>
    <!-- Add the Cloudera repository; the CDH-versioned artifacts are hosted there -->
    <repository>
        <id>cloudera</id>
        <name>cloudera</name>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Scala standard library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- spark-core, built for Scala 2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- hadoop-client, CDH build -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
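The POM above only declares dependencies; Maven still needs a plugin to compile the Scala sources. A minimal sketch of the build section, assuming the commonly used scala-maven-plugin (the version number and the standard src/main/scala layout are assumptions, not from the original post):

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
        <!-- Compiles Scala sources during the Maven build -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>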
Test code: WordCountApp.scala
package com.ruozedata

import org.apache.spark.{SparkConf, SparkContext}

// Note: Spark recommends defining a main() method rather than extending scala.App,
// since subclasses of App may not initialize correctly when submitted.
object WordCountApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    // Input: read the path from args() instead of hard-coding it
    val dataFile = sc.textFile(args(0))
    // Business logic: split each line on commas, pair every word with 1, sum the counts
    val outputFile = dataFile.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _) //.collect().foreach(println)
    // Write the result to the output path
    outputFile.saveAsTextFile(args(1))
    // Stop the SparkContext and release its resources
    sc.stop()
  }
}
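To produce the jar referenced by the spark-submit command below, package the project with Maven and copy it to the server. A sketch of those two steps; the jar name follows the project's artifactId and version, and the server hostname is a placeholder, not from the original post:

$ mvn clean package -DskipTests
$ scp target/SparkCodeApp-1.0.jar hadoop@server:/home/hadoop/lib/spark/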
Submitting from the local machine to the server (configure this in a script, as sketched below):
$ /home/hadoop/app/spark/bin/spark-submit --class com.ruozedata.WordCountApp --master local[2] --name WordCountApp /home/hadoop/lib/spark/SparkCodeApp-1.0.jar /wc_input/ /wc_output
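Wrapping the command in a script keeps the paths and options in one place. A minimal sketch using only values from the command above (the script name and variable names are assumptions):

#!/bin/bash
# submit_wordcount.sh -- wraps spark-submit so paths and options live in one place
SPARK_HOME=/home/hadoop/app/spark
APP_JAR=/home/hadoop/lib/spark/SparkCodeApp-1.0.jar
INPUT=/wc_input/
OUTPUT=/wc_output

$SPARK_HOME/bin/spark-submit \
  --class com.ruozedata.WordCountApp \
  --master local[2] \
  --name WordCountApp \
  $APP_JAR $INPUT $OUTPUT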
For detailed configuration, refer to the official Spark documentation:
http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html
http://spark.apache.org/docs/2.2.0/configuration.html
http://spark.apache.org/docs/2.2.0/submitting-applications.html
Original post: https://www.cnblogs.com/suixingc/p/spark-ying-yong-cheng-xu-kai-fa-liu-cheng.html