标签:appname oca 种类 控制台 als host log wordcount check
上篇了解了一些基本的Structured Streaming的概念,知道了Structured Streaming其实是一个无下界的无限递增的DataFrame。基于这个DataFrame,我们可以做一些基本的select、map、filter操作,也可以做一些复杂的join和统计。本篇就着重介绍下,Structured Streaming支持的输入输出,看看都提供了哪些方便的操作。
Structured Streaming 提供了几种数据源的类型,可以方便的构造Steaming的DataFrame。默认提供下面几种类型:
file数据源提供了很多种内置的格式,如csv、parquet、orc、json等等,就以csv为例:
package xingoo.sstreaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
object FileInputStructuredStreamingTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.master("local")
.appName("StructuredNetworkWordCount")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
val userSchema = new StructType().add("name", "string").add("age", "integer")
val lines = spark.readStream
.option("sep", ";")
.schema(userSchema)
.csv("file:///Users/xingoo/IdeaProjects/spark-in-action/data/*")
val query = lines.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
}
这样,在对应的目录下新建文件时,就可以在控制台看到对应的数据了。
aaa;1
bbb;2
aaa;5
ddd;6
还有一些其他可以控制的参数:
在我们自己练习的时候,一般都是基于这个socket来做测试。首先开启一个socket服务器,nc -lk 9999
,然后streaming这边连接进行处理。
spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
这个是生产环境或者项目应用最多的数据源,通常架构都是:
应用数据输入-->kafka-->spark streaming -->其他的数据库
由于kafka涉及的内容还比较多,因此下一篇专门介绍kafka的集成。
在配置完输入,并针对DataFrame或者DataSet做了一些操作后,想要把结果保存起来。就可以使用DataSet.writeStream()方法,配置输出需要配置下面的内容:
详细的来看看这个输出模式的配置,它与普通的Spark的输出不同,只有三种类型:
Structed Streaming提供了几种输出的类型:
noAggDF
.writeStream
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()
noAggDF
.writeStream
.format("console")
.start()
aggDF
.writeStream
.queryName("aggregates")
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show()
writeStream
.foreach(...)
.start()
这个foreach的功能很强大,稍后也会详细的说明。
Structured Streaming教程(2) —— 常用输入与输出
标签:appname oca 种类 控制台 als host log wordcount check
原文地址:https://www.cnblogs.com/xing901022/p/9135257.html