What is the full routine of Structured Streaming?
Let's look at the code (the example is adapted from the Spark source code, with a few edits of mine):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.streaming.ProcessingTime
import scala.concurrent.duration._

val spark = SparkSession.builder
  .master("local[2]")
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

val schemaExp = StructType(
  StructField("name", StringType, false) ::
  StructField("city", StringType, true) ::
  Nil
)
// The standard DataSource API; only read becomes readStream.
val words = spark.readStream
  .format("json")
  .schema(schemaExp)
  .load("file:///tmp/dir")
// Ordinary DataFrame operations.
val wordCounts = words.groupBy("name").count()
// The standard DataSource write API; only write becomes writeStream.
val query = wordCounts.writeStream
  // Output modes: complete, append, update. At the time of writing,
  // only the first two are supported.
  .outputMode("complete")
  // Sink types: console, parquet, memory, and foreach.
  .format("console")
  // The trigger is where the timer is set: one micro-batch every 5 seconds.
  .trigger(ProcessingTime(5.seconds))
  .start()

query.awaitTermination()
This is the complete routine of Structured Streaming.
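As a side note, of the four sink types above, the memory sink is convenient for testing, because the running result is exposed as an in-memory table. Here is a minimal sketch, assuming the same wordCounts stream; the query name "counts" is an illustrative choice:

val memQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("counts") // registers the running result as a temporary table
  .start()

// The accumulated counts can then be inspected with ordinary SQL:
spark.sql("select * from counts").show()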
Structured Streaming currently supports only the File and Socket sources (a sketch of the Socket source follows the example below). For output it supports the four sink types mentioned above, and the foreach sink can be extended without limit. For example:
import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row}

val query = wordCounts.writeStream
  .trigger(ProcessingTime(5.seconds))
  .outputMode("complete")
  .foreach(new ForeachWriter[Row] {
    var fileWriter: FileWriter = _

    // Called once per partition per trigger; returning true means
    // this partition/version pair should be processed.
    override def open(partitionId: Long, version: Long): Boolean = {
      FileUtils.forceMkdir(new File(s"/tmp/example/${partitionId}"))
      fileWriter = new FileWriter(new File(s"/tmp/example/${partitionId}/temp"))
      true
    }

    // Called for every row of the partition.
    override def process(value: Row): Unit = {
      fileWriter.append(value.toSeq.mkString(","))
    }

    // Called when the partition is done (errorOrNull is non-null on failure).
    override def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }
  }).start()
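As for the Socket source mentioned earlier, reading it looks just like the File source. A minimal sketch, where the host and port values are purely illustrative:

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load() // yields a DataFrame with a single string column named "value"

From there you can apply the same groupBy/count and writeStream steps as above.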
Original post: http://www.cnblogs.com/loveItLoveFaimly/p/7612069.html