# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'a2'
a2.sources = r2
a2.channels = c2
a2.sinks = k2
### define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/datas/spark_word_count.log
a2.sources.r2.shell = /bin/bash -c
### define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
### define sinks
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = bigdata.eclipse.com
a2.sinks.k2.port = 9999
### bind the sources and sinks to the channels
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
The following jars are used:
1) spark-streaming-flume_2.10-1.3.0.jar
2) flume-avro-source-1.5.0-cdh5.3.6.jar
3) flume-ng-sdk-1.5.0-cdh5.3.6.jar
bin/spark-shell \
--jars /opt/app/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/extraljars/spark-streaming-flume_2.10-1.3.0.jar,/opt/app/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/extraljars/flume-avro-source-1.5.0-cdh5.3.6.jar,/opt/app/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/extraljars/flume-ng-sdk-1.5.0-cdh5.3.6.jar \
--master local[2]
-- Code:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
val ssc = new StreamingContext(sc, Seconds(5))
val stream = FlumeUtils.createStream(ssc, "bigdata.eclipse.com", 9999)
// val eventsCount = stream.count.map(cnt => "Received " + cnt + " flume events.")
// eventsCount.print()
val wordCountStream = stream.map(x => new String(x.event.getBody.array())).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordCountStream.print()
ssc.start()
ssc.awaitTermination()
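To make it easier to see what the `map` / `flatMap` / `reduceByKey` chain above computes for a single batch, here is a minimal sketch using plain Scala collections instead of a DStream. The helper names `bodyToString` and `countWords` are hypothetical and not part of Spark or Flume. The sketch also assumes the log lines are UTF-8, whereas the streaming code above relies on the platform default charset.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper: decode an event body, assuming UTF-8 text.
def bodyToString(body: ByteBuffer): String = {
  val copy = body.duplicate()                   // do not disturb the caller's buffer position
  val bytes = new Array[Byte](copy.remaining())
  copy.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

// Same steps as the streaming job, applied to one in-memory batch:
// decode each body, split on spaces, then count occurrences per word.
def countWords(bodies: Seq[ByteBuffer]): Map[String, Int] =
  bodies
    .map(bodyToString)
    .flatMap(_.split(" "))
    .groupBy(identity)
    .map { case (word, occurrences) => (word, occurrences.size) }

// Two sample "events" standing in for lines read by the exec source.
val batch = Seq("spark flume spark", "flume avro")
  .map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))

println(countWords(batch))
```

In the streaming job, `reduceByKey(_ + _)` plays the role of the `groupBy` and `size` step here, and the counts are computed separately for each 5-second batch.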
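-- Start the Flume agent (once the streaming job above is running, since the avro sink connects to the Spark receiver):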
bin/flume-ng agent -c conf -n a2 -f conf/flume-spark-tail-conf.properties -Dflume.root.logger=INFO,console
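With both processes running, word counts only appear when new lines reach the tailed file. The following is a small sketch for generating test input from a separate Scala session. `appendLines` is a hypothetical helper, and the path in the commented-out call is the one configured for the exec source above; it is assumed to be writable by the current user.

```scala
import java.io.{FileWriter, PrintWriter}

// Hypothetical helper: append lines to a log file so that `tail -F` picks them up.
def appendLines(path: String, lines: Seq[String]): Unit = {
  val out = new PrintWriter(new FileWriter(path, true)) // second argument true = append mode
  try lines.foreach(line => out.println(line))
  finally out.close()                                   // close also flushes the buffered output
}

// Example call, using the file that the exec source tails:
// appendLines("/opt/datas/spark_word_count.log", Seq("hadoop spark flume", "spark streaming"))
```

Each call should show up in the next batch printed by `wordCountStream.print()`, subject to the 5-second batch interval.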
Original article: http://blog.csdn.net/yangcongyangling/article/details/51357113