
Winter Break 2020 Study Log, Day 6

Posted: 2020-02-06 14:42:22


Today I worked through Spark Lab 6, an introductory exercise in Spark Streaming programming.

Flume is a very popular log-collection system and can serve as an advanced data source for Spark Streaming. In this exercise, the Flume source is set to the netcat type, and messages typed at a terminal are sent continuously to it. Flume gathers the messages into a sink, whose type is set to avro; the sink then pushes the messages on to a Spark Streaming application that we write ourselves, which processes them.
(1) Configure the Flume data source
cd /usr/local/flume
cd conf
vim flume-to-spark.conf
 
Write the following into flume-to-spark.conf:
#flume-to-spark.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 33333
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 44444
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  

(2) Prepare Spark
cd /usr/local/spark
./bin/spark-shell
 
Once it has started, run the following import statement in spark-shell:
scala> import org.apache.spark.streaming.flume._
 
As you can see, this fails immediately because the required jar cannot be found. So we now need to download spark-streaming-flume_2.11-2.1.0.jar, where 2.11 is the Scala version and 2.1.0 is the Spark version. In the Linux system, open Firefox and go to http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11/2.1.0, which provides the spark-streaming-flume_2.11-2.1.0.jar file for download.
 
Then:
cd /usr/local/spark/jars
mkdir flume
cd ~
cd 下载
cp ./spark-streaming-flume_2.11-2.1.0.jar /usr/local/spark/jars/flume
cd /usr/local/flume/lib
ls
cp ./* /usr/local/spark/jars/flume
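The copy steps above can be wrapped in one function. This is a sketch of my own, not part of the lab: the directory names are parameters whose defaults match the paths used above (下载 is the browser's default download folder).

```shell
# Sketch: stage the Spark-Flume connector jar plus all of Flume's own jars
# into Spark's jars/flume directory. Defaults match the lab's paths.
stage_flume_jars() {
  dl_dir=${1:-"$HOME/下载"}            # where the connector jar was downloaded
  flume_home=${2:-/usr/local/flume}
  spark_home=${3:-/usr/local/spark}
  mkdir -p "$spark_home/jars/flume"
  # the connector jar downloaded from mvnrepository.com
  cp "$dl_dir"/spark-streaming-flume_2.11-2.1.0.jar "$spark_home/jars/flume/"
  # every jar that ships with Flume itself
  cp "$flume_home"/lib/* "$spark_home/jars/flume/"
}
```

Parameterizing the paths makes it easy to rerun if Spark or Flume is installed somewhere else.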
 
(3) Write a Spark program that uses the Flume data source
cd /usr/local/spark/mycode
mkdir flume
cd flume
mkdir -p src/main/scala
cd src/main/scala
vim FlumeEventCount.scala
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam

object FlumeEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()
    val Array(host, IntParam(port)) = args
    val batchInterval = Milliseconds(2000)
    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()
  }
}

  

Next, use vim again to create a StreamingExamples.scala file with the following code, which controls the log output level:
package org.apache.spark.examples.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging

object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we
      // override the logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

  

Then create a simple.sbt file with the following content:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.1.0"
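A side note on the mixed operators above: `%%` appends the Scala version suffix (here `_2.11`, taken from `scalaVersion`) to the artifact name automatically, while plain `%` uses the name verbatim. So the last two lines could equivalently be written as:

```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.1.0"
```

Both forms resolve the same artifacts; `%%` is just less error-prone when the Scala version changes.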

Package the project:

cd /usr/local/spark/mycode/flume/
/usr/local/sbt/sbt package    # sbt must be installed before this step
 
(4) Test the program
Close all previously opened terminals. First, open a new (first) Linux terminal and start the Spark Streaming application with:
cd /usr/local/spark
./bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/flume/* --class "org.apache.spark.examples.streaming.FlumeEventCount" /usr/local/spark/mycode/flume/target/scala-2.11/simple-project_2.11-1.0.jar localhost 44444
 
This command passes localhost and 44444 as the program's host and port arguments, so the program listens on port 44444 of localhost. Milliseconds(2000) sets the batch interval to 2 seconds, so every 2 seconds the program takes whatever messages the Flume sink has delivered to that port, processes them, counts them, and prints a line such as "Received 0 flume events.". After the command starts, the screen shows the application's runtime output, refreshed every 2 seconds; among the large amount of output you will find lines like these:
-------------------------------------------
Time: 1488029430000 ms
-------------------------------------------
Received 0 flume events.
Because Flume has not been started yet and has sent nothing to FlumeEventCount, the event count is 0. Leave this first terminal open and listening. Now open a second terminal and start the Flume agent in it:
cd /usr/local/flume
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console
Once started, the agent keeps listening on port 33333 of localhost, so we can then send messages to the Flume source with "telnet localhost 33333". Leave this second terminal open and listening as well.
 
Open a third terminal and run:
telnet localhost 33333
 
In this window you can now type arbitrary characters followed by Enter. Flume picks up each of these messages, gathers them into the sink, and the sink forwards them to Spark's FlumeEventCount program for processing. In the earlier terminal running FlumeEventCount you should then see statistics similar to:
-------------------------------------------
Time: 1488029430000 ms
-------------------------------------------
Received 0 flume events.
# other screen output omitted here
-------------------------------------------
Time: 1488029432000 ms
-------------------------------------------
Received 8 flume events.
# other screen output omitted here
-------------------------------------------
Time: 1488029434000 ms
-------------------------------------------
Received 21 flume events.
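telnet is interactive; as an aside of my own (not part of the lab), the same events can be sent non-interactively with nc (netcat), assuming nc is installed. Each newline-terminated line becomes one Flume event:

```shell
# Build three newline-terminated messages; each line becomes one Flume event.
msgs=$(printf 'hello\nspark\nflume')
echo "$msgs" | wc -l    # prints 3: three lines, so three events
# To actually send them, the Flume agent must already be running:
# echo "$msgs" | nc localhost 33333
```

This is handy for sending a known number of events when checking the per-batch counts.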
 
 
 
Those are the steps of the experiment. One problem came up at the end: after starting Flume, the application should report something like "Received 21 flume events", but in my run the count stayed at 0 even after Flume was started. I do not know the cause yet; the problem remains unsolved.
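One thing worth checking for that zero-count problem is whether both ports actually have listeners while the programs run: FlumeEventCount should hold 44444 open before the agent starts (the avro sink connects to it), and the agent should hold 33333 open for telnet. A small diagnostic sketch of my own (assumes Linux with the `ss` tool available); it only reports, it does not fix anything:

```shell
# Report whether the lab's two ports currently have TCP listeners.
report=""
for port in 33333 44444; do
  if ss -ltn 2>/dev/null | grep -q ":$port "; then
    report="$report port $port: listening;"
  else
    report="$report port $port: no listener;"
  fi
done
echo "$report"
```

If 44444 shows no listener while the agent is up, the avro sink has nothing to deliver to, which would leave the count at 0.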



Original post: https://www.cnblogs.com/ljm-zsy/p/12268395.html
