标签:系统 pack 一起 left join bootstra shuffle 目录 pac output
先读一份static 数据:
val static = spark.read.json("s3://tang-spark/data/activity-data/")
static.printSchema
root
|-- Arrival_Time: long (nullable = true)
|-- Creation_Time: long (nullable = true)
|-- Device: string (nullable = true)
|-- Index: long (nullable = true)
|-- Model: string (nullable = true)
|-- User: string (nullable = true)
|-- gt: string (nullable = true)
|-- x: double (nullable = true)
|-- y: double (nullable = true)
|-- z: double (nullable = true)
看一下部分内容:
下面我们使用同样的数据集创建一个streaming 版本,在流的情况下,它会一个接一个地读取每个输入文件。
Streaming DataFrames与static DataFrame基本一致,基本上,所有static DF中的transformation都可以应用在Streaming DF中。不过,一个小小的区别是:Structured Streaming并不允许我们执行schema 推断(inference),除非显式地启用此功能。可以通过设置spark.sql.streaming.schemaInference 为 true来开启schema inference。在这个例子中,我们会读取一个文件的schema(当然我们已知了这些文件具有有效的schema),并从static DF中pass一个dataSchema对象到streaming DF中。不过在实际中一般要避免这么做,因为数据可能是会改变的。
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1).json("s3://tang-spark/data/activity-data")
(这里我们指定了maxFilesPerTrigger为1,仅是为了测试方便,让stream一次读一个文件,但是一般在生产环境中不会配置这么低。)
与其他Spark API 一样,streaming DataFrame 的创建以及执行是lazy的。现在我们可以为streaming DF指定一个transformation,最后调用一个action开始执行这个流。在这个例子中,我们会展示一个简单的transformation —— 根据字段gt,进行group 并 count:
val activityCounts = streaming.groupBy("gt").count()
若是使用的单机模式,也可以设置一个更小的spark.sql.shuffle.partitions 值(默认为200,Configures the number of partitions to use when shuffling data for joins or aggregations.),以避免创建过多的shuffle partitions:
spark.conf.set("spark.sql.shuffle.partitions", 5)
现在我们已经设置好了transformation,下一步仅需要指定一个action就可以开始执行query了。根据上文所述,我们还需要指定一个输出的destination,或是针对这个query结果的output sink。对于这个简单的例子,我们会使用memory sink,它会维护一个在内存中的一个table of the results。
在指定sink时,我们还需要定义Spark如何输出它的数据,在这个例子中,我们会使用complete 的输出模式。这个模式会在每次trigger后,重写所有的keys以及它们的counts值:
val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()
在我们运行上面的代码前,我们也要加上这条代码:
activityQuery
.
awaitTermination
()
在这条代码开始运行后,streaming 计算将会在后台运行。而这个query object即是active streaming query 对应的handle,而我们必须使用activityQuery.awaitTermination() 等待query的结束,以防止driver进程在query仍是active时退出。在生产环境中必须要加上这条语句,否则stream无法执行。
开始执行:
[Stage 8:==========================> (94 + 4) / 200]
可以看到有 200 个task,因为使用的是默认的 spark.sql.shuffle.partitions 值。
可以使用 spark.streams.active 列出active的stream:
> spark.streams.active
res24: Array[org.apache.spark.sql.streaming.StreamingQuery] = Array(org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@612c6083)
它也会为每个stream分配一个UUID,所以如果有必要的话,可以迭代这个list,并选出上面的那个流。不过在这个例子中,我们已经将这个流assign给一个变量了,所以没有必要。
现在这个流正在运行,我们可以通过query这个in-memory table来检查结果,这个表维护了当前streaming aggregation的输出。这个table将被称为 activity_counts,与流的名称一样。在检查当前output table 的输出时,仅需要query此表即可。我们会写一个循环,每秒打出这个streaming query 的结果:
for (i <- 1 to 5){
spark.sql("select * from activity_counts").show()
Thread.sleep(1000)
}
结果类似于:
整体语句为:
val static = spark.read.json("s3://tang-spark/data/activity-data/")
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1).json("s3://tang-spark/data/activity-data")
val activityCounts = streaming.groupBy("gt").count()
val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()
for (i <- 1 to 5){
spark.sql("select * from activity_counts").show()
Thread.sleep(1000)
}
Streaming中的transformations 基本涵盖了static DF 中的 transformations,例如select、filter等简单的transformations。不过仍是有些限制,例如在Spark 2.2 版本中,用户无法sort一个没有aggregated的stream,并且在没有使用Stateful Processing时,无法执行多层的aggregation,等等。这些限制可能会随着版本更替有所解决,对此最好参考Spark的官方文档,查看更新情况。
在Structured Streaming中,支持所有的select 与 filter。下面是一个简单的例子,由于我们并不会随着时间更新key,所以会使用Append output mode,将新数据append到输出表中:
import org.apache.spark.sql.functions.expr
val simpleTransform = streaming.withColumn("stairs", expr("gt like ‘%stairs%‘"))
.where("stairs")
.where("gt is not null")
.select("gt", "model", "arrival_time", "creation_time")
.writeStream
.queryName("simple_transform")
.format("memory")
.outputMode("append")
.start()
下面查询一下,先可以看看有哪些表:
然后查询 simple_transform 表:
spark.sql("select * from simple_transform").show()
Structured Streaming 对 aggregation操作的支持非常好。我们可以指定任意聚合操作。例如,我们可以使用一个比较有特点的聚合(例如Cube),在sensor的phone model 、activity、以及average x,y,z acceleration 字段上聚合Cube:
val deviceModelStats = streaming.cube("gt", "model").avg()
.drop("avg(Arrival_time)")
.drop("avg(Creation_Time)")
.drop("avg(Index)")
.writeStream.queryName("device_counts").format("memory").outputMode("complete")
.start()
spark.sql("select * from device_counts").show()
相信大家已经发现了,这里queryName(“device_counts”) 也就是我们最后生成的表名。
在Spark 2.2 版本以后,Structured Streaming支持streaming DataFrames 与 static DataFrames进行join。Spark 2.3 会增加多个streams 的join 功能。
val historicalAgg = static.groupBy("gt", "model").avg()
val deviceModelStats = streaming.drop("Arrival_Time", "Creation_Time", "Index")
.cube("gt", "model").avg()
.join(historicalAgg, Seq("gt", "model"))
.writeStream.queryName("device_counts_join").format("memory").outputMode("complete")
.start()
spark.sql("select * from device_counts_join").show()
在Spark 2.2 中,full outer join、left join(stream在右)、right join(stream在左)暂不支持。Structured Streaming 也还没有支持stream-to-stream 的join,不过这个也是一个正在开发中的功能。
这节我们会深入讨论,在Structured Streaming 中,sources、sinks以及output modes 是如何工作的。特别地,我们会讨论数据流是怎样、何时、从哪,进入以及流出系统。
下面讨论的source 与 sink 都是端到端相同的情况一起讨论的,当然端到端也可以是不一样的(例如source 是kafka,sink 是file)
Structured Streaming 支持多个production sources 与 sinks(files 以及Kafka),以及一些debug 工具(例如memory table sink)。我们之前已经介绍过这些,现在进一步讨论。
可能能想到的最简单的source就是file source,基本上我们能想到的所有文件类型都适用,如Parquet、text、JSON、CSV等。
在Spark static file source 与Structured Streaming file source 中,它们唯一的区别就是:在streaming 中,我们可以控制每次trigger中读入的文件数,通过maxFilesPerTriger选项即可。
需要注意的是,任何添加到Streaming input 目录中的文件,都必须是原子的。否则,在你写入完成前,Spark会partially 处理文件。在如本地文件系统或是HDFS这种允许partial write 文件系统中,最好是在外部文件夹写完文件后,再移动到 input 目录中。在Amazon S3 中,对象一般仅在fully writes 后才可被访问。
Kafka 是一个分布式的publish-and-subscribe 系统,它可以像一个消息队列一样,让我们可以publish records 到流中。可以将Kafka 认作为一个分布式的buffer,它让我们在某个类别中(topic)存储流中的records。
读Kafka Source
在读kafka 时,需要选择一种选项:assign,subscribe,或是subscribePattern。在读Kafka时,只能选择其中一种选项。
然后,我们还需要指定kafka.bootstrap.servers,除此之外,还有其他几个options需要指定:
当然,还有其他配置选项,如Kafka consumer timeouts,fetch retries,以及 intervals 等。
下面是一个例子:
// subscribe to 1 topic
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "b-1.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092")
.option("subscribe", "topic1")
.load()
// subscribe to multiple topics
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "b-1.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092")
.option("subscribe", "topic1, topic2")
.load()
// subscribe to a pattern of topics
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "b-1.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092")
.option("subscribePattern", "topic.*")
.load()
这里若是弹出以下报错:
org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
则根据Spark 官网添加jar 包:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5
或:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 ...
在Kafka Source 中的每条row都会有以下schema:
在Kafka中的每条message都会以某种方式序列化。可以使用Spark原生的方法或是UDF将message 处理为一个更具结构化的形式,常见的模式有JSON或是Avro,去读写Kafka。
写Kafka Sink
写 Kafka Sink与读Kafka Sink 类似,仅有几个参数不一样。我们仍需要指定Kafka bootstrap servers,而其他需要提供的选项是topic以及列名,有两种方式提供,如下所示,下面两个写法是等价的:
// specify topic and columns together
ds1.selectExpr("topic", "CAST(key as STRING)", "CAST(value AS STRING)")
.writeStream.format("kafka")
.option("checkpointLocation", "/kafka/checkpoint/dir")
.option("kafka.bootstrap.servers", "b-1.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092")
.start()
// specify column and topic seperately
ds1.selectExpr("CAST(key as STRING)", "CAST(value AS STRING)")
.writeStream.format("kafka")
.option("checkpointLocation", "/kafka/checkpoint/dir")
.option("kafka.bootstrap.servers", "b-1.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092")
.option("topic", "topic1")
.start()
foreach sink 类似于Dataset API 中的 foreachPartitions。这个操作允许operations可以在per-partition 的基础下进行并行地计算。在使用foreach sink时,必须实现ForeachWriter 接口,可以在Scala/Java 文档中查看,它包含3个方法:open、process 和 close。在trigger触发了一组生成的rows后,会执行相对应的这三个方法。
总的来说,这个是一个自定义的方法,可以实现自己想要的业务逻辑。
这些主要用于做测试,一般不会在生产环境中使用,因为它们无法提供端到端的fault tolerance。
Socket source
通过TCP sockets发送的数据,指定host和port 来读数据,如:
val socketDF = spark.readStream.format("socket")
.option("host", "localhost").option("port", 9999).load()
在客户端可以执行:
nc -lk 9999
Console sink
可以将输出吸入到console,支持append和complete的输出模式:
activityCounts.format("console").write()
Memory sink
类似于console sink,区别是会将数据collect到driver中,然后以in-memory table的形式提供交互式query:
activityCounts.writeStream.format("memory").queryName("my_device_table")
在Structured Streaming 中,output modes的概念与static DF 中的一样。
Append mode 是默认的行为,也是最简单最容易理解的。在一个新row添加到result table时,它们会基于指定的trigger,输出到sink。在fault-tolerant sink的条件下,这个模式可以确保每行row仅输出一次(并且仅此一次)。当我们将append mode 与event-time 和watermarks 结合使用时,仅有最终的结果会输出到sink。
Complete mode 会输出result table 的整个state到配置的output sink 中。在一些stateful 数据的场景下,例如所有的rows都会随时间改变,或是配置的sink 并不支持行级的updates 时,complete mode 非常有用。如果query 并不包含aggregation,则这个等同于 append mode
Update mode 类似于 complete mode,区别是:仅有与上一次不同的rows 会被写入到sink 中。不过,sink必须支持行级的 updates,才能支持这个模式。如果query 不包含任何 aggregation,则这个模式等同于 append mode。
Structured Streaming 对于每个output mode,都会结合query的信息进行限制。例如,假设我们的query仅是做一个map操作,那么Structured Streaming不会允许complete mode,因为这样会需要记录所有的输入records,并重写整个output table。下面是一些小手册,关于什么时候使用各个output mode。
在控制什么时候将数据输出到sink时,我们会设置一个trigger。默认情况下,Structured Streaming会在上一个trigger完成处理后,启动数据。我们可以使用triggers来确保不会有太多的updates压垮output sink,或是控制output中的文件大小。当前,仅有一种定期触发的trigger类型,基于的是processing time;以及一次性的trigger,用于手动一次执行一个processing step。在未来会加入更多的triggers。
对于processing time trigger,我们仅需要指定一个string 类型的duration时间即可(或是在Scalat中的Duration 类型、Java中的TimeUnit类型),例如:
import org.apache.spark.sql.streaming.Trigger
activityCounts.writeStream
.trigger(Trigger.ProcessingTime("100 seconds"))
.format("memory")
.outputMode("complete")
.start()
ProcessingTime trigger 会等到给定duration的时间再输出数据。例如,假设给定的duration是1分钟,则trigger会在12:00,12:01,12:02 触发,依次类推。如果由于上一个处理没完成,导致了一个trigger time miss,则Spark会等到下一个trigger 点(也就是下一分钟),而不是在上一个处理完成后立即终止。
我们也可以仅让一个streaming job 执行一次,指定once trigger即可。这个听起来有点奇怪,不过在开发与生产中,这个trigger都非常有用。对于开发,可以用于测试。而对于生产,Once trigger可以让我们以一个非常低频的方式执行任务(例如,向数据源导入数据是一个不确定的时间周期)。由于Structured Streaming 仍可以追溯所有已经处理过的文件,以及计算后的state,它比自身再写一个追溯batch job 的处理要简单地多,并且可以节省较多的资源(因为流程序定期运行的话,会一直消耗计算资源)。使用方式如下:
import org.apache.spark.sql.streaming.Trigger
activityCounts.writeStream.trigger(Trigger.Once())
.format("console")
.outputMode("complete")
.start()
最后一点需要提到的是,在Structured Streaming 中,不仅只限于对stream使用DataFrame API。还可以使用Datasets执行同样的计算,但是用的是type-safe 的方式。我们可以将一个streaming DataFrame转换为一个Dataset。
Spark Structured Streaming(二)实战
标签:系统 pack 一起 left join bootstra shuffle 目录 pac output
原文地址:https://www.cnblogs.com/zackstang/p/13073327.html