标签:标识 密码 进制 页面 高可用 encoder global SOS 实现
Apache Spark是一个计算系统,它的处理速度很快,很通用,并且能集群部署。可以通过Scala,Java,Python和R来操作API。它还提供了了一些高级工具,比如Spark SQL(通过SQL操作),MLlib(机器学习用的),Graphx(图形化操作用的)还有Spark Streaming(流式计算)。
默认情况下Spark安全性是关闭的。(正式环境要开启)
Spark版本 | Java版本 | Python版本 | Scala版本 | R版本 |
---|---|---|---|---|
2.4.1~2.4.5 | 8 | 2.7+/3.4+ | 2.12.x | 3.1+ |
有关此结构的几点概要:
PS:集群管理器类型包括:独立部署,Spark模式部署,Mesos模式部署,Kubernetes模式部署。
使用spark-submit
可以将应用程序提交到任何形势的集群。
浏览端口4040
查看任务。例如:http://node1:4040
术语名 | 简介 |
---|---|
Application | 基于Spark的应用程序。 |
Application jar | 一个包含Spark应用程序的jar。 |
Driver program | 运行应用程序main(),并创建SparkContext的过程。 |
Cluster manager | 用于获取外部集群资源的服务。(比如mesos,yarn) |
Deploy mode | 部署方式:在“集群”模式下部署在集群内部,在“客户端”模式下,提交者在集群外部启动程序。 |
Worker node | 工作节点(在集群中可以运行应用程序的节点。) |
Executor | 运行应用程序,并将结果存储在内存或者磁盘中。 |
Task | 一种工作单元,将发送给一个执行者。 |
Job | 由多组Task组成的并行计算。 |
Stage | 每个Job被分的最小任务集。 |
默认情况下Spark是关闭安全的,所以如果您不配置则可能会被攻击。如何设置安全,请参阅 Spark安全性 。
从官网下载对应版本放在服务器即可。这里我下载的2.4.5
我服务器部署如下:
tuge1 | tuge2 | tuge3 |
---|---|---|
Spark2.4.5 | Spark2.4.5 | Spark2.4.5 |
我在tuge1
创建了一个/opt/spark
目录,并将下载的压缩包放上去。
然后在master
服务器(tuge1)执行如下脚本启动:
# ./sbin/start-master.sh
启动后就可以使用默认端口8080访问了。比如: http://tuge1:8080/
同样您可以启动一个或多个Worker,并通过以下方式连接到主节点:
./sbin/start-slave.sh <master-spark-URL>
在tuge2,tuge3服务器执行: ./sbin/start-slave.sh spark://tuge1:7077
启动后再次浏览http://tuge1:8080/
就可以看到新增了两个Worker。
要使用启动脚本启动Spark独立集群,您应该在Spark目录中创建一个名为conf / slaves的文件,该文件必须包含要启动Spark Worker的所有计算机的主机名,每行一个。如果conf / slaves不存在,则启动脚本默认为单台计算机(localhost),这对于测试非常有用。请注意,主计算机通过ssh访问每个工作计算机。默认情况下,ssh是并行运行的,并且需要设置无密码(使用私钥)访问权限。如果没有无密码设置,则可以设置环境变量SPARK_SSH_FOREGROUND并为每个工作线程依次提供一个密码。
您可以选择通过在中设置环境变量来进一步配置集群conf/spark-env.sh
。从开始以创建此文件conf/spark-env.sh.template
,然后将其复制到所有工作计算机上以使设置生效。可以使用以下设置(必须配置的已经加粗):
环境变量 | 含义 |
---|---|
JAVA_HOME | 配置Java环境。(注:这个最好配置下,我就是因为没配置导致启动报了Unsupported major.minor version 52.0错误~!) |
SPARK_MASTER_HOST | 将主机绑定到特定的主机名或IP地址,例如公共主机名或IP地址。 |
SPARK_MASTER_PORT | 在另一个端口上启动主服务器(默认:7077)。 |
SPARK_MASTER_WEBUI_PORT | 主Web UI的端口(默认值:8080)。 |
SPARK_MASTER_OPT | 仅以“ -Dx = y”的形式应用于主服务器的配置属性(默认值:无)。请参阅下面的可能选项列表。 |
SPARK_LOCAL_DIRS | 用于Spark中“临时”空间的目录,包括存储在磁盘上的地图输出文件和RDD。它应该在系统中的快速本地磁盘上。它也可以是不同磁盘上多个目录的逗号分隔列表。 |
SPARK_WORKER_CORES | 允许Spark应用程序在计算机上使用的核心总数(默认值:所有可用的核心)。 |
SPARK_WORKER_MEMORY | 允许Spark应用程序在计算机上使用的内存总量,例如1000m ,2g (默认值:总内存减去1 GB);请注意,每个应用程序的单独内存都是使用其spark.executor.memory 属性配置的。 |
SPARK_WORKER_PORT | 在特定端口上启动Spark worker(默认:随机)。 |
SPARK_WORKER_WEBUI_PORT | 辅助Web UI的端口(默认值:8081)。 |
SPARK_WORKER_DIR | 要在其中运行应用程序的目录,其中将包括日志和暂存空间(默认值:SPARK_HOME / work)。 |
SPARK_WORKER_OPTS | 仅以“ -Dx = y”的形式应用于工作程序的配置属性(默认值:无)。请参阅下面的可能选项列表。 |
SPARK_DAEMON_MEMORY | 分配给Spark主守护程序和辅助守护程序本身的内存(默认值:1g)。 |
SPARK_DAEMON_JAVA_OPTS | Spark主服务器和辅助服务器守护程序的JVM选项以“ -Dx = y”的形式出现(默认值:无)。 |
SPARK_DAEMON_CLASSPATH | Spark主守护程序和辅助守护程序本身的类路径(默认值:无)。 |
SPARK_PUBLIC_DNS | Spark主服务器和辅助服务器的公共DNS名称(默认值:无)。 |
您可以基于Hadoop的部署脚本,使用以下shell启动或停止集群, 该脚本在SPARK_HOME/sbin
下面:
sbin/start-master.sh
-在执行脚本的计算机上启动主实例。sbin/start-slaves.sh
-在conf/slaves
文件中指定的每台计算机上启动一个从属实例。sbin/start-slave.sh
-在执行脚本的计算机上启动从属实例。sbin/start-all.sh
-如上所述,同时启动一个主机和多个从机。sbin/stop-master.sh
-停止通过sbin/start-master.sh
脚本启动的主机。sbin/stop-slaves.sh
-停止conf/slaves
文件中指定的机器上的所有从属实例。sbin/stop-all.sh
-如上所述,停止主机和从机。这里我就使用sbin/start-all.sh
命令将Application和Worker启动了~!
执行如下命令进行链接(spark://IP:PORT)
# ./spark-shell --master spark://tuge1:7077
您还可以传递一个选项--total-executor-cores
来控制spark-shell在集群上使用的核心数量。
? (控制台)
? (WebUI)
使用spark-submit脚本进行提交。
就拿官方自带程序进程演示吧,下载好Spark2.4.5到本地后进行解压,可以在
examples\src\main\java\org\apache\spark\examples 看到java示例程序。
然后整个项目的压缩包可以在examples\jars下找到:
就拿JavaSparkPi举例:
# ./bin/spark-submit > --class org.apache.spark.examples.JavaSparkPi > --master spark://tuge1:7077 > /opt/spark/spark-2.4.5-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.5.jar
执行结果:
standalone遵循的是先进先出原则,默认情况下 ,它将获取所有内核,这仅在您一次只运行一个程序才有意义。要过要同时运行多个,必须要限制内核,如下:
val conf=new SparkConf()
conf.setMaster("....")
conf.setAppName("...")
conf.set("spark.cores.max","3")//限制最大内核数量是3
val sc=new SparkContext(conf)
此外还可以在集群上进行配置。因此将以下内容添加到conf/spark-env.sh
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
通过spark-executor-cores
可以明确给应用指定分配的核心数。
您可以通过将Spark作为单独的服务在同一台计算机上启动,将其与现有Hadoop集群一起运行。要从Spark访问Hadoop数据,只需使用hdfs:// URL(通常为hdfs://:9000/path
,但是您可以在Hadoop Namenode的Web UI上找到正确的URL)。另外,您可以为Spark设置一个单独的群集,并且仍然让它通过网络访问HDFS。这将比磁盘本地访问慢,但是如果您仍在同一局域网中运行(例如,在装有Hadoop的每个机架上放置几台Spark机器),则可能不必担心。
Spark默认部署在内网环境中。如果要进行安全设置参考 安全性页面
总览
利用Zookeeper实现高可用:会存在一个主服务器和多个备用服务器,当主服务器挂掉之后,Zookeeper会通过心跳进行感知,从而通过选举机制选举一台从服务器当新的主。
组态
为了启用此恢复模式,您可以通过配置spark.deploy.recoveryMode
及相关的spark.deploy.zookeeper
。在spark-env.sh中设置SPARK_DAEMON_JAVA_OPTS
。有关这些配置的更多信息,请参阅配置文档
物业名称 | 默认 | 含义 |
---|---|---|
spark.deploy.recoveryMode | none | 设置为ZOOKEEPER代表发生故障使用zookeeper。 |
spark.deploy.zookeeper.url | none | zookeeper主机的名字。 |
spark.deploy.zookeeper.dir | none | spark要在zookeeper上写数据时的保存目录 |
例如我的配置如下:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=tuge1:2181,tuge2:2181,tuge3:2181 -Dspark.deploy.zookeeper.dir=/opt/spark"
然后修改slaves文件(先修改一台,然后分发)
# vim slaves
tuge1
tuge2
tuge3
zookeeper默认和spark端口都是8080,有冲突,所以要么修改zookeeper端口,要么修改spark端口,这里我修改的zookeeper端口。
在conf/zoo.cfg里面添加自定义端口:admin.serverPort=9999
启动
先启动zookeeper
在每台机器上执行该命令:
# zkServer.sh start
启动后一个leader,其他的是flower 。
在第一台机器上启动spark
# sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/spark/spark-2.4.5-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-tuge1.out
tuge2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/spark-2.4.5-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-tuge2.out
tuge3: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/spark-2.4.5-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-tuge3.out
tuge1: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/spark-2.4.5-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-tuge1.out
# jps
6402 QuorumPeerMain
7797 Master
7868 Worker
7948 Jps
在第二台机器上单独启动master
# sbin/start-master.sh
查看web端口
第一个master为
Status: ALIVE
第二个master为
Status: STANDBY
这是时候把tuge1杀死,然后再看下tuge2是否转为ALIVE。
通过jps可知,tuge1的masterpid为8365。这里我就接简单粗暴了 :
# kill 8365
然后查看页面,可以看出tuge1已经无法访问,tuge2成为了新的leader。
搭建成功~
总览
zookeeper是生产级高可用方法,但是如果您只想在主服务器出现故障时启动它,则 FILESYSTEM模式可以解决这一问题。当Application和Worker注册时,它们可以将状态写入到下面配置的目录里面,以便重新启动主进程时恢复它们。
您可以通过在spark-env
设置开启本地文件恢复。
系统属性 | 含义 |
---|---|
spark.deploy.recoveryMode | 设置为FILESYSTEM以启用单节点恢复模式(默认值:NONE)。 |
spark.deploy.recoveryDirectory | 从主服务器的角度来看,Spark将存储恢复状态的目录。 |
默认情况下Spark是关闭安全的,所以如果您不配置则可能会被攻击。如何设置安全,请参阅 Spark安全性 。
确保HADOOP_CONF_DIR
或YARN_CONF_DIR
指向包含Hadoop集群的(客户端)配置文件的目录。 这些配置用于写入HDFS并链接到Yarn。
# vim spark-env.sh
加入如下配置(将Hadoop里面的配置文件引入进来)(tuge1,tuge2,tuge3服务器都要修改)
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-"/etc/hadoop"}
从上面介绍可以得知,这个是基于HDFS和Yarn的,所以在运行Spark程序之前要先启动HDFS和Yarn。
# cd /opt/hadoop/hadoop-2.6.5/sbin/
# ./start-dfs.sh
# ./start-yarn.sh
以cluster模式启动Spark应用程序:
$ ./spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
例如:
$ ./spark-submit --class org.apache.spark.examples.JavaSparkPi --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue thequeue /opt/spark/spark-2.4.5-bin-hadoop2.7/examples/jars/spark-examples*.jar 10
以client模式启动Spark应用程序:
$ ./spark-shell --master yarn --deploy-mode client
添加其它JAR
在cluster
模式下,驱动程序与客户端运行在不同的计算机上,因此SparkContext.addJar
对于客户端本地的文件而言,开箱即用。要使客户机上的文件可用于SparkContext.addJar
,请将它们包含--jars
在启动命令中的选项中。
$ ./bin/spark-submit --class my.main.Class --master yarn --deploy-mode cluster --jars my-other-jar.jar,my-other-other-jar.jar my-main-jar.jar app_arg1 app_arg2
我们可以使用Apache Oozie来启动Spark应用程序,如果Spark配置了安全启动,则使用Spark安全令牌,如果没配置,则通过Oozie设置安全性。
有关Oozie安全设置参考网站.
对于Spark应用程序,必须为Oozie设置Oozie工作流,以请求该应用程序需要的所有令牌,包括:
为了避免Spark尝试(然后失败)获取Hive,HBase和远程HDFS令牌,必须将Spark配置设置为禁用服务的令牌收集。
Spark配置必须包括以下几行:
spark.security.credentials.hive.enabled false
spark.security.credentials.hbase.enabled false
spark.yarn.access.hadoopFileSystems
必须取消配置选项。
禁用应用程序UI时,可以将Spark History Server应用程序页面用作运行应用程序的跟踪URL。在安全群集上或减少Spark驱动程序的内存使用量可能是理想的。要通过Spark History Server设置跟踪,请执行以下操作:
spark.yarn.historyServer.allowTracking=true
在Spark的配置中进行设置。如果禁用了应用程序的UI,这将告诉Spark使用历史服务器的URL作为跟踪URL。org.apache.spark.deploy.yarn.YarnProxyRedirectFilter
到spark.ui.filters
配置中的过滤器列表。请注意,历史记录服务器信息可能不是与应用程序状态有关的最新信息。
http://spark.apache.org/docs/latest/running-on-kubernetes.html
这个功能属于实验阶段,后面有可能会有修改。这里不演示了,有兴趣的自行查阅。
http://spark.apache.org/docs/latest/running-on-mesos.html
这个功能公司用的不多,学习的话直接参考官网文档就行了,我这里先不安装了。
默认情况下Spark是关闭安全的,所以如果您不配置则可能会被攻击。如何设置安全,请参阅 Spark安全性 。
这里主要以Scala语言进行介绍。
准备数据集(可以通过Hdfs和其它数据集来准备)
# mkdir -p /data/spark
# vim sparkData1.txt
#填写如下内容
hello world
What are you doing?
Nice to meet you~
hello YiMing
你好啊 一明
#按Esc ,然后输入:wq保存并退出
# hdfs dfs -put sparkData1.txt /user/root
启动:
# ./bin/spark-shell
scala> val textFile = spark.read.textFile("sparkData1.txt")
注意:使用spark.read.textFile()默认读取的是hdfs文件,也可以显示指定hdfs:// ,如果要读取本地文件加上file:// ,比如file:///data/spark/sparkData,或者使用 ./data/spark/sparkData
操作
scala> textFile.count()
res0: Long = 5 //计算出行数是5行
scala> textFile.first()
res2: String = hello world //查询出第一行的内容
我们可以通过过滤操作创建新的数据集
scala> val linesWithSpark = textFile.filter(line => line.contains("hello world"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
scala> linesWithSpark.count()
res4: Long = 1 //可以看到存在一行
我们可以将过滤动作和计算count链接在一起
scala> textFile.filter(line => line.contains("hello world")).count()
res5: Long = 1
scala> textFile.map(line=>line.split(" ").size).reduce((a,b)=>if(a>b) a else b)
res8: Int = 4
通过导入类库,来实现计算
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line=>line.split(" ").size).reduce((a,b)=>Math.max(a,b))
mapList: Int = 4
使用spark实现MapReduce流
scala> val wordCounts=textFile.flatMap(line=>line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
scala> wordCounts.collect()//把结果打印出来
res4: Array[(String, Long)] = Array((doing?,1), (Nice,1), (你好啊,1), (you,1), (一明,1), (YiMing,1), (hello,2), (are,1), (meet,1), (What,1), (world,1), (to,1), (you~,1))
指定缓存数据
scala> textFile.cache()
res6: textFile.type = [value: string]
scala> textFile.count()
res7: Long = 5
scala> textFile.count()
res8: Long = 5
建立一个sbt项目,起名SimpleApp,并在build.sbt里面配置如下内容
name := "SimpleApp"
version := "0.1"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
右键scala文件夹->New->Scala Class 选择Object,起名Simple 确定
添加如下代码
def main(args: Array[String]): Unit = {
val rootPath= System.getProperty("user.dir")//获取项目根目录
var bookFilePath = System.getProperty("user.dir") + "\\data\\Book"; //拼接路径
val spark = SparkSession.builder.appName("SimpleApp").getOrCreate()//获取SparkSession
val bookData = spark.read.textFile(bookFilePath).cache() //获取文件内容并缓存
val hei = bookData.filter(line => line.contains("黑")).count() //计算文件中包含“黑”的行数
println(hei)
spark.stop()
}
在项目根目录下创建一个data文件夹,并在里面添加一个Book文件,注意没有后缀哦。在 Book里面添加如下内容:
这世界既残酷也温柔 22
唐骏转 44
增长黑客 11
蓝筹孩子 55
富爸爸穷爸爸 21
未来世界的幸存者 44
智慧农业 13
工业4.0 81
厚黑学 66
配置本地运行:
选择Run->Edit Configuratioins,在VM Options添加-Dspark.master=local
参数。
然后运行就可以看到结果了,结果是2就没问题了。
Spark提供了弹性分布式集(RDD)和共享变量。
RDD是一种跨节点的元素的集合,通过从HDFS或其它支持Hadoop的系统转换为RDD。RDD可以持久化到内存。
共享变量是可以将一个变量的副本分发给每个运行的任务,共享变量共有两种,一种是广播变量,一种是累加器。
默认情况下,Spark 2.4.5已构建并分发为可与Scala 2.12一起使用。(也可以将Spark构建为与其他版本的Scala一起使用。)要在Scala中编写应用程序,您将需要使用兼容的Scala版本(例如2.12.X)。
使用maven依赖添加以下内容:
spark依赖
groupId=org.apache.spark
artifactId=spark-core_2.12
version=2.4.5
haddop hdfs依赖
groupId=org.apache.hadoop
artifactId=hadoop-client
version=2.6.5
程序中导入
Scala导入
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
Java导入:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
在使用Spark的第一件事当然是创建SparkContext,但是在创建SparkContext之前,要先创建SparkConf对集群进行配置。
Scala初始化:
val conf=new SparkConf().setAppName("RDDDemo1").setMaster("local")
new SparkContext(conf)
Java初始化:
SparkConf conf = new SparkConf().setAppName("RDDDemo1").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
setAppName里面填写应用显示的名字,setMaster填写Standalone,Spark,Kubernetes的链接,但是一般都在使用submit命令的时候手动指定,而不是写死。在本地测试的话,填写‘local‘就行了。
在Spark Shell中,默认为您创建了一个特殊的可识别解释器的SparkContext sc
。制作自己的SparkContext将不起作用。您可以使用--master
参数设置上下文连接到哪个主机,还可以通过将逗号分隔的列表传递给参数来将JAR添加到类路径--jars
。您还可以通过在--packages
参数中提供逗号分隔的Maven坐标列表,从而将依赖项(例如Spark Packages)添加到Shell会话中。可能存在依赖项的任何其他存储库(例如Sonatype)都可以传递给--repositories
参数。例如,要bin/spark-shell
在四个核心上运行,请使用:
$ ./bin/spark-shell --master local[4]
或者,也可以添加code.jar到其类路径中,使用
$ ./bin/spark-shell --master local[4] --jars code.jar
获取完整列表,使用spark-shell --help即可查看。
通过sparkcontext的parallelize方法创建并行集合。
Scala实现:
val data=Array(1,2,3,4)
val distData=sc.parallelize(data,10)
Java实现:
List<Integer> data = Arrays.asList(1, 2, 3, 4);
JavaRDD<Integer> distData = sc.parallelize(data);
创建后分布式数据集(distData)就可以并行操作。 例如,我们可能会调用distData.reduce((a, b) => a + b)
以添加数组的元素。Spark将为每一个分区运行一个任务,默认每个CPU是2-4个分区,通常Spark会根据您的集群手动设置分区数,当然了也可以手动指定,比如上面我指定的10。
Spark可以从Hadoop支持的任何存储(hdfs,cassandra,hbase,本地文件)创建分布式数据集。
比如:
scala实现:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
Java实现:
JavaRDD<String> distFile = sc.textFile("data.txt");
有关使用Spark读取文件的一些注意事项。
除了文本文件,Spark的API还支持其它几种数据格式:
SparkContext.wholeTextFiles使您可以读取包含多个文本文件的目录。
读取序列化文件使用sequenceFile
对于其它Hadoop InputFormat,还可以使用SparkContext.hadoopRDD方法
RDD操作
RDD支持两种类型的操作:transformation和action。例如map是一个转换,将每个数据集传递给一个函数。reduce是一个动作,需要将计算结果返回。
Spark所有的transformation都是惰性的,因为它不会立即计算结果。仅当应用程序需要将结果返回时,才进行转换。
基本
为了说明RDD基础,请考虑下面程序
Scala版:
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.textFile("file:///F:\\Code\\Scala\\SparkTest\\data\\words")
val lineLength = data.map(line => line.length).persist()
val totalLength = lineLength.reduce((a, b) => a + b)
println(totalLength)
上面的程序给words里面的所有字数做了一个汇总。persist是对数据进行缓存,这个时候还没真正执行,当时候reduce的时候需要返回结果了,这个时候才会去执行map。
将函数传递给Spark
例如:
Scala实现:
val data = sc.textFile("file:///F:\\Code\\Scala\\SparkTest\\data\\words")
val lineLengths = data.map(line => line.length).map(line=>addOne(line))//计算每行的长度,并且每行都加1
val totalLength=lineLengths.reduce((a,b)=>a+b)//统计总的长度
def addOne(n: Integer): Integer = {
n + 1
}
Java实现:
JavaRDD<String> lines = sc.textFile("file:///F:\\Code\\Scala\\SparkTest\\data\\words");
JavaRDD<Integer> addOneLine = lines.map(line -> addOne(line));//计算每行的长度,并且每行都加1
int totalLength = addOneLine.reduce((a, b) -> a + b);//统计总的长度
public static Integer addOne(String str) {
return str.length() + 1;
}
闭包
Spark的难点之一就是跨集群执行时了解它的作用范围和生命周期。如下面例子:
例子
Scala代码:
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don‘t do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
Java代码:
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don‘t do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
上面的例子运行得到的结果还是0,因为集群执行时,它分发的是副本,做的是副本累加。
要实现累加,我们可以使用累加器(accumulator),
val acc=sc.longAccumulator()
rdd.foreach(x=>acc.add(x))
println(acc.value)
上面就能正确打印结果了。当使用Accumulator的时候,注意使用一次action或者使用cache,persist切断之前的依赖。否则,累加器会持续累加。参考:( https://blog.csdn.net/zhaojianting/article/details/80228093 )
打印RDD
本地打印的时候,直接foreach循环就打印了,而在集群上打印,则需要把数据进行一次收集,要不计算的结果不会主动返回给Driver的。
以打印每行出现的次数为例:
Scala实现:
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("file:///F:\\Code\\Scala\\SparkTest\\data\\words")
val mapLine = lines.map(line => (line, 1))//得到一个key,value数组
val reduceLine = mapLine.reduceByKey((a, b) => a + b)//得到一个key,value数组
reduceLine.foreach(println)
Java实现:
JavaPairRDD<String, Integer> mapPairLine = lines.mapToPair(s -> new Tuple2(s, 1));//得到一个key,value数组
JavaPairRDD<String, Integer> reducePairLine = mapPairLine.reduceByKey((Integer a, Integer b) -> a + b);//得到一个key,value数组
Map<String, Integer> countMap = reducePairLine.collectAsMap();
for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
TransFormation | 说明 |
---|---|
map(func) | 返回一个新的数据集,该数据集通过执行fun函数形成的。 |
filter(func) | 返回一个新的数据集,该数据集返回执行fun函数为true的数据形成的。 |
flatMap(func) | 与map相似,但是每行都可以映射到0或多个输出项。 |
mapPartitions(func) | 与map相似,但是分别在RDD的每个分区上运行,因此fun在类型T的RDD上运行必须为Iterator |
mapPartitionsWithIndex(func) | 与mapPartitions相似,但它还为func提供表示分区索引的整数值,因此当在类型T的RDD上运行时,func必须为(Int,Iterator |
sample(withReplacement, fraction, seed) | 对数据进行随机采样。withReplacementde代表是否有放回抽样。fraction如果是0.几那么就代表抽中百分比,如果是1,2之类的整数,代表每个元素随机抽中的次数。 |
union(otherDataset) | 并集 |
intersect(otherDataset) | 交集 |
distinct([numPartitions]) | 返回不同的元素 |
groupByKey([numPartitions]) | 根据key进行分组。例如:val rdd = sc.parallelize(List((1, 3), (4, 7), (1, 86))) val groupData = rdd.groupByKey() 这样将3和86归为1,7归为4 |
reduceByKey(func, [numPartitions]) | 根据key合并分组。例如:val rdd = sc.parallelize(List((1, 3), (4, 7), (1, 86))) val reduceData = rdd.reduceByKey((a, b) => a + b) 这样就会得到1,89 4,7 |
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | 设定一个初始值和两个函数,在初始值的基础上结合两个函数对原始数据进行操作。例如:val rdd = sc.parallelize(List(1, 2, 3, 4)) val result = rdd.aggregate(1)(add1, add2) //会先通过初始值1+值1,2,3,4计算出值为11,然后11带入add2初始值1+11=12 def add1(p1: Int, p2: Int) = { p1 + p2 } def add2(p1: Int, p2: Int) = { p1 + p2 } |
sortByKey([ascending], [numPartitions]) | 根据key进行排序 |
join(otherDataset, [numPartitions]) | 将两组相同key的值链接到一起组成新的键值对。 |
cogroup(otherDataset, [numPartitions]) | 在(K,V)和(K,W)类型的数据集上调用时,返回(K,(Iterable |
cartesian(otherDataset) | 在类型T和U的数据集上调用时,返回(T,U)对(所有元素对)的数据集。比如:(1,2)和(3) 会返回(1,3)(2,3)组合 |
pipe(command, [envVars]) | 可以进行外部程序调用。比如调用python:val scriptpath="/tmp/test/test.py" println(rddData.pipe(scriptpath).collect().toList) |
coalesce(numPartitions) | 将RDD分区减少到numPartitions。 |
repartition(numPartitions) | 在RDD中随机重排以获得更多或更少的分区。 |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的分区程序对RDD进行重新分区,并在每个分区结果中,按其键进行排序。 |
Action | 说明 |
---|---|
reduce(func) | 使用func函数聚合数据集元素。 |
collect() | 在driver中收集所有元素并返回。通常收集较小数据集时候用。 |
count() | 返回数据集中的元素数。 |
first() | 返回第一个元素,类似于(take(1)) |
take(n) | 返回n个元素。 |
takeSample(withReplacement, num, [seed]) | 返回带有num个元素的随机取样数据。withReplacement代表是否重复提取。 |
takeOrdered(n, [ordering]) | 获取排序后的前几个数据 |
saveAsTextFile(path) | 将数据以文本文件的形势写入到本地文件系统,HDFS或其它支持Hadoop的系统中。 |
saveAsSequence(path) | 将数据以SequenceFile形势保存在本地文件系统,HDFS或其它支持Hadoop的系统中。 |
saveAsObjectFile(path) | 将数据以Object的形势保存。 |
countByKey() | 作用于k,v类型,返回k,int统计value的数量。 |
foreach(func) | 在数据集每个函数上运行func。 |
val conf = new SparkConf().setAppName("PVCalc").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.textFile("./data/ipData")
val pv = data.map(line => (line.split(" ")(0), 1)).countByKey()
pv.foreach(println)
val conf = new SparkConf().setMaster("local").setAppName("uv")
val sc = new SparkContext(conf)
val data = sc.textFile("./data/ipData")
val uv = data.map(line => (line.split(" ")(0) + "_" + line.split(" ")(1))).distinct().map(line => (line.split("_")
(0), 1)).reduceByKey(_ + _).sortBy(_._2, false) //按照倒序进行排列,如果参数仅仅使用一次,可以用"_"代替,如上面的写法按照常规的应该写为.sortBy(line=>line._2)
// ,如果有多个参数出现一次,可以使用"_*"代替,比如sum(1 to 5:_*)
uv.foreach(println)
参考资料:
idea如何设置自动换行: https://blog.csdn.net/zwj_jyzl/article/details/98473864
Scala中的_ 和 _*分别代表什么?: https://blog.csdn.net/wyz0516071128/article/details/81042667
背景
运行reduceBykey等操作需要在集群上拉取数据。
性能影响
由于大量的磁盘操作,于是考虑使用内存,但是内存容量达到一定级别就会持久化,这样还是很浪费性能的。
提供了几种持久化级别。
Storage Level | 说明 |
---|---|
MEMORY_ONLY(默认) | 将RDD作为反序列化存在RDD中。如果RDD不能完全缓存,则缓存一部分。 |
MEMORY_AND_DISK | 将RDD作为反序列化存在RDD中。如果RDD不能完全缓存,则都存在磁盘中。 |
MEMORY_ONLY_SER (Java and Scala) | 将RDD序列化为JAVA对象。这样会更节省时间,但是读取时很耗费CPU。 |
MEMORY_AND_DISK_SER (Java and Scala) | 与MEMORY_ONLY_SER类似,但是内存存不开会都存入磁盘。 |
DISK_ONLY | 将RDD分区存在磁盘。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 与DISK_ONLY类似,但是在两个集群节点上复制每个分区,及存两份。 |
OFF_HEAP (experimental) | 与MEMORY_ONLY_SER类似,但是将数据存的是堆外存。 |
可以等待LRU规则自动释放,也可以调用RDD.unpersist()手动释放。
对所有集群进行广播数据。
Sava实现:
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value
Java实现:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
这样就能在任何节点上获取。
可以对集群中的数据进行累加。
Scala案例:
val accum=sc.longAccumulator
data.map{x=>accum.add(x);}
Java案例:
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
Spark非常适合使用任何流行的单元测试框架进行单元测试。 只需在测试中创建一个SparkContext,并将主URL设置为local,运行您的操作,然后调用SparkContext.stop()将其拆解。 确保您在finally块或测试框架的tearDown方法中停止上下文,因为Spark不支持在同一程序中同时运行的两个上下文。
Spark属性配置
通过编程配置(主要设置SparkConf),举例:
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
提交的时候进行指定,举例:
./bin/spark-submit --name "My App" --master local[4]
在conf/spark-defaults.conf 中进行配置,每行包含一个键和一个由空格分隔的值,举例:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
Spark环境变量配置
在./conf/spark-env.sh中进行配置,举例:
export JAVA_HOME=/opt/java/jdk1.8.0_221
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=tuge1:2181,tuge2:2181,tuge3:2181 -Dspark.deploy.zookeeper.dir=/opt/spark"
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-"/etc/hadoop"}
Spark日志配置
在log4j.properties中配置。
我们可以直接复制 ./conf/log4j.properties.template进行配置。举例:
mc log4j.properties.template log4j.properties
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
使用关系查询处理结构化数据。
SparkSession是所有功能的入口点,要创建SparkSession,只需要使用SparkSession.builder();即可
Scala举例:
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
Java举例:
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
在examples / src / main / scala / org / apache / spark / examples / sql / SparkSQLExample.scala” 可以找到示例源码。
使用SparkSession可以从现有的RDD,Hive或Spark数据流中读取数据。
例如从JSON文件中读取数据。
Scala举例:
val spark = SparkSession.builder.appName("sparkSession Study").master("local").getOrCreate()
val txtData= spark.read.textFile("./data/words")
txtData.show()//打印结果
Java举例:
SparkSession spark = SparkSession.builder().appName("SparkSession案例").master("local").getOrCreate();
Dataset<String> txtData = spark.read().textFile("./data/words");
txtData.show();//打印结果
显示结果:
+-------------+
| value|
+-------------+
| hello YiMing|
|hello XiaoBei|
| hi LiSi|
| hello|
| hello|
+-------------+
可以在 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 找到案例。
people文件内容:
{"name":"Michael"}{"name":"Andy", "age":30}{"name":"Justin", "age":19}
Scala演示:
val spark = SparkSession.builder.appName("sparkSession Study").master("local").getOrCreate()
import spark.implicits._ //使用$符号需要此导入
val df = spark.read.json("./data/people")
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
Java演示:
SparkSession spark = SparkSession.builder().appName("SparkSession案例").master("local").getOrCreate();
Dataset<String> df = spark.read().textFile("./data/people");
df.printSchema();
df.groupBy("age").count().show();
df.select("name").show();
df.filter(col("age").gt(21)).show();
df.select(col("name"),col("age").plus(2)).show();
更多代码可在 examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala 找到。
Scala演示:
val spark = SparkSession.builder().appName("spark SQL Demo").master("local").getOrCreate()
val data = spark.read.json("./data/people")
data.createOrReplaceTempView("person")
val dfSql = spark.sql("select age from person")
dfSql.show()
// +----+
// | age|
// +----+
// |null|
// | 30|
// | 19|
// +----+
Java演示:
SparkSession ss = SparkSession.builder().appName("spark demo ").master("local").getOrCreate();
Dataset<Row> ds = ss.read().text("./data/people");
ds.createOrReplaceTempView("people");
Dataset<Row> dsSql = ss.sql("select *from people");
dsSql.show();
SparkSQL是临时视图作用域的,如果创建它的会话终止,它将消失。如果要在所有会话中共享一个临时视图,直到Spark程序终止,则可以创建全局临时视图。全局临时视图数据库与global_temp关联。例如,选择select *from global_temp view1
。
Scala演示:
val spark = SparkSession.builder().appName("spark SQL Demo").master("local").getOrCreate()
val data = spark.read.json("./data/people")
data.createGlobalTempView("people")
spark.sql("select *from global_temp.people").show() //打印结果
spark.newSession().sql("select *from global_temp.people").show()//打印结果
Java演示:
SparkSession ss = SparkSession.builder().appName("spark demo ").master("local").getOrCreate();
Dataset<Row> ds = ss.read().text("./data/people");
ds.createGlobalTempView("people");
ss.sql("select *from global_temp.people").show();
ss.newSession().sql("select *from global_temp.people").show();
在examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
查看示例。
Dataset和RDD相似,但是不是使用Java和Kryo序列化,而是有专门的Encoder对象进行序列化。虽然Encoder和通常的序列化一样将对象转换为字节,但是Encoder是动态生成的代码,并使用一种格式,该格式允许过滤,排序,哈希处理等,并且处理过程中无需反序列化。
Scala举例:
val spark = SparkSession.builder().appName("spark SQL Demo").master("local").getOrCreate()
import spark.implicits._
//示例1:将集合转换为DataSet
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect().foreach(println)
// 2
// 3
// 4
//示例2:将文件内容转换为对象
val peopleData = spark.read.json("./data/people")
val peopleDS = peopleData.as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Spark SQL支持两种将现有的RDD转换为Dataset的方法。一种是通过反射,这种代码简洁,适用于运行前就了解架构。另一种是通过编程接口指定,这种代码比较长,这种适用于运行时才了解架构的数据集。
使用反射推断架构
words文件内容:
张三 23
小贝 24
王五 55
小明 16
小强 11
Scala实现:
val spark = SparkSession.builder().appName("spark SQL Demo").master("local").getOrCreate()
import spark.implicits._
val peopleDF = spark.sparkContext
.textFile("./data/words")
.map(_.split(" "))
.map(attributes => Person(attributes(0), attributes(1).toInt))
.toDF()
peopleDF.createOrReplaceTempView("people")
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
teenagersDF.map(teenager => "Name:" + teenager.getAs("name")).show()
// +---------+
// | value|
// +---------+
// |Name:小明|
// +---------+
println("映射完后打印")
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] //指定编码
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
//Map(name -> 小明, age -> 16)
以编程接口方式指定架构
val spark = SparkSession.builder().appName("spark demo").master("local").getOrCreate()
//首先创建一个RDD
//根据第一步创建的RDD创建struct
//将struct应用于RDD
val wordRDD = spark.sparkContext.textFile("./data/words")
val structString = "name age"
val structRDD = structString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(structRDD)
val rowRDD = wordRDD.map(line => line.split(" ")).map(line => Row(line(0), line(1)))
val wordsDF = spark.createDataFrame(rowRDD, schema)
wordsDF.createOrReplaceTempView("people")
val select = spark.sql("select name,age from people")
implicit val encoder=org.apache.spark.sql.Encoders.STRING//添加字符串类型编码器
select.map(attributes => "Name: " + attributes(0)).show()
结果:
+----------+
| value|
+----------+
|Name: 张三|
|Name: 小贝|
|Name: 王五|
|Name: 小明|
|Name: 小强|
+----------+
未类型化的用户定义的聚合函数
Scala演示:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
object MyAverage extends UserDefinedAggregateFunction {
// Data types of input arguments of this aggregate function
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
// Data types of values in the aggregation buffer
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// The data type of the returned value
def dataType: DataType = DoubleType
// Whether this function always returns the same output on the identical input
def deterministic: Boolean = true
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// Calculates the final result
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
// Register the function to access it
spark.udf.register("myAverage", MyAverage)
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
类型化的用户定义的聚合函数
Scala演示:
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
Spark SQL可以通过DataFrame对接各种数据源进行操作。DataFrame可以使用关系转换进行操作,还可以创建临时视图。将DataFrame创建为临时视图可以运行SQL查询。本节介绍了使用Spark加载和保存数据源的一般方法,然后介绍了可用内置数据源的特定选项。
由于咨询了一些朋友,都说公司用scala,所以下面就用scala实现了。
公共代码部分:
val spark = SparkSession.builder().appName("test Scala").master("local").getOrCreate()
手动指定操作的文件格式
读取parquet格式文件,并提取保存为text格式。
val peopleDF=spark.read.format("parquet").load("./data/namesAndAges.parquet")
peopleDF.select("name").write.format("text").save("./data/names.text")
直接在文件上运行SQL
除了使用Read API运行API外,还可以直接使用SQL查询文件。
val sqlDF = spark.sql("SELECT * FROM parquet.`data/namesAndAges.parquet`")
保存模式(Save Modes)
Scala/Java | Any Language | Meaning |
---|---|---|
SaveMode.ErrorIfExists (default) | "error" or "errorifexists" (default) | 如果文件存在则报错。 |
SaveMode.Append | "append" | 如果有内容,则追加。 |
SaveMode.Overwrite | "overwrite" | 如果有内容,则覆盖。 |
SaveMode.Ignore | "ignore" | 如果有内容,则不作任何操作。 |
代码示例:
fsWords.write.format("parquet").mode(SaveMode.Overwrite).save("./data/namesAndAges.parquet")
保存永久表
除了使用save将数据保存文件,还可以使用saveAsTable将数据保存在hive里面。如果不指定路径,则保存默认路径,删除表时会自动删除。当然了,还可以使用df.write.option("path", "/some/path")
指定路径。如果手工指定路径的话,默认不会收集metastore信息,需要手动在控制台执行 MSCK REPAIR TABLE
命令。
分组,排序和分区
分组的数据必须保存在table表里面,示例如下:
val fsWords = spark.read.format("json").load("./data/people.json") //加载数据
val fsTable = fsWords.write.bucketBy(1, "name").sortBy("age").saveAsTable("people")//保存到表
spark.sql("select *from people").show()//展示数据
分区示例代码:
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
分区和分桶一起使用:
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
代码可参考: "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"
上面已经演示了通用的加载方法,这里还有专门针对Parquet的加载方法,其实也就是做了一步封装。
以编程方式加载数据
Scala演示如下:
val jsonData = spark.read.json("./data/people.json") //加载JSON文件
jsonData.write.mode(SaveMode.Ignore).parquet("./data/people.parquet") //将文件保存为parquet格式
val parquetData = spark.read.parquet("./data/people.parquet") //获取parquet文件
parquetData.createOrReplaceTempView("people") //将内容转换为临时表
val sparkSql = spark.sql("select name,age from people where age>20") //使用SQL查询大于20的人
implicit val encoder = org.apache.spark.sql.Encoders.STRING //声明类型
sparkSql.map(attributes => "Name: " + attributes(0) + ",Age:" + attributes.getAs("age")).show() //打印
分区发现
可以将文件放在多个分区里面,然后使用spark.read.parquet,spark.read.load 加载数据,Spark SQL会自动提取分区数据。
提取的数据会自动解析类型,如果不想自动解析的话,可以在SparkSession构建的时候配置 spark.sql.sources.partitionColumnTypeInference.enabled 为false。
文件会根据分区路径加载下面的数据。
文件合并
合并两个格式相同的文件,默认合并 功能是 关闭的,可以通过设置mergeSchema为true开启。例如:
import spark.implicits._
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("./data/squaresDF/key=1") //保存在key标识符为1的文件夹内
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i)).toDF("value", "cube")
cubesDF.write.parquet("./data/squaresDF/key=2") //保存在key标识符为2的文件夹内
val mergedDF = spark.read.option("mergeSchema", true).parquet("./data/squaresDF") //读取文件
mergedDF.printSchema() //打印结构
// root
// |-- value: integer (nullable = true)
// |-- square: integer (nullable = true)
// |-- cube: integer (nullable = true)
// |-- key: integer (nullable = true)
Hive Metastor Parquet默认支持技术
Spark在读取Hive 的Metastor Parquet文件时,使用的是Spark SQL自己的Parquet支持,而不是使用Hive Serde进行序列化操作。
可以通过 spark.sql.hive.convertMetastoreParquet
进行配置。
读取hive表时,数据变化,需要手动刷新表
读取hive的parquet时,通常会缓存下提高读取性能。
这个时候当parquet文件数据增加时,并且不是通过spark sql添加的,而是hive自己或其它外部工具添加的,这个时候需要手动刷新缓存,如下:
spark.catalog.refreshTable("person");
parquet配置
可以在创建SparkSession时对parquet进行配置
Property Name | Default | Meaning |
---|---|---|
spark.sql.parquet.binaryAsString |
false | 在编写parquet时,其它一些parquet系统,例如hive,impala和旧版的Spark Sql不会区分二进制和字符串,这个属性设置为ture告诉SparkSql将二进制转换为字符串。以提供系统兼容性。 |
spark.sql.parquet.int96AsTimestamp |
true | 在编写parquet时,其它一些parquet系统,例如hive和impala,将时间戳存在INT96里面,该标识告诉Saprk Sql将INT96转换为时间戳,以提供系统兼容性。 |
spark.sql.parquet.compression.codec |
snappy | 指定压缩,可选择的值包括:none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd。 |
spark.sql.parquet.filterPushdown |
true | 启用过滤优化。 |
spark.sql.hive.convertMetastoreParquet |
true | 设置false时,Spark SQL将使用Hive SerDe用于parquet。 |
spark.sql.parquet.mergeSchema |
false | 合并所有收集的文件。 |
spark.sql.parquet.writeLegacyFormat |
false | 设置为true,将将以Spark1.4及更早期版本的方式写入数据。 |
ORC是Hadoop生态圈的一个列式存储文件,最早产生于Hive,它和parquet相似,它旨在于降低存储空间时提高Hive的查询速度。有关它的配置如下:
Property Name | Default | Meaning |
---|---|---|
spark.sql.orc.impl |
native |
ORC实现的名称。 它可以是“ native”和“ hive”之一。 “ native”表示基于Apache ORC 1.4构建的本机ORC支持。 “ hive”是指Hive 1.2.1中的ORC库。 |
spark.sql.orc.enableVectorizedReader |
true |
设置为true,则在本机中使用矢量化解码。设置为false,则在本机中使用非矢量化解码。 |
Spark可以自动完成JSON类型的加载。使用SparkSession.read.json()就可以完成此转换。
例如:
val jsonData=spark.read.json("./data/people.json")
jsonData.createOrReplaceTempView("people")
spark.sql("select age from people").show()
我们在编码之前,如果你使用maven的话,或者其它,要确认项目引入了spark-hive,否则一切都用不了。
使用hive不需要搭建环境,配置 spark.sql.warehouse.dir 就能直接运行。
简要代码如下:
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("hive demo")
.master("local")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport().getOrCreate()
import spark.sql
sql("create table if not exists src(id int,name string) row format delimited fields terminated by ‘ ‘")//创建表
sql("load data local inpath ‘./data/words‘ overwrite into table src")//投放数据
sql("select name from src").show()
指定Hive存储格式
使用 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat ‘parquet‘) 定义。
Property Name | Meaning |
---|---|
fileFormat | fileFormat是一种存储格式规范的软件包,其中包括“ serde”,“ input format”和“ output format”。 目前,我们支持6种文件格式:“ sequencefile”,“ rcfile”,“ orc”,“ parquet”,“ textfile”和“ avro”。 |
inputFormat, outputFormat | 这2个选项将对应的InputFormat 和OutputFormat 类的名称指定为字符串文字,例如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。 这两个选项必须成对出现,如果已经指定 fileFormat`选项,则不能指定它们。 |
serde | 此选项指定Serde类的名称。 当指定fileFormat 选项时,如果给定的fileFormat 已经包含serde信息,则不要指定此选项。 当前,“ sequencefile”,“ textfile”和“ rcfile”不包含Serde信息,您可以将此选项与这3种fileFormats一起使用。 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim | 这些选项只能与“文本文件” fileFormat一起使用。 它们定义了如何将定界文件读取为行。 |
与Hive Metastor不同版本进行交互
与Hive metastore的交互是Spark SQL对Hive的最重要支持之一,它使Spark SQL可以访问Hive表的元数据。 从Spark 1.4.0开始,使用以下描述的配置,可以使用Spark SQL的单个二进制版本来查询Hive元存储的不同版本。 请注意,与用于与Metastore进行通信的Hive版本无关,Spark SQL在内部将针对Hive 1.2.1进行编译,并将这些类用于内部执行(serdes,UDF,UDAF等)。
Property Name | Default | Meaning |
---|---|---|
spark.sql.hive.metastore.version |
1.2.1 |
Hive Metastore的版本。 可用的选项是0.12.0到2.3.3。 |
spark.sql.hive.metastore.jars |
用于实例化HiveMetastoreClient的jar的位置。 此属性可以是以下三个选项之一: 1.builtin 选用和Hive1.2.1捆绑在一起的 2.maven 使用maven下载jar 3.JVM标准路径 包括Hive和所有依赖项,一般用于生产。 |
|
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc,org.postgresql com.microsoft.sqlserver,oracle.jdbc |
共享类。 |
spark.sql.hive.metastore.barrierPrefixes |
(empty) |
使用前缀加载,例如:org.apache.spark.* |
举例:
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
属性参照表:
Property Name | Meaning |
---|---|
url | 要连接的JDBC URL指定。 mysql://localhost:3306/test?user=fred&password=secret |
dbtable | 设置查询的数据表。 |
query | 查询语句,例如:spark.read.format("jdbc").option("url", jdbcUrl).option("query", "select c1, c2 from t1").load() |
driver | JDBC驱动URL配置。 |
partitionColumn, lowerBound, upperBound | 设置分区数,这三个这顶的话 ,必须都设定。lowerBound和upperBound用于设置步幅。 |
numPartitions | 表读写中并行处理的最大分区数,这也决定了JDBC连接的最大数量。 |
queryTimeout | 查询超时设置,0表示没有限制。 |
fetchsize | 每次读取文件的大小。适用于read。 |
batchsize | 批处理大小,该大小决定每次往返插入多少行。默认是1000行。 |
isolationLevel | 定义事物隔离级别,它可以是“ NONE”,“ READ_COMMITTED”,“ READ_UNCOMMITTED”,“ REPEATABLE_READ”或“ SERIALIZABLE”之一,对应于JDBC的Connection对象定义的标准事务隔离级别,默认值为“ READ_UNCOMMITTED”。此选项适用于write。 |
sessionInitStatement | 这个选项用于执行在数据库进行会话后,执行自定义SQL语句之前。用于实现初始化。示例:option("sessionInitStatement", """BEGIN execute immediate ‘alter session set "_serial_direct_read"=true‘; END;""") |
truncate | 这是与JDBC编写器相关的选项。用于wrie。 |
cascadeTruncate | 这是与JDBC编写器相关的选项。用于wrie。 |
createTableOptions | 创建数据表。(例如,“ CREATE TABLE t(名称字符串)ENGINE = InnoDB”) |
createTableColumnTypes | 用于添加字段。应以与CREATE TABLE列语法相同的格式指定(例如:“名称CHAR(64),注释VARCHAR(1024)”),此选项用于write。 |
customSchema | 指定读取模式的自定义数据类型,例如:("customSchema", "id DECIMAL(38, 0), name STRING") |
pushDownPredicate | predicate是一个函数式接口,可以用于lambda接口和方法引用。此选项是用于启用或者禁用将数据推送给JDBC处理,默认是开启的,如果Spark处理predicate速度比较快的话,可以禁用掉。 |
Avro是一个数据序列化系统,由Hadoop创始人 Doug Cutting 开发。
部署中
将spark-avro添加到spark-submit中
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.5 ...
将spark-avro添加到spark-shell中
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.5 ...
加载和保存功能
引入spark-avro依赖项,并在项目中声明org.apache.spark.sql.avro
举例:
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
to_avro()和from_avro()
通过from_avro处理数据,并通过to_avro向下传递,代码如下所示:
import org.apache.spark.sql.avro._
// from_avro需要JSON字符串格式的Avro模式
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. 将Avro数据解码为结构;
// 2. 按“ favorite_color”列过滤;
// 3. 以Avro格式对“名称”列进行编码。
val output = df
.select(from_avro(‘value, jsonFormatSchema) as ‘user)
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as ‘value)
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
数据源选项
可以通过DataFrameReader或DataFrameWriter上的option设置Avro的数据选项。
Property Name | Default | Meaning | Scope |
---|---|---|---|
avroSchema | None | 用于以JSON格式提供可选的Avro模式。 | read and write |
recordName | topLevelRecord | 写入文件的时候,定义名称。 | write |
recordNamespace | "" | 写入文件的时候,定义命名空间。 | write |
ignoreExtension | true | 是否忽略扩展名。(带与不带.avro) | read |
compression | snappy | 写入文件时,指定压缩解码器。支持的压缩解码器有uncompressed , snappy , deflate , bzip2 and xz 。 |
write |
配置
可以通过SparkSession上的setConf方法或通过SQL 运行set key=value来完成avro的配置。
Property Name | Default | Meaning |
---|---|---|
spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true | 设置为true,则将com.databricks.spark.avro映射到外部数据模块,以提供向后兼容。 |
spark.sql.avro.compression.codec | snappy | 指定压缩解码器,可选则的有:uncompressed, deflate, snappy, bzip2 and xz. |
spark.sql.avro.deflate.level | -1 | 指定压缩级别,可选的数值为1-9和-1.默认是-1. |
与Databricks spark-avro兼容性
该Avro数据源模块最初来自Databricks的开源存储库spark-avro并与之兼容。
AvroDataFrameWriter和AvroDataFrameReader删除了,改用DataFrameWriter或DataFrameReader中的.format(“ avro”)。
Avro->Spark SQL转换支持的类型
Avro type | Spark SQL type |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
fixed | BinaryType |
bytes | BinaryType |
record | StructType |
array | ArrayType |
map | MapType |
union | See below |
除了上面列出的类型之外,它还支持读取联合类型。 以下三种类型被视为基本联合类型:
union(int,long)将映射到LongType。
union(float,double)将被映射为DoubleType。
union(something,null),其中something是任何受支持的Avro类型。 这将被映射为与something相同的Spark SQL类型,并将nullable设置为true。 所有其他联合类型都被认为是复杂的。
它还支持读取以下Avro逻辑类型:
Avro logical type | Avro type | Spark SQL type |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | fixed | DecimalType |
decimal | bytes | DecimalType |
Spark SQL支持的类型->Avro类型的转换
Spark SQL type | Avro type | Avro logical type |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DateType | int | date |
TimestampType | long | timestamp-micros |
DecimalType | fixed | decimal |
您还可以使用选项avroSchema指定整个输出Avro模式,以便可以将Spark SQL类型转换为其他Avro类型。 默认情况下,不应用以下转换,并且需要用户指定的Avro模式:
Spark SQL type | Avro type | Avro logical type |
---|---|---|
BinaryType | fixed | |
StringType | enum | |
TimestampType | long | timestamp-millis |
DecimalType | bytes | decimal |
Spark SQL可以通过调用spark.catalog.cacheTable("tableName")
或者调用dataFrame.cache()
。然后,Spark SQL要查询必须的列。取消缓存使用spark.catlog.uncacheTable("tableName")
。
可以使用setConf来设置缓存数据。
Property Name | Default | Meaning |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | 设置为true,Spark Sql将自动选择压缩器来进行压缩。 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 设置缓存的批处理大小。设置的越大越能提高缓存的处理和压缩效率。但是太大了会出现OOMs问题。 |
以下选项也可以用于调整查询执行的性能。 随着自动执行更多优化,这些选项可能会在将来的版本中弃用。
Property Name | Default | Meaning |
---|---|---|
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | read文件时,打包到单个分区的最大字节数。 |
spark.sql.files.openCostInBytes | 4194304 (4 MB) | 可以同时扫描打开文件的总大小。 |
spark.sql.broadcastTimeout | 300 | 广播超时时间。 |
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | 配置表的最大大小。 |
spark.sql.shuffle.partitions | 200 | 配置join和aggregations时的分区数。 |
join表时进行广播,例如:
import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()
Spark SQL可以使用JDBC和ODBC进行调用,在这种模式下Spark SQL可以直接运行SQL,而无需其它代码。
运行以下命令来启动JDBC/ODBC。
./sbin/start-thriftserver.sh
使用以下./sbin/start-thriftserver.sh --help
查看帮助,默认运行在localhost:10000端口。也可以手动配置:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh --master <master-uri> ...
或运行时配置:
./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=<listening-port> --hiveconf hive.server2.thrift.bind.host=<listening-host> --master <master-uri>
...
现在你可以使用beeline 对JDBC/ODBC进行测试:
./bin/beeline
使用以下命令进行连接:
beeline> !connect jdbc:hive2://localhost:10000
belline会要求您提供用户名和密码。在非安全模式下,用户名和密码空白即可。
通过将您的hive-site.xml,core-site.xml和hdfs-site.xml文件放在/conf中来完成配置。
JDBC支持通过Http传输发送Thrift RPC,可以在/conf下的hive-site.xml文件中进行启用:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
要进行测试,可以使用beeline通过以下方式通过http进行测试。
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
Spark SQL CLI是一种工具,可以在本地模式下运行Hive Metastore服务执行查询。使用以下命令打开:
./bin/spark-sql
结构化数据流是基于Spark Sql进行的一种流式计算框架。原来的架构延迟低至100毫秒,新架构能低至1毫秒。
在执行程序前必须创建的结构就是Spark了。
//创建Spark
val spark = SparkSession
.builder()
.appName("hive demo")
.master("local")
.getOrCreate()
操作Streaming。
import spark.implicits._
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", "9999")
.load()
//从9999端口获取流数据
val words = lines.as[String].flatMap(_.split(" ")) //分隔数据
val wordCount = words.groupBy("value").count() //计算值的数量
val query = wordCount.writeStream.outputMode("complete").format("console").start() //打印到控制台
query.awaitTermination() //等待结束,以防止程序还没执行完就停止运行
执行的话可以选择在本地编译执行,可以放在服务器执行现有的Demo。
这里我选择在Linux执行。
$ nc -lk 8888
然后打开一个新的端口执行:
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 8888
然后,将对运行netcat服务器的终端中键入的任何行进行计数并每秒打印一次。 它将类似于以下内容:
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 2|
| spark| 1|
|hadoop| 1|
+------+-----+
Structured Streaming的关键思想是将传输的流数据看作是被连续添加的表。
基本概念
新添加的流都会追加新表中。
计算一段时间,会生成一个结果表,后续的计算会更新此表,并且此表会同步到外部存储。
写入外部存储器的几种模式:
1.完整模式:将整个结果表对存储器进行更新。
2.追加模式:仅仅追加新写入的行。
3.更新模式:仅仅更新改过的内容或新追加的内容。(2.1.1版本后才可用)
拿上面简单演示举例,wordcount是生成的结果表,spark会不断检查是否有了新数据,如果有了的话,则它会将原来的结果集和新数据集合并进行计算。
PS:结构化流不会实现整个表。 它从流数据源读取最新的可用数据,对其进行增量处理以更新结果,然后丢弃该源数据。 它仅保留更新结果所需的最小中间状态数据。
时间处理和延迟数据
每行数据都有个时间,根据时间可以查询出某个时间段的数据流。
有晚到达的数据,也能很好的自行处理。
容错
采用了端到端跟踪处理,可以采用重启或重新处理来处理故障。
可以使用SparkSession像处理静态有界数据一样处理动态无界数据。
可以使用SparkSession.readStream返回的DataStreamReader创建streaming DataFrames。
File source :可以从json,text,orc,parquet,csv获取数据。
Kafka source :可以从kafka获取数据。
Socket source (for testing) :可以从socket获取数据,此功能用于测试。
Rate source (for testing) :可以生成带时间的一行行数据进行测试。
下面是一些详细介绍:
Source | Options | 容错 |
---|---|---|
File source | maxFilesPerTrigger:处理文件的最大数量。(默认值no max)。 latestFirst:是否处理最新的文件。当存在积压文件时。(默认值false)。 fileNameOnly:是否基于文件名,而不是完整路径检查文件。(默认值false)。当为true时,E:/1.txt 和F:/1.txt将视为一样。 |
Yes |
Socket Source | host:要链接的主机。 port:要链接的端口。 |
No |
Rate Source | rowsPerSecond:每秒生成多少行。(默认值1) rampUpTime:生成速度。(默认值0) rowsPerSecond:加速时间。(默认最大加速) numPartitions:所生成的分区号,例如10.(默认是spark的并行度) |
Yes |
Kafka Source | 参考 Kafka Integration Guide. | Yes |
接下来演示下:
val socketDF = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", "22")
.load()
println(socketDF.isStreaming)//打印是否是流数据,这里显示true
socketDF.printSchema()//打印结构图
// root
// |-- value: string (nullable = true)
//将文件流存入csv文件。
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream
.option("sep", ";").
schema(userSchema)
.csv("./data/csvDir")
默认情况下从源转换为structured streaming时,需要手动指定schema,而不是让它自动推断,这样在发生故障时方便进行恢复,当然了也可以通过spark.sql.streaming.schemaInference
设置为true来启动自动模式推断。
当存在/key=value/的子目录时,分区发现会产生,列表会自动归到这些目录中。
你可以在DataFrames和Datasets上运行无类型的操作,例如SQL操作(select,join,groupBy),和有类型的操作,类似于RDD操作,例如(filter,map,flatMap)。
基本操作:选择,投影,汇总
DataFrame和Dataset支持大部分操作,还有一小部分不支持,在后面会讲。
举例:
//创建Spark
val spark = SparkSession
.builder()
.appName("hive demo")
.master("local")
.getOrCreate()
import spark.implicits._
val df: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", "22").load()
val ds: Dataset[DeviceData] = df.as[DeviceData]
df.select("device").where("signal > 10")//使用"untyped "API
ds.filter(_.signal > 10).map(_.device)//使用"typed "API
//获取每种设备上的更新信号
df.groupBy("deviceType").count()
//获取每种设备上的平均信号
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))
也可以生成临时视图,然后使用SQL调用。
import spark.implicits._
val df: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", "22").load()
df.createOrReplaceTempView("updates")//生成updates临时视图
spark.sql("select *from updates")//使用SQL调用
PS:您可以使用df.isStreaming来确定DataFrame/Dataset是否有数据流。
事件时间窗口的操作
sliding 类似于聚合索引。考虑到单词可能会进入两个重合的时间段之间,因此可以指定单词名称和时间戳。例如下图是10分钟统计一次,5分钟更新一次效果图:
由于此窗口化类似于分组,因此在代码中,您可以使用groupBy()和window()操作来表示窗口化聚合。
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
处理后期数据和加水印
现在考虑如果事件之一迟到了应用程序会发生什么。 例如,应用可以在12:11接收在12:04(即事件时间)生成的单词。 应用程序应使用12:04而不是12:11来更新窗口12:00-12:10的旧计数。 这在基于窗口的分组中很自然地发生–结构化流可以长时间保持部分聚合的中间状态,这样后期的数据就可以正确地更新旧窗口的聚合,如下所示。
但是,要连续几天运行此查询,系统必须限制其累积的中间内存状态量。这意味着系统需要知道何时可以从内存中状态删除旧聚合,因为应用程序将不再接收该聚合的最新数据。为此,在Spark 2.1中,我们引入了水印功能,该功能使引擎自动跟踪数据中的当前事件时间,并尝试相应地清除旧状态。您可以通过指定事件时间列和有关事件时间期望数据延迟的阈值来定义查询的水印。对于在时间T结束的特定窗口,引擎将维持状态并允许延迟数据更新状态,直到(引擎看到的最大事件时间-延迟阈值> T)。换句话说,阈值内的延迟数据将被汇总,但阈值后的数据将开始被丢弃(有关确切保证,请参阅本节后面的内容)。让我们通过一个例子来理解这一点。我们可以使用withWatermark()在前面的示例中轻松定义水印,如下所示。
import spark.implicits._
val userSchema = new StructType().add("timestamp", "timestamp").add("word", "string")
val words = spark.readStream.schema(userSchema).json("./data/senddata") //流格式{"timestamp":TimeStamp,"word":String}
words.printSchema()
import org.apache.spark.sql.functions._
val windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
在此示例中,我们将timestamp定义为查询的水印,并将10分钟定义为数据晚到的阈值。流程图如下:
?
? (更新模式)
?
? (追加模式)
PS:1.加水印必须为“更新”或“追加”模式,“完整复制”必须保留所有中间结果,所以不支持添加水印。
流式数据和静态数据可以进行整合操作。
流静态联接
自动Spark2.0以来,Spark支持流和静态之间的链接。如下所示:
val staticDF=spark.read....
val streamingDF=spark.readStream...
streamingDF.join(staticDF,"type") //内链接
streamingDF.join(staticDF,"type","right_join")//外链接
请注意,流静态联接不是有状态的,因此不需要状态管理。 但是,尚不支持某些类型的流静态外部联接。 这些在本“连接”部分的末尾列出。
流连接
在Spark2.3中,添加了对流链接的支持。两个图链接的困难在于在任何时间点,连接双方的数据图不完整。因此,对于这两个输入流,我们将过去的流状态进行缓冲,以便我们可以将每个流状态的输入与过去的输入进行匹配。
内部链接加水印(可选)
PS:支持任何类型的列上的内部链接以及任何类型的链接条件。但是,随着流的运行,流状态将无限期的增长,因为必须保存过去的输入,因为任何输入都可以与过去的输入匹配。为了避免无界状态,您必须定义其它链接条件,以使无限期的旧输入不能与将来输入匹配,因此可以从状态中清除它们。换句话说,您必须在连接中执行以下步骤。
1.给两个流加水印。
2.在两个时间之间进行约束。
举例:将广告展示次数和点击流进行关联,以统计点击多少次可获利。代码如下所示:
import org.apache.spark.sql.functions.expr
val impressions = spark.readStream. ...
val clicks = spark.readStream. ...
// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
// Join with event-time constraints
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)
外部链接加水印
外部链接和上面的内部链接不同之处时,使用外部链接时必须指定事件+水印。语法差不多,不过多加一个joinType进行指定。
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType="leftOuter"
)
流查询中的联接支持矩阵
流重复数据删除
可以使用带水印进行延迟删除,也可以不带水印直接删除。如下所示:
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// Without watermark using guid column
streamingDf.dropDuplicates("guid")
// With watermark using guid and eventTime columns
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime")
处理多个水印的政策
流查询可以有多个输入流,这些输入流可以合并或进行链接。每个流都可以有自己的阈值。
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)//joinCondition是join条件的意思,这里我们根据实际情况配置。
默认情况下选择最小水印作为全局水印,因为如果一个流落后于其它流,它却没有数据时而被丢弃为时已晚。
但是,在某些情况下,即使在最慢的流中获取数据,您也想获取最快的结果。从spark2.4开始,您可以通过spark.sql.streaming.multipleWatermarkPolicy
设置max(默认为min)来设置多重水印,这使水印以最快的速度流动。
任意状态作业
与聚合相比,许多用例需要更高级的状态操作。 例如,在许多用例中,您必须跟踪事件数据流中的会话。 为了进行这种会话化,您将必须将任意类型的数据保存为状态,并在每个触发器中使用数据流事件对状态执行任意操作。 从Spark 2.2开始,可以使用mapGroupsWithState操作和功能更强大的flatMapGroupsWithState操作来完成此操作。 两种操作都允许您将用户定义的代码应用于分组的数据集以更新用户定义的状态。 有关更多具体细节,请查看API文档(Scala / Java)和示例(Scala / Java)。
不支持的操作
不支持.cout()而要使用ds.groupBy().count()
不支持.foreach()而要使用ds.writeStream.foreach()
? 定义了DataFrame和DataSet后,接下来就是流查询了。
输出方式
有几种类型的输出模式:
1.追加模式-将自上次以来添加的结果进行追加。(默认)
2.更新模式-紧更新自上次变动的内容。
3.完全模式-每次触发后会将整个内容覆盖。
不同类型的查询支持不同的输出模式,如下所示:
有几种内置类型的输出接收器:
1.File sink-将输出存储在目录
writeStream
.format("parquet") // can be "orc", "json", "csv", etc.
.option("path", "path/to/destination/dir")
.start()
2.Kafka sink-将输出存储在Kafka的一个或多个主题
writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "updates")
.start()
3.Foreach sink-对输出的记录运行任意计算
writeStream
.foreach(...)
.start()
4.Console-sink(供调试使用):每次运行,将内容输出到控制台。
writeStream
.format("console")
.start()
5.Memonry-sink(供调试使用):输出作为内存表存储在内存中。
writeStream
.format("memory")
.queryName("tableName")
.start()
一些接收器是不容错的,紧用于调试的目的,下面表格展示具体信息。
下面写个Demo演示下:
scala val peopleStruct = new StructType().add("name", "string").add("age", "integer") val peopleDataDF = spark.readStream.schema(peopleStruct).format("json").load("./data/people.json") val result = peopleDataDF.select("name").where("age>18")
println("----没有聚合的情况---")
//在控制台打印
result
.writeStream
.format("console")
.start()
//写入parquet文件
result.writeStream
.format("parquet")
.option("checkpointLocation", "./data/result")
.option("path", "./data/result")
.start()
println("---有聚合的情况---")
val groNameDF = peopleDataDF.groupBy("name").count()
//将更新的聚合打印到控制台
groNameDF.writeStream
.outputMode("complete") //指定模式为完整编译
.format("console")
.start()
//将结果打印到内存表
groNameDF.writeStream
.queryName("person")
.outputMode("complete")
.format("memory")
.start()
spark.sql("select *from person")
使用foreach和foreachBatch操作
foreach流查询
foreachBatch(...)允许您在每个微批处理的输出数据上指定函数。从Spark2.4开始支持。
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// Transform and write batchDF
}.start()
使用foreachBatch,可以执行以下操作。
1.重用现有的批处理数据源-对于许多存储系统,可能还没有流式接收器,但是可能已经存在用于批处理查询的数据写入器。 使用foreachBatch,可以在每个微批处理的输出上使用批处理数据写入器。
2.写入多个位置-如果要将流查询的输出写入多个位置,则可以简单地多次写入输出DataFrame / Dataset。 但是,每次写入尝试都可能导致重新计算输出数据(包括可能重新读取输入数据)。 为了避免重新计算,您应该缓存输出DataFrame / Dataset,将其写入多个位置,然后取消缓存。 这是一个轮廓。
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
foreach
foreachBatch从根本上依赖于微批处理执行,无法连续处理,如果使用连续模式,请改用foreach。
步骤就是打开->处理->关闭。
在scala中,您必须扩展foreach扩展类。
result.writeStream.foreach(
new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// Open connection
}
def process(record: String): Unit = {
// Write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// Close the connection
}
}
).start()
Triggers(触发)
流查询触发器定义了流处理的时间,无论是foreachBatch还是foreach,以下是否支持的各种触发器。
下面有一些案例:
result.writeStream
.format("console")
.start()
result.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("1 seconds"))
.start()
result.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
result.writeStream
.format("console")
.trigger(Trigger.Continuous("1 hours"))
.start()
启用查询时创建的StreamingQuery对象用于监视和管理查询。
val query = df.writeStream.format("console").start() //
query.id // 获取正在运行的查询的唯一标识符,该标识符在从检查点数据重新启动后持续存在
query.runId // 获取此查询运行的唯一ID,它将在每次启动/重新启动时生成
query.name //获取自动生成的名称或用户指定的名称
query.explain() // 打印查询的详细说明
query.stop() // 停止查询
query.awaitTermination() // 阻塞,直到查询终止,带有stop()或错误
query.exception // 如果查询因错误而终止,则为异常
query.recentProgress // 此查询的最新进度更新的数组
query.lastProgress // 此流查询的最新进度更新
您可以 在单个SparkSession启动任意数量的查询,它们将同时运行以共享群资源。您可以使用spark.streams()获取可用于管理当前活动查询的StreamingQueryManager。
spark.streams.active // 获取当前活动的流查询列表
spark.streams.get(id) // 通过其唯一ID获取查询对象
spark.streams.awaitAnyTermination() //阻止直到其中任何一个终止
有多种方法可以监视活动的流查询。 您可以使用Spark的Dropwizard指标支持将指标推送到外部系统,也可以通过编程方式访问它们。
交互阅读指标
您可以使用streamingQuery.lastProgress()和streamingQuery.status()直接获取活动查询的当前状态和指标。下面是一些案例:
val query: StreamingQuery = ... //以writeStream...start() 结尾的
println(query.lastProgress)
/* Will print something like the following.
{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/
println(query.status)
/* Will print something like the following.
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/
使用异步API以编程方式报告指标
您还可以通过附加的StreamingQueryListener来异步监视与SparkSession关联的所有查询。
val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})
使用Dropwizard报告指标
Spark支持使用Dropwizard库的报告指标。 要同时报告结构化流查询的指标,您必须在SparkSession中显式启用配置spark.sql.streaming.metricsEnabled。
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
启用此配置后,在SparkSession中启动的所有查询都将通过Dropwizard向所有已配置的接收器报告指标(例如Ganglia,Graphite,JMX等)。
万一发生故障或有意关闭时,您可以恢复先前的进度和先前查询的状态,并在中断的地方继续进行。 这是通过使用 checkpointing 和预写日志来完成的。 您可以使用 checkpoint 配置查询,查询会将所有进度信息(即在每个触发器中处理的偏移量范围)和正在运行的聚合(例如快速示例中的字数)保存到检查点位置。 此检查点位置必须是与HDFS兼容的文件系统中的路径,并且可以在启动查询时在DataStreamWriter中设置为选项。
aggDF
.writeStream
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start()
这部分内容较多,都是文字,没有代码,自行阅览官网。
这是个实验功能,说可以实现最低1ms的延迟,之前最低是100毫秒。自行阅览官网。
Spark Streaming是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。 可以从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中提取数据,并可以使用以高级功能(如map,reduce,join和window)表示的复杂算法来处理数据。 最后,可以将处理后的数据推送到文件系统,数据库和实时仪表板。 实际上,您可以在数据流上应用Spark的机器学习和图形处理算法。
在内部,它的工作方式如下。 Spark Streaming接收实时输入数据流,并将数据分成批次,然后由Spark引擎进行处理,以生成批次的最终结果流。
Spark Streaming提供了称为离散流或DStream的高级抽象,它表示连续的数据流。 DStreams可以根据来自诸如Kafka,Flume和Kinesis之类的源的输入数据流来创建,也可以通过对其他DStreams应用高级操作来创建。 在内部,DStream表示为RDD序列。
本指南将向您演示使用DStream操作Spark Streaming程序。
在详细介绍如何编写自己的Spark Streaming程序之前,让我们快速看一下简单的Spark Streaming程序的外观。 假设我们要计算从TCP套接字侦听的数据服务器接收到的文本数据中的单词数。 您需要做的如下。
首先,我们将Spark Streaming类的名称以及从StreamingContext进行的一些隐式转换导入到我们的环境中,以便向我们需要的其他类(如DStream)添加有用的方法。 StreamingContext是所有流功能的主要入口点。 我们创建具有两个执行线程和1秒批处理间隔的本地StreamingContext。
编写代码前,如果你使用的Maven,请先引入
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
要使用Kafka,Flume,Kinesis添加以下依赖项:
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-10_2.12 |
Flume | spark-streaming-flume_2.12 |
Kinesis | spark-streaming-kinesis-asl_2.12 [Amazon Software License] |
编写代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("demo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1)) //SparkStreaming中最主要的就是StreamingContext
val lines = ssc.socketTextStream("localhost", 9999) //获取socket的text流
val words = lines.flatMap(_.split(" "))//分割字段
val wordKey = words.map(word => (word, 1))//映射字段文字为(key,value)也就是给每个单词加初始值1,便于后面进行计算
val wordCount = wordKey.reduceByKey(_ + _)//统计文字数量
ssc.start() //开始执行
ssc.awaitTermination()//等待全部执行完毕
如果在Linux服务器运行可以使用$ nc -lk 9999
来开启一个9999端口的小型服务。
然后通过执行$ ./bin/run-example streaming.NetworkWordCount localhost 9999
来运行项目。
要初始化Spark Streaming程序,必须创建StreamingContext对象,该对象是所有Spark Streaming功能的主要入口点。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName参数是您的应用程序在群集UI上显示的名称。master是一个Spark,Mesos,Kubernetes或YARN群集URL,或一个特殊的“ local []”字符串,以本地模式运行。 实际上,当在集群上运行时,您将不希望对程序中的母版进行硬编码,而希望通过spark-submit启动应用程序并在其中接收它。 但是,对于本地测试和单元测试,您可以传递“ local [*]”以在内部运行Spark Streaming(检测本地系统中的内核数)。 请注意,这在内部创建了一个SparkContext(所有Spark功能的起点),可以作为ssc.sparkContext访问。
必须根据应用程序的延迟要求和可用的群集资源来设置批处理间隔。 有关更多详细信息,请参见性能调整部分。
也可以从现有的SparkContext对象创建StreamingContext对象。
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
定义上下文后,您必须执行以下操作。
1.通过创建输入DStream定义输入源。
2.通过将转换和输出操作应用于DStream来定义流计算。
3.开始使用streamingContext.start()接收数据并对其进行处理。
4.等待使用streamingContext.awaitTermination()停止处理(手动或由于任何错误)。
5.可以使用streamingContext.stop()手动停止该处理。
需要注意的几点:
1.一旦启动上下文,就无法设置新的流计算或将其添加到该流计算中。
2.上下文一旦停止,就无法重新启动。
3.JVM中只能同时激活一个StreamingContext。
4.StreamingContext上的stop()也会停止SparkContext。 要仅停止StreamingContext,请将名为stopSparkContext的stop()的可选参数设置为false。
5.只要在创建下一个StreamingContext之前停止了上一个StreamingContext(不停止SparkContext),就可以将SparkContext重用于创建多个StreamingContext。
离散流或DStream是Spark Streaming提供的基本抽象。 它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流。 在内部,DStream由一系列连续的RDD表示,这是Spark对不可变的分布式数据集的抽象(有关更多详细信息,请参见Spark编程指南)。 DStream中的每个RDD都包含来自特定间隔的数据,如下图所示。
在DStream上执行的任何操作都转换为对基础RDD的操作。 例如,在将行流转换为单词的较早示例中,flatMap操作应用于行DStream中的每个RDD,以生成单词DStream的RDD。 如下图所示。
这些基础的RDD转换由Spark引擎计算。 DStream操作隐藏了大多数这些细节,并为开发人员提供了更高级别的API,以方便使用。 这些操作将在后面的部分中详细讨论。
输入DStream是表示从流源接收的输入数据流的DStream。在快速示例中,行是输入DStream,因为它表示从netcat服务器接收的数据流。每个输入DStream(文件流除外,本节后面将讨论)都与一个Receiver对象(Scala doc,Java doc)相关联,该对象从源接收数据并将其存储在Spark的内存中以进行处理。
Spark Streaming提供了两类内置的流媒体源。
基本来源:直接在StreamingContext API中可用的来源。示例:文件系统和套接字连接。
高级资源:可以通过其他实用程序类获得诸如Kafka,Flume,Kinesis等资源。如链接部分所述,它们需要针对额外的依赖项进行链接。
我们将在本节后面的每个类别中讨论一些资源。
请注意,如果要在流应用程序中并行接收多个数据流,则可以创建多个输入DStream(在“性能调整”部分中进一步讨论)。这将创建多个接收器,这些接收器将同时接收多个数据流。但是请注意,Spark工作程序/执行程序是一项长期运行的任务,因此它占用了分配给Spark Streaming应用程序的核心之一。因此,重要的是要记住,必须为Spark Streaming应用程序分配足够的内核(或线程,如果在本地运行),以处理接收到的数据以及运行接收器。
要记住的要点
在本地运行Spark Streaming程序时,请勿使用“ local”或“ local [1]”作为主URL。 这两种方式均意味着仅一个线程将用于本地运行任务。 如果您使用的是基于接收方的输入DStream(例如套接字,Kafka,Flume等),则将使用单个线程来运行接收方,而不会留下任何线程来处理接收到的数据。 因此,在本地运行时,请始终使用“ local [n]”作为主URL,其中n>要运行的接收器数(有关如何设置主服务器的信息,请参见Spark属性)。
为了将逻辑扩展到在集群上运行,分配给Spark Streaming应用程序的内核数必须大于接收器数。 否则,系统将接收数据,但无法处理它。
我们已经在快速示例中查看了ssc.socketTextStream(...),该示例根据通过TCP套接字连接接收的文本数据创建DStream。 除了套接字外,StreamingContext API还提供了从文件作为输入源创建DStream的方法。
文件流
为了从与HDFS API兼容的任何文件系统(即HDFS,S3,NFS等)上的文件读取数据,可以通过StreamingContext.fileStream [KeyClass,ValueClass,InputFormatClass]创建DStream。
文件流不需要运行接收器,因此无需分配任何内核来接收文件数据。
对于简单的文本文件,最简单的方法是StreamingContext.textFileStream(dataDirectory)。
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
For text files
streamingContext.textFileStream(dataDirectory)
Spark Streaming将监视目录dataDirectory并处理在该目录中创建的所有文件。
可以监视一个简单目录,例如“ hdfs:// namenode:8040 / logs /”。发现后,将直接处理该路径下的所有文件。
可以提供POSIX全局模式,例如“ hdfs:// namenode:8040 / logs / 2017 / *”。在这里,DStream将由目录中与模式匹配的所有文件组成。也就是说:它是目录的模式,而不是目录中的文件。
所有文件必须具有相同的数据格式。
根据文件的修改时间而不是创建时间,将其视为时间段的一部分。
处理后,在当前窗口中对文件的更改不会导致重新读取该文件。也就是说:更新将被忽略。
目录下的文件越多,扫描更改所需的时间就越长-即使未修改任何文件。
如果使用通配符标识目录,例如“ hdfs:// namenode:8040 / logs / 2016- *”,则重命名整个目录以匹配路径会将目录添加到受监视目录列表中。流中仅包含目录中修改时间在当前窗口内的文件。
调用FileSystem.setTimes()修复时间戳是一种在以后的窗口中拾取文件的方法,即使其内容没有更改。
使用对象存储作为数据源
HDFS之类的“完整”文件系统倾向于在创建输出流后立即对其文件设置修改时间。当打开文件时,甚至在数据完全写入之前,它也可能包含在DStream中-之后,将忽略同一窗口中对该文件的更新。也就是说:更改可能会丢失,流中会省略数据。
为了确保在窗口中能够接收到更改,请将文件写入一个不受监视的目录,然后在关闭输出流后立即将其重命名为目标目录。如果重命名的文件在创建窗口期间出现在扫描的目标目录中,则将拾取新数据。
相反,由于实际复制了数据,因此诸如Amazon S3和Azure存储之类的对象存储的重命名操作通常较慢。此外,重命名的对象可能具有named()操作的时间作为其修改时间,因此可能不被视为原始创建时间所暗示的窗口的一部分。
需要对目标对象存储进行仔细的测试,以验证存储的时间戳行为与Spark Streaming期望的一致。直接写入目标目录可能是通过所选对象存储流传输数据的适当策略。
有关此主题的更多详细信息,请查阅Hadoop Filesystem Specification。
与RDD相似,转换允许修改来自输入DStream的数据。 DStream支持普通Spark RDD上可用的许多转换。 一些常见的方法如下。
Transformation | Meaning |
---|---|
map(func) | 通过将源DStream的每个元素传递给函数 func 来返回新的DStream。 |
flatMap(func) | 与map相似,但是每个输入项可以映射到0个或多个输出项。 |
filter(func) | 通过仅选择* func *返回true的源DStream记录来返回新的DStream。 |
repartition(numPartitions) | 通过创建更多或更少的分区来更改此DStream中的并行度。 |
union(otherStream) | 返回一个新的DStream,其中包含源DStream和* otherDStream *中元素的并集。 |
count() | 通过计算源DStream的每个RDD中的元素数,返回一个新的单元素RDD DStream。 |
reduce(func) | 通过使用函数* func *(带有两个参数并返回一个)来聚合源DStream的每个RDD中的元素,从而返回一个单元素RDD的新DStream。 该函数应具有关联性和可交换性,以便可以并行计算。 |
countByValue() | 在类型为K的元素的DStream上调用时,返回一个新的(K,Long)对的DStream,其中每个键的值是其在源DStream的每个RDD中的频率。 |
reduceByKey(func, [numTasks]) | 根据reduce后面的函数进行聚合。 |
join(otherStream, [numTasks]) | 在(K,V)和(K,W)对的两个DStream上调用时,返回一个新的(K,(V,W))对的DStream,其中每个键都有所有元素对。 |
cogroup(otherStream, [numTasks]) | 在(K,V)和(K,W)对的DStream上调用时,返回一个新的(K,Seq [V],Seq [W])元组的DStream。 |
transform(func) | 通过对源DStream的每个RDD应用RDD-to-RDD函数来返回新的DStream。 这可用于在DStream上执行任意RDD操作。 |
updateStateByKey(func) | 返回一个新的“状态” DStream,在该DStream中,通过在键的先前状态和键的新值上应用给定函数来更新每个键的状态。 这可用于维护每个键的任意状态数据。 |
updateStateByKey操作使您可以保持任意状态,同时用新信息连续更新它。 要使用此功能,您将必须执行两个步骤。
定义状态-状态可以是任意数据类型。
定义状态更新功能-使用功能指定如何使用输入流中的先前状态和新值来更新状态。
在每个批次中,Spark都会对所有现有密钥应用状态更新功能,无论它们是否在批次中具有新数据。 如果更新函数返回None,则将删除键值对。
让我们用一个例子来说明。 假设您要保持在文本数据流中看到的每个单词的连续计数。 此处,运行计数是状态,它是整数。 我们将更新函数定义为:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
将为每个单词调用update函数,其中newValues的序列为1(从(word,1)对开始),而runningCount具有先前的计数。
请注意,使用updateStateByKey要求配置检查点目录,这将在检查点部分中详细讨论。
转换操作
可以很方便的将一个 RDD流转换为另一个RDD流。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // 包含垃圾信息的RDD
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // 将数据流与垃圾信息一起加入进行清理
...
}
PS:每个批处理都会调用方法,这样保证了每次有新的数据都会执行。
Window Operations(Window计算)
下面演示下每10秒减少一次30秒的数据的操作。
// 每10秒减少最后30秒的数据
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些window操作定义如下,这些都依赖操作windowLength和slideInternal实现的。
Transformation | Meaning |
---|---|
window(windowLength, slideInterval) | window处理DStream后,返回新的DStream。 |
countByWindow(windowLength, slideInterval) | 返回window计数。 |
reduceByWindow(func, windowLength, slideInterval) | 返回slide时间内聚合的数据。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 返回通过func聚合的K,V数据。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 这个和上面的区别是多了一个invFunc参数,这个参数的意思是处理输出值用的。 |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 作用于(K,V)上,返回(K,Long)数据。 |
流合并
Scala实现:
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
Java实现:
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
这样就把两个流合并在一起了。
Scala实现:
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
Java实现:
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
流数据集连接
Scala实现:
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
Java实现:
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));
输出操作可以将DStream输出到外部系统,比如HDFS,数据库。
Output Operation | Meaning |
---|---|
print() | 打印前10个数据。 |
saveAsTextFiles(prefix, [suffix]) | 保存为text文件。 |
saveAsObjectFiles(prefix, [suffix]) | 保存为object文件。 |
saveAsHadoopFiles(prefix, [suffix]) | 保存为hadoop文件。 |
foreachRDD(func) | 将func应用于循环RDD。 |
dstream.foreachRDD是一个功能强大的原语,可以将数据发送到外部系统。使用过程中应该创建一个连接对象,负责连接外部系统。
Scala实现:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool是静态的,延迟初始化连接池。
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) //返回池中以备调用
}
}
Java实现:
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
// ConnectionPool是静态的,延迟初始化连接池。
Connection connection = ConnectionPool.getConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
ConnectionPool.returnConnection(connection); // 返回池中以备调用
});
});
Scala演示:
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// 获取SparkSession单例实例
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// 将rdd转换为DataFrame
val wordsDataFrame = rdd.toDF("word")
// 创建一个临时表
wordsDataFrame.createOrReplaceTempView("words")
// 使用sql语句进行统计
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
DStream操作默认使用缓存,无序开发人员手动设置。
检查点的作用是,在服务器节点发生故障时,可以根据检查点进行恢复。
以下是一个设置检查点的Demo:
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // 创建context
val lines = ssc.socketTextStream(...) // 创建DStream
...
ssc.checkpoint(checkpointDirectory) // 设置检查点
ssc
}
// 从检查点创建一个新的StreamingContext
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
//根据需要进行其他设置,
// 不管是启动还是重新启动
context. ...
// 启动上下文
context.start()
context.awaitTermination()
如果检查点恢复累加器和广播变量,那么我们要做的是延迟创建累加器和广播变量。如下:
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// 获取注册广播变量
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// 获取注册累加器
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// 删除黑名单列表包含的单词,并使用droppedWordsCounter对其进行计数。
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
将项目打包成jar来部署。
要确保有足够的内存村中间结果。
Spark不会对读写日志进行加密,请手动加密。
更新应用有两种方式:一种直接上传服务,然后新服务旧服务一块跑,等待黄金时机把旧服务去掉。 第二种,是直接把旧服务去掉,换成新服务。
监控主要有两点,一种是批处理时间,一种是批处理等待时间。
详细的调优指南参考页面,这里只列出最重要的几个调优。
通过将单个流转为多个流提高并行度。例如:
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
通过设置spark.default.parallelism
来设置并行度,以充分利用系统性能。
有时候在流数据不大的情况下,可以禁用序列化,以降低序列化和反序列化带来的CPU和时间消耗。
在观察批处理间隔和批处理时间是否稳定,如果批处理时间很长,则需要优化,如降低批处理量。
在本节中,我们将讨论发生故障时Spark Streaming应用程序的行为。
RDD自己是容错的,Spark 1.2 及以上也实现了容错机制,实现零数据丢失。
标签:标识 密码 进制 页面 高可用 encoder global SOS 实现
原文地址:https://www.cnblogs.com/shun7man/p/12736702.html