Streaming hdfs count 需要先启动 hadoop 集群。
# 启动 hadoop 集群 start-dfs.sh start-yarn.sh # 查看是否启动成功 # 命令 jps jps
hadoop 启动成功之后,下面就是关于 stream 的代码,stream 统计代码如下,将下面的代码进行打包,上传到服务器上即可。
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object HdfsWordCount { def main(args: Array[String]): Unit = { if (args.length < 2) { System.err.println("Usage: HdfsWordCount <directory>") System.exit(1) } // StreamingExamples.setStreamingLogLevels() val sparkConf = new SparkConf().setAppName("HdfsWordCount") // Create the context val ssc = new StreamingContext(sparkConf, Seconds(10)) // Create the FileInputDStream on the directory and use the // stream to count words in new files created val lines = ssc.textFileStream(args(0)) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() // wordCounts.saveAsTextFiles(args(1)) ssc.start() ssc.awaitTermination() } }
代码需要传递两个参数,一个是 stream 监控的数据输入目录,一个是输出目录。对应的执行脚本如下。
$SPARK_HOME/bin/spark-submit --class com.hw.streaming.HdfsWordCount --master yarn-cluster --executor-memory 1G --total-executor-cores 2 --files $HIVE_HOME/conf/hive-site.xml --jars $HIVE_HOME/lib/mysql-connector-java-5.1.25-bin.jar,$SPARK_HOME/jars/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/jars/datanucleus-core-3.2.10.jar,$SPARK_HOME/jars/datanucleus-rdbms-3.2.9.jar,$SPARK_HOME/jars/guava-14.0.1.jar ./SparkPro-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://master:9000/data/input hdfs://master:9000/data/output
# 脚本是跑在 yarn-cluster 上的,所以可以通过 ui 界面查看对应的内容 sh hdfs_run.sh
# 随便上传一个文件,比如这里是 3.txt,对应的内容是 # cat 3.txt hello world hello world hello world hello world hello world hello world hello world a a a a a a a b b b # 将 3.txt 上传到 hdfs hadoop fs -put 3.txt /data/input
# 浏览器输入 # 点击对应的 application # 点击对应的 log # 点击查看 log 详情 # 会看到下面的日志输出 ------------------------------------------- Time: 1564279580000 ms ------------------------------------------- (b,3) (hello,7) (world,7) (a,7)
以上就是 Streaming hdfs count 的案例,一开始调试的时候没有通过是没有看清楚,是先把数据文件上传到 hdfs 里面了,导致后面统计不出来,后来发现是启动之后监控的,因此,需要先启动,在向里面放数据。查看日志的时候,发现 INFO 也打印出来了,如果不需要看 INFO 信息,可以在 hadoop 配置文件中 log4j.properties 中把日志级别调高,或者去掉 INFO,即可。
