码迷,mamicode.com
首页 > 其他好文 > 详细

如何构建第一个Spark项目代码

时间:2016-04-29 16:02:56      阅读:236      评论:0      收藏:0      [点我收藏+]

标签:

如何构建第一个Spark项目代码


环境准备

本地环境

  1. 操作系统
    Window7/Mac
  2. IDE
    IntelliJ IDEA Community Edition 14.1.6
    下载地址
  3. JDK 1.8.0_65
    下载地址
  4. Scala 2.11.7
    下载地址

其它环境

  1. Spark:1.4.1
    下载地址
  2. Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2

IDE项目创建

新建一个项目

  1. New Project
    技术分享
  2. 使用Maven模型创建一个Scala项目
    技术分享
  3. 填写自己的GroupId、ArtifactId,Version不需要修改,Maven会根据GroupId生成相应的目录结构,GroupId的取值一般为a.b.c 结构,ArtifactId为项目名称。之后点击next,填写完项目名称和目录,点击finish就可以让maven帮你创建Scala项目
    技术分享
    项目创建完成后,目录结构如下
    技术分享
    4.为项目添加JDK以及Scala SDK
    点击File->Project Structure,在SDKS和Global Libraries中为项目配置环境。
    技术分享
    至此整个项目结构、项目环境都搭建好了

编写主函数

主函数的编写在 projectName/src/main/scala/…/下完成,如果按照上述步骤完成代码搭建,将在目录最后发现

MyRouteBuild
MyRouteMain

这两个文件为模块文件,删除MyRouteBuild,重命名MyRouteMainDirectKafkaWordCount。这里,我使用Spark Streaming官方提供的一个代码为实例代码,代码如下

package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("...")
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

将代码最上面的package org.apache.spark.examples.streaming,替换为DirectKafkaWordCount里的package部分即可。并覆盖DirectKafkaWordCount文件。
至此Spark处理代码已经编写完成。

修改pom.xml,为项目打包做准备

pom.xml中编写了整个项目的依赖关系,这个项目中我们需要导入一些Spark Streaming相关的包。

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.4.1</version>
</dependency>

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>

除此之外,如果需要把相关依赖打包到最终JAR包中,需要在pom.xmlbulid标签中写入以下配置:

<plugins>
      <!-- Plugin to create a single jar that includes all dependencies -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.0.2</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>      
    </plugins>

pom.xml文件修改完成后,即可开始maven打包,操作如图:
技术分享
点击右侧弹出窗口的Execute Maven Goal,在command line中输入clean package
技术分享

Spark作业提交

在项目projectname/target目录下即可找到两个jar包,其中一个仅包含Scala代码,另一个包含所有依赖的包。
jar包导到Spark服务器,运行Spark作业,运行操作如下

../bin/spark-submit –master yarn-client –jars ../lib/kafka_2.10-0.8.2.1.jar –class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic

利用spark-submit把任务提交到Yarn集群,即可看到运行结果。

如何构建第一个Spark项目代码

标签:

原文地址:http://blog.csdn.net/huochen1994/article/details/51275777

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!