标签:并且 大量 干货 核心 应用 hdfs 构建 如何 developer
欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~
本文将帮助您使用基于HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一个扩展,支持连续的数据流处理。
首先,什么是流(streaming)?数据流是连续到达的无穷序列。流处理将不断流动的输入数据分成独立的单元进行处理。流处理是对流数据的低延迟处理和分析。Spark Streaming是Spark API核心的扩展,可实现实时数据的快速扩展,高吞吐量,高容错处理。Spark Streaming适用于大量数据的快速处理。实时处理用例包括:
Spark Streaming支持如HDFS目录,TCP套接字,Kafka,Flume,Twitter等数据源。数据流可以用Spark 的核心API,DataFrames SQL,或机器学习的API进行处理,并且可以被保存到HDFS,databases或Hadoop OutputFormat提供的任何文件系统中去。
Spark Streaming将数据流每X秒分作一个集合,称为Dstreams,它在内部是一系列RDD。您的Spark应用程序使用Spark API处理RDD,并且批量返回RDD操作的结果。
Spark Streaming示例代码执行以下操作:
其他Spark示例代码执行以下操作:
油泵传感器数据文件放入目录中(文件是以逗号为分隔符的CSV)。Spark Streaming将监视目录并处理在该目录中创建的所有文件。(如前所述,Spark Streaming支持不同的流式数据源;为简单起见,此示例将使用CSV。)
以下是带有一些示例数据的csv文件示例:
我们使用Scala案例类来定义与传感器数据csv文件相对应的传感器模式,并使用parseSensor函数将逗号分隔值解析到传感器案例类中。
流数据的HBase表格模式如下:
日常统计汇总的模式如下所示:
下面的函数将Sensor对象转换为HBase Put对象,该对象用于将数据行插入到HBase中。
您可以使用Spark 的TableOutputFormat类写入HBase表,这与您从MapReduce写入HBase表的方式类似。下面我们使用TableOutputFormat类设置HBase的配置。
这些是Spark Streaming代码的基本步骤:
我们将通过示例应用程序代码完成这些步骤。
首先,我们创建一个StreamingContext,这是流式传输的主要入口点(2秒间隔时间)。
val sparkConf = new SparkConf ( ) . setAppName ( "HBaseStream" ) // 创建 StreamingContext, 流式函数的主要入口 val ssc = new StreamingContext ( sparkConf , Seconds ( 2 ) )
接下来,我们使用StreamingContext textFileStream(directory)方法创建一个输入流,该输入流监视Hadoop兼容的文件系统以获取新文件,并处理在该目录中创建的所有文件。
// 创建代表数据 DStream对象 val linesDStream = ssc . textFileStream ( "/user/user01/stream" )
linesDStream代表数据流,每个记录都是一行文本。内部DStream是一系列RDD,每个批处理间隔一个RDD。
接下来,我们将数据行解析为Sensor对象,并使用DStream行上的map操作。
// 把lineDSream的每一行解析为Sensor对象 val sensorDStream = linesDStream . map ( Sensor . parseSensor )
map操作在linesDStream中的RDD上使用Sensor.parseSensor函数,从而生成Sensor对象(RDD)。
接下来,我们使用DStream foreachRDD方法将处理应用于此DStream中的每个RDD。我们过滤低psi传感器对象以创建警报,然后我们通过将传感器和警报数据转换为Put对象并使用PairRDDFunctions saveAsHadoopDataset方法将传感器和警报数据写入HBase ,该方法使用Hadoop将RDD输出到任何支持Hadoop的存储系统,该存储系统的配置对象(请参阅上面的HBase的Hadoop配置)。
// 对每一个RDD. sensorRDD . foreachRDD { rdd => // 低psi的传感器过滤器 val alertRDD = rdd . filter ( sensor => sensor . psi < 5.0 ) // 把传感器数据转为对象并写入HD rdd . map ( Sensor . convertToPut ) . saveAsHadoopDataset (jobConfig ) // 把警报转为对象并写入HD rdd . map ( Sensor . convertToPutAlert ) . saveAsHadoopDataset (jobConfig ) }
sensorRDD对象被转换并写入HBase。
要开始接收数据,我们必须在StreamingContext上显式调用start(),然后调用awaitTermination来等待计算完成。
// 开始计算 ssc . start ( ) // 等待计算完成 ssc . awaitTermination ( )
现在我们要读取HBase传感器表数据,计算每日摘要统计信息并将这些统计信息写入。
以下代码读取HBase表,传感器表,psi列数据,使用StatCounter计算此数据的统计数据,然后将统计数据写入传感器统计数据列。
// HBase的读取设置 val conf = HBaseConfiguration . create ( ) conf . set ( TableInputFormat . INPUT_TABLE , HBaseSensorStream . tableName ) // 扫描数据 conf . set ( TableInputFormat . SCAN_COLUMNS , "data:psi" ) // 加载RDD (row key, row Result)元组 val hBaseRDD = sc . newAPIHadoopRDD ( conf , classOf [TableInputFormat ] , classOf [ org . apache . hadoop . hbase . io . ImmutableBytesWritable ] , classOf [ org . apache . hadoop . hbase . client . Result ] ) // 把(row key, row Result) 元组为RDD val resultRDD = hBaseRDD.map(tuple => tuple._2) // 转为 RDD (RowKey, ColumnValue), 移除Time val keyValueRDD = resultRDD. map(result => (Bytes.toString(result.getRow()). split(" ")(0), Bytes.toDouble(result.value))) // 分组,得到统计数据 val keyStatsRDD = keyValueRDD. groupByKey(). mapValues(list => StatCounter(list)) // 转码rowkey,统计信息放入并写入hbase keyStatsRDD.map { case (k, v) => convertToPut(k, v)}.saveAsHadoopDataset(jobConfig)
下图显示newAPIHadoopRDD的输出。PairRDDFunctions saveAsHadoopDataset将Put对象保存到HBase。
您可以将代码作为独立应用程序运行,如“MapR Sandbox上的Spark入门教程”中所述。
以下是总的步骤:
hbase classpath
--class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jarcp sensordata.csv /user/user01/stream/
这就结束了关于使用HBase进行Spark Streaming的教程。您可以在相关阅读部分找到更多信息。
问答
相关阅读
此文已由作者授权腾讯云+社区发布,原文链接:https://cloud.tencent.com/developer/article/1123173?fromSource=waitui
标签:并且 大量 干货 核心 应用 hdfs 构建 如何 developer
原文地址:https://www.cnblogs.com/qcloud1001/p/9044511.html