Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务。这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据。这里有两个方法。
Python API:Flume现在还不支持PythonAPI
方法1:Flume风格的推方法
Flume被设计用来在Flume代理之间推送数据。在这种方法中,Spark Streaming本质上设置了一个接收器作为Flume的一个Avro代理,Flume把数据推送到接收器上。下面是配置的步骤。
一般需求
在你的集群中选择一台机器满足如下条件:
1. 当你的Flume+Spark Streaming程序启动之后,Spark节点之一必须运行在那台机器上。
2. Flume可以配置为推送数据到那台机器的端口上。
根据推模型,流程序需要启动,同时接收器按计划运行并监听选择的端口,以让Flume能够推送数据。
配置Flume
配置Flume代理来发送数据到一个Avro池,需要在配置文件中加入如下的内容。
agent.sinks = avroSink agent.sinks.avroSink.type = avro agent.sinks.avroSink.channel = memoryChannel agent.sinks.avroSink.hostname = <chosen machine's hostname> agent.sinks.avroSink.port = <chosen port on the machine>
查看Flume文档来获取更多关于配置Flume代理的信息。
配置Spark Streaming程序
1.连接:在你的SBT/Maven项目定义中,通过下面的内容连接你的流程序(在主编程指南中的连接章节寻找更多的信息)。
groupId = org.apache.spark artifactId = spark-streaming-flume_2.10 version = 1.4.1
2.编程:在流处理程序的代码中,引入FlumeUtils并如下创建一个输入DStream流。
Scala
importorg.apache.spark.streaming.flume._ val flumeStream = FlumeUtils.createStream(streamingContext, [chosenmachine's hostname], [chosen port])
Java
import org.apache.spark.streaming.flume.*; JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine'shostname], [chosen port]);
查看API文档和示例。
注意,这里的主机名应该和集群中的资源管理器使用的主机名相同(Mesos,YARN或Spark Standalone),这样资源分配可以匹配名字并在正确的机器上启动接收器。
3.部署:将spark-streaming-flume_2.10包和它的依赖(除了由spark-submit提供的spark-core_2.10和spark-streaming_2.10)添加到程序包中。然后使用spark-submit来启动你的应用程序(在主编程指南中查看部署章节)。
方法2:使用自定义池的拉方式
不是Flume直接推送数据到SparkStreaming,这种方法运行了一个如下所示的Flume池。
1. Flume将数据推送到池中,然后数据在此处缓存。
2. Spark Streaming使用一个可靠的Flume接收器和操作从池中拉取数据。只有在Spark Streaming接收到数据并且把数据复制后才认为操作成功。
这个方法比前面的方法提供了强大的可靠性和容错保证。然而,这需要配置Flume运行一个自定义池。下面是配置步骤。
一般需求
选择一台在Flume代理中运行自定义池的机器。Flume其余的管道被配置为向那个代理发送数据。Spark集群中的机器都能连接到运行自定义池的那台机器上。
配置Flume
在选定的机器上配置Flume需要如下的两步。
1.池JAR包:添加如下的JAR包到要运行自定义池的机器中的Flume的classpath中(查看Flume的文档https://flume.apache.org/documentation.html)。
(i)自定义池JAR包:下载与下面内容一致的JAR包(或直接下载的地址https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume-sink_2.10/1.4.1/spark-streaming-flume-sink_2.10-1.4.1.jar)
groupId= org.apache.spark artifactId =spark-streaming-flume-sink_2.10 version = 1.4.1
(ii)Scala库JAR包:下载Scala库2.10.4版本JAR包。它可以用下面的内容找到(或直接在这里下载https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar)
groupId= org.scala-lang artifactId = scala-library version = 2.10.4
(iii)CommonsLang3 JAR包:下载Commons Lang 3 JAR包。它可以用下面的内容找到(或者直接下载https://repo1.maven.org/maven2/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar)
groupId= org.apache.commons artifactId = commons-lang3 version = 3.3.2
2.配置文件:在那台机器上,通过下面的配置文件配置Flume代理发送数据到一个Avro池中。
agent.sinks = spark agent.sinks.spark.type =org.apache.spark.streaming.flume.sink.SparkSink agent.sinks.spark.hostname = <hostname ofthe local machine> agent.sinks.spark.port = <port to listen onfor connection from Spark> agent.sinks.spark.channel = memoryChannel
也要确保上行流的Flume管道配置了发送数据到运行这个池的Flume代理。
查看Flume文档寻找更多关于配置Flume代理的信息。
配置Spark Streaming程序
1. 连接:在你的SBT/Maven项目定义中,通过下面的内容连接你的流程序(在主编程指南中的连接章节寻找更多的信息http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking)
2. 编程:在流处理程序的代码中,引入FlumeUtils并如下创建一个输入DStream流。
Scala
importorg.apache.spark.streaming.flume._ val flumeStream =FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sinkport])
Java
importorg.apache.spark.streaming.flume.*; JavaReceiverInputDStream<SparkFlumeEvent>flumeStream= FlumeUtils.createPollingStream(streamingContext, [sink machinehostname], [sink port]);
注意每个输入DStream可以配置为从多个池中接收数据。
3. 部署:将spark-streaming-flume_2.10包和它的依赖(除了由spark-submit提供的spark-core_2.10和spark-streaming_2.10)添加到程序包中。然后使用spark-submit来启动你的应用程序(在主编程指南中查看部署章节http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications)。
Spark Streaming和Flume集成指南V1.4.1
原文地址:http://blog.csdn.net/sdujava2011/article/details/47035173