铭文一级:
第8章 Spark Streaming进阶与案例实战
黑名单过滤
访问日志 ==> DStream
20180808,zs
20180808,ls
20180808,ww
==> (zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)
黑名单列表 ==> RDD
zs
ls
==>(zs: true)(ls: true)
==> 20180808,ww
leftjoin
(zs: [<20180808,zs>, <true>]) x
(ls: [<20180808,ls>, <true>]) x
(ww: [<20180808,ww>, <false>]) ==> tuple 1
第9章 Spark Streaming整合Flume
Push方式整合
Flume Agent的编写: flume_push_streaming.conf
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop000
simple-agent.sources.netcat-source.port = 44444
simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = 192.168.199.203
simple-agent.sinks.avro-sink.port = 41414
simple-agent.channels.memory-channel.type = memory
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
flume-ng agent \
--name simple-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console
hadoop000:是服务器的地址
local的模式进行Spark Streaming代码的测试 192.168.199.203
本地测试总结
1)启动sparkstreaming作业
2) 启动flume agent
3) 通过telnet输入数据,观察IDEA控制台的输出
spark-submit \
--class com.imooc.spark.FlumePushWordCount \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar \
hadoop000 41414
铭文二级:
第8章 Spark Streaming进阶与案例实战
复制NetworkWordCount改成TransformApp:
1.构建黑名单
val blacks = List("zs","ls")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true))
需要构建的各种形式
传入的数据(zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)
黑名单:(zs: true)(ls: true)
RDD=(zs: [<20180808,zs>, <true>]) x
(ls: [<20180808,ls>, <true>]) x
(ww: [<20180808,ww>, <false>])
val clicklog = lines.map(x => (x.split(",")(1),x)).transform(rdd => {
rdd.leftOuterJoin(blacksRDD)
.filter(x => x._2._2.getOrElse("flase") != true)
.map(x => x._2._1)
})
clicklog.print() //打印来看看
实战:整合Spark Streaming与Spark SQL的操作
直接拷贝官方源码来测试->点击
导入相应的包
在pom.xml导入SparkSQL的依赖(将Spark Streaming的改成sql即可)
官方关键代码:
// Convert RDD[String] to RDD[case class] to DataFrame val wordsDataFrame = rdd.map(w => Record(w)).toDF() // Creates a temporary view using the DataFrame wordsDataFrame.createOrReplaceTempView("words")
运行监测即可
第9章 Spark Streaming整合Flume(push与pull方式)
push方式(看官网):
Flume配置->导入依赖->FlumeUtils->spark-submit提交