码迷,mamicode.com
首页 > 其他好文 > 详细

Spark Streaming编程示例

时间:2015-08-08 01:16:50      阅读:152      评论:0      收藏:0      [点我收藏+]

标签:

近期也有开始研究使用spark streaming来实现流式处理。本文以流式计算word count为例,简单描述如何进行spark streaming编程。

1. 依赖的jar包

参考《分别用Eclipse和IDEA搭建Scala+Spark开发环境》一文,pom.xml中指定依赖库spark-streaming_2.10.jar。

<dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- Spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>
    <!-- HDFS -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    </dependency>

2. WordCount代码示例

监听socket端口,每5秒统计一次收到的文本的单词数量,并输出到屏幕。

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

/**
 * Spark Streaming示例,统计输入中所有单词出现的次数
 *
 */
object StreamingWordCount {
    def main(args: Array[String]) {
        if (args.length < 2) {
            System.err.println("Usage: NetworkWordCount <hostname> <port>")
            System.exit(1)
        }

        // Create the context with a 5 second batch size
        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // Create a socket stream on target ip:port and count the
        // words in input stream of \n delimited text
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

3. 提交任务和监听集群

socketTextStream是从监听service的socket端口。

(1)Job如何提交:
$SPARK_HOME/bin/spark-submit --name StreamingDemo --class StreamingWordCount ./sparktest-1.0-SNAPSHOT.jar localhost 1234

(2)监听socket端口:

nc -lk 1234

 

Spark Streaming编程示例

标签:

原文地址:http://www.cnblogs.com/simplestupid/p/4712317.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!