Spark Structured Streaming

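Structured Streaming is a feature introduced in Spark 2.0. Its main characteristics:

  • Built on the Spark SQL engine
  • You can use the Dataset/DataFrame API directly, just as you would for offline batch data
  • The Spark SQL engine processes the streaming data continuously and incrementally
  • Supports streaming aggregations, event-time windows, stream-to-batch joins, and more
  • Provides fault recovery with end-to-end exactly-once semantics through checkpointing and WAL (write-ahead logs)
  • Efficient (the Spark SQL engine optimizes the query) and scalable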

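Structured Streaming can greatly simplify the code you need to write.

The following example continuously receives data from localhost:9999 and counts the words: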

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words, name the new column as "word"
# split() turns each line into an array; explode() turns the array into one row per word
words = lines.select(
    explode(split(lines.value, " ")).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Block until the query is stopped or fails
query.awaitTermination()


First start a netcat server:

nc -lk 9999


Then submit the Spark program:

spark-submit --master local ./spark_test.py

Type the following lines into netcat, one after another:

hello world
apache spark
hello spark

The Spark program prints:

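Batch: 0
+-----+-----+
| word|count|
+-----+-----+
|hello|    1|
|world|    1|
+-----+-----+

Batch: 1
+------+-----+
|  word|count|
+------+-----+
| hello|    1|
|apache|    1|
| spark|    1|
| world|    1|
+------+-----+

Batch: 2
+------+-----+
|  word|count|
+------+-----+
| hello|    2|
|apache|    1|
| spark|    2|
| world|    1|
+------+-----+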

Spark also keeps logging streaming progress information:

20/05/25 23:50:26 INFO streaming.StreamExecution: Streaming query made progress: {
  "id" : "606f3e4f-72a7-4c9a-b87e-be9f34320e47",
  "runId" : "26ce998c-3f59-475b-95a0-2022f6a7ccc2",
  "name" : null,
  "timestamp" : "2020-05-25T15:50:10.789Z",
  "numInputRows" : 1,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 0.06258997308631158,
  "durationMs" : {
    "addBatch" : 15628,
    "getBatch" : 263,
    "getOffset" : 0,
    "queryPlanning" : 31,
    "triggerExecution" : 15977,
    "walCommit" : 39
  },
  "stateOperators" : [ {
    "numRowsTotal" : 4,
    "numRowsUpdated" : 2
  } ],
  "sources" : [ {
    "description" : "TextSocketSource[host: localhost, port: 9999]",
    "startOffset" : 1,
    "endOffset" : 2,
    "numInputRows" : 1,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 0.06258997308631158
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@224c4764"
  }
}

Although the data is processed in batches, every batch still prints the full running totals. This is because the query specifies outputMode("complete").

Output Mode

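  • Append Mode (the default): only the incremental data is computed, and only the result of that incremental computation is emitted. Existing results are never changed and new results are only added, so the output must be allowed to contain the same record more than once without conflict. (Spark itself rejects Append mode for a streaming aggregation without a watermark, so the following is a conceptual illustration only.) Read this way, the third batch in the example above would output
Batch: 2
+------+-----+
|  word|count|
+------+-----+
| hello|    1|
| spark|    1|
+------+-----+
  • Complete Mode: the incremental result is merged with the existing result, which is equivalent to computing over all data from the start of the stream up to now and emitting the whole result. In the example above, the third batch outputs
Batch: 2
+------+-----+
|  word|count|
+------+-----+
| hello|    2|
|apache|    1|
| spark|    2|
| world|    1|
+------+-----+
  • Update Mode: similar to Complete Mode, but only the rows that changed are emitted. In the example above, the third batch would output
Batch: 2
+------+-----+
|  word|count|
+------+-----+
| hello|    2|
| spark|    2|
+------+-----+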
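
The difference between Complete and Update mode can be reproduced without Spark. The following is only a plain-Python sketch of the idea, not how Spark implements it: `run_batches` is a made-up helper, and the three input batches are the three netcat lines from the example above. The running `Counter` plays the role of the intermediate state kept between batches.

```python
from collections import Counter


def run_batches(batches):
    """Yield (complete, update) views of the running word count after each micro-batch."""
    totals = Counter()  # the only state kept between batches
    for lines in batches:
        words = [w for line in lines for w in line.split(" ")]
        totals.update(words)
        complete = dict(totals)                      # every row of the result table
        update = {w: totals[w] for w in set(words)}  # only rows touched by this batch
        yield complete, update


batches = [["hello world"], ["apache spark"], ["hello spark"]]
results = list(run_batches(batches))

complete, update = results[2]  # third batch, i.e. "Batch: 2"
print(complete)  # {'hello': 2, 'world': 1, 'apache': 1, 'spark': 2}
print(update)    # contains only hello -> 2 and spark -> 2
```

Note that the raw lines are never stored: once a batch has been folded into `totals`, it is gone.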


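Throughout this process Spark keeps only the intermediate results (the running state). The incremental input data is discarded as soon as it has been processed.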


Structured Streaming supports window-based aggregations over event time, that is, the timestamp carried by the data itself.

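from pyspark.sql.functions import window

# Assume words has two columns: timestamp and word
# Count words per window, based on timestamp
# Window size is 10 minutes, slide interval is 5 minutes,
# i.e. words are counted in the windows 0~10, 5~15, 10~20, 15~25, ...
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()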
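
To see which sliding windows a single record falls into, here is a small plain-Python sketch. It is an illustration under simplifying assumptions: time is an integer number of minutes, windows are half-open `[start, end)` and aligned to multiples of the slide interval, and `windows_for` is a hypothetical helper, not a Spark function.

```python
def windows_for(event_minute, size=10, slide=5):
    """Return the [start, end) windows, in minutes, that contain event_minute."""
    # Latest window start that is <= event_minute, aligned to the slide interval
    start = (event_minute // slide) * slide
    windows = []
    # Walk backwards while the window still covers event_minute
    while start > event_minute - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(windows_for(12))  # [(5, 15), (10, 20)]
print(windows_for(20))  # [(15, 25), (20, 30)]
```

With a 10-minute window sliding every 5 minutes, every record is counted in two windows, which is why the grouped result has one row per (window, word) pair.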


from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window

host, port = "localhost", 9999  # assumed values; not defined in the original notes

windowDuration = '10 seconds'
slideDuration = '5 seconds'

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCountWindowed") \
    .getOrCreate()

lines = spark \
    .readStream \
    .format('socket') \
    .option('host', host) \
    .option('port', port) \
    .option('includeTimestamp', 'true') \
    .load()

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, windowDuration, slideDuration),
    words.word
).count()

# Start running the query that prints the windowed word counts to the console
query = windowedCounts \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .option('truncate', 'false') \
    .start()

query.awaitTermination()



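Late Data and Watermarking

In window-based computations data can arrive late: records that belong to a window may show up after that window has already been computed. Spark uses watermarking to specify how much lateness is tolerated.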

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()

The watermark is computed as

watermark = max_event_time_received - threshold

Late data for a window is dropped once

watermark > window_end_time

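Suppose the window size is 10 minutes and the slide interval is 5 minutes. If the latest event time among all data received so far is minute 31 and the threshold is 10 minutes, the current watermark is 31 - 10 = 21. If a record with event time 14 arrives now, it is dropped: it belongs to the windows [5, 15) and [10, 20), and both end times, 15 and 20, are less than 21.

In Update Mode, late data that the watermark still allows is used to update the existing windows.
In Append Mode, a window is computed and emitted only after the watermark has passed the window's end time.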
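
The arithmetic of the example can be checked with a few lines of plain Python. This is a sketch of the rule as stated in these notes (a record is dropped when the watermark has passed the end of every window it belongs to); the function names are made up for illustration, time is in whole minutes, and Spark's actual bookkeeping (for example, advancing the watermark only between micro-batches) is ignored.

```python
def watermark(max_event_minute, threshold):
    """watermark = max_event_time_received - threshold"""
    return max_event_minute - threshold


def latest_window_end(event_minute, size=10, slide=5):
    """End time of the last sliding window that still contains event_minute."""
    return (event_minute // slide) * slide + size


def is_dropped(event_minute, max_event_minute, threshold, size=10, slide=5):
    """True if every window containing the record ended before the watermark."""
    return watermark(max_event_minute, threshold) > latest_window_end(event_minute, size, slide)


print(watermark(31, 10))       # 21
print(is_dropped(14, 31, 10))  # True: windows [5, 15) and [10, 20) both ended before 21
print(is_dropped(17, 31, 10))  # False: window [15, 25) is still open
```

The second case shows that a late record is not necessarily lost entirely: minute 17 is too late for [10, 20) but can still update [15, 25).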


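Fault Recovery with End-to-End Exactly-Once Semantics

This only works with some sources and sinks.

The source must be able to track the position it last read from, i.e. an offset.
The sink must be idempotent, i.e. performing the same operation several times does not change the result.

Spark then uses checkpointing and write-ahead logs to record the offsets and the intermediate state.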



aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()

The checkpoint location has to be a path in an HDFS-compatible file system.

There are limitations on what changes in a streaming query are allowed between restarts from the same checkpoint location.
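
Why a replayable source plus an idempotent sink gives exactly-once results can be shown with a toy model. Everything below is hypothetical (`ReplayableSource`, `IdempotentSink`, `run_once`, and the dict used as a checkpoint are stand-ins, not Spark classes), and the real protocol is more involved; the sketch only illustrates that replaying a batch after a crash does not duplicate output.

```python
class ReplayableSource:
    """A source that can be re-read from any offset (stand-in for e.g. Kafka)."""

    def __init__(self, records):
        self.records = records

    def read(self, start, end):
        return self.records[start:end]


class IdempotentSink:
    """A sink keyed by offset: writing the same record twice leaves one copy."""

    def __init__(self):
        self.rows = {}

    def write(self, offset, value):
        self.rows[offset] = value


def run_once(source, sink, checkpoint, batch_size, crash_before_commit=False):
    """Process one micro-batch starting at the checkpointed offset."""
    start = checkpoint["offset"]
    end = min(start + batch_size, len(source.records))
    for i, value in enumerate(source.read(start, end)):
        sink.write(start + i, value)
    if crash_before_commit:
        return  # simulated failure: output was written, the offset was not committed
    checkpoint["offset"] = end


source = ReplayableSource(["a", "b", "c", "d"])
sink = IdempotentSink()
checkpoint = {"offset": 0}

run_once(source, sink, checkpoint, batch_size=2)                            # commits offset 2
run_once(source, sink, checkpoint, batch_size=2, crash_before_commit=True)  # writes c, d; no commit
run_once(source, sink, checkpoint, batch_size=2)                            # restart replays c, d

print(sorted(sink.rows.items()))  # [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
```

If the sink appended blindly instead of overwriting by key, the replay would leave c and d in the output twice, which is the at-least-once behavior.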

Built-in Sources

  • File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet.
  • Kafka source - Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher.
  • Socket source (for testing) - Reads UTF8 text data from a socket connection.

spark = SparkSession \
    .builder \
    .appName("Test") \
    .getOrCreate()

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# isStreaming is a property in PySpark, not a method
socketDF.isStreaming    # True for DataFrames that have streaming sources


# Read all the csv files written atomically in a directory
from pyspark.sql.types import StructType

userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")

# Create DataFrame representing the stream of input lines from Kafka
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topicA,topicB") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

# Split the lines into words
words = lines.select(
    # explode turns each item in an array into a separate row
    explode(split(lines.value, ' ')).alias('word')
)

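There is also a Rate source (for testing). It generates rows at a specified number of rows per second; each row contains a timestamp and a value (a running message count).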


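df = ...  # streaming DataFrame of device data; the columns device, deviceType and signal are assumed

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()

# Register the stream as a temporary view so that it can be queried with SQL
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF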


Join Operations

Joining a stream with static (batch) data

staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
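# "right_join" is not a valid join type, and with the stream on the left side only inner and left outer joins are supported
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF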

Joining a stream with another stream

from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

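# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)

# The same join as an outer join
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter"
)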

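The join condition itself (same ad id, click no earlier than the impression and at most one hour after it) can be tried on in-memory data. The sketch below is plain Python rather than Spark, uses made-up sample records, and ignores watermarks and state cleanup; `left_outer_time_join` is a hypothetical helper that only mirrors the predicate of the leftOuter variant.

```python
from datetime import datetime, timedelta


def left_outer_time_join(impressions, clicks, max_delay=timedelta(hours=1)):
    """Pair each impression with clicks on the same ad within max_delay.

    Impressions without a matching click are kept with None on the click side.
    """
    joined = []
    for imp_ad, imp_time in impressions:
        matches = [
            (click_ad, click_time)
            for click_ad, click_time in clicks
            if click_ad == imp_ad and imp_time <= click_time <= imp_time + max_delay
        ]
        if matches:
            joined.extend(((imp_ad, imp_time), match) for match in matches)
        else:
            joined.append(((imp_ad, imp_time), None))
    return joined


t0 = datetime(2020, 5, 25, 12, 0)
impressions = [("ad1", t0), ("ad2", t0)]
clicks = [
    ("ad1", t0 + timedelta(minutes=30)),  # within one hour: matches
    ("ad2", t0 + timedelta(hours=2)),     # too late: no match
]

joined = left_outer_time_join(impressions, clicks)
for row in joined:
    print(row)
```

In a real stream-stream outer join Spark can only emit the unmatched row for ad2 once the watermarks guarantee that no matching click can still arrive, which is why the watermarks above are needed.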

Setting a queryName makes the result queryable with SQL

# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()

spark.sql("select * from aggregates").show()   # interactively query in-memory table

The name given to queryName becomes the table name, which can then be queried with Spark SQL.


streamingDf = spark.readStream. ...

# Without watermark using guid column
streamingDf.dropDuplicates(["guid"])

# With watermark using guid and eventTime columns
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates(["guid", "eventTime"])
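
The point of combining deduplication with a watermark is that the set of already-seen keys does not grow forever. A rough plain-Python sketch of that idea follows; `deduplicate` is a hypothetical generator, event times are plain integers, and the watermark is advanced per record for simplicity, which is not how Spark schedules it.

```python
def deduplicate(events, threshold):
    """Yield each (guid, event_time) once; forget keys older than the watermark."""
    seen = set()
    max_event_time = None
    for guid, event_time in events:
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        watermark = max_event_time - threshold
        if event_time < watermark:
            continue  # too late: dropped without consulting the state
        # Evict keys that can no longer receive duplicates
        seen = {(g, t) for g, t in seen if t >= watermark}
        if (guid, event_time) not in seen:
            seen.add((guid, event_time))
            yield guid, event_time


events = [("a", 1), ("a", 1), ("b", 5), ("c", 20), ("a", 1)]
unique = list(deduplicate(events, threshold=10))
print(unique)  # [('a', 1), ('b', 5), ('c', 20)]
```

The last `("a", 1)` is not rejected as a known duplicate (its key has already been evicted) but as late data, because event time 1 is below the watermark 20 - 10 = 10.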


Arbitrary Stateful Operations


Unsupported Operations


Output Sinks

  • File sink - Stores the output to a directory (append mode only)
    .format("parquet")        # can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
  • Kafka sink - Stores the output to one or more topics in Kafka
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
  • Foreach sink - Runs arbitrary computation on the records in the output.
  • Console sink (for debugging) - Prints the output to the console/stdout
  • Memory sink (for debugging) - The output is stored in memory as an in-memory table (update mode not supported)

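Each sink supports different output modes and offers different fault-tolerance guarantees; see the official documentation for details.

Running arbitrary operations with foreach and foreachBatch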

def foreach_batch_function(micro_batch_df, micro_batch_unique_id):
    # Transform and write batchDF
    pass

query = streamingDF.writeStream.foreachBatch(foreach_batch_function).start()

def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()

class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        # Return True so that process() is called for the rows of this partition.
        return True

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method is optional in Python.
        pass

query = streamingDF.writeStream.foreach(ForeachWriter()).start()

foreach and foreachBatch only guarantee at-least-once delivery.
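
Since a micro-batch may be delivered more than once, one common way to avoid duplicate output is to use the batch id passed to the foreachBatch handler and skip batches that were already written. A minimal sketch of that pattern without Spark: `BatchIdSink` is a hypothetical target store, and in a real system the rows and the last batch id would have to be committed together in one transaction.

```python
class BatchIdSink:
    """Hypothetical target store that remembers the last batch id it committed."""

    def __init__(self):
        self.rows = []
        self.last_batch_id = -1

    def write_batch(self, rows, batch_id):
        """Write rows unless this batch id was already committed; return whether it wrote."""
        if batch_id <= self.last_batch_id:
            return False  # replay of an already-committed batch: skip it
        self.rows.extend(rows)
        self.last_batch_id = batch_id
        return True


sink = BatchIdSink()
outcomes = [
    sink.write_batch(["a", "b"], batch_id=0),
    sink.write_batch(["c"], batch_id=1),
    sink.write_batch(["c"], batch_id=1),  # the same batch delivered again after a failure
]

print(outcomes)   # [True, True, False]
print(sink.rows)  # ['a', 'b', 'c']
```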

Triggers (when micro-batch processing is kicked off)

# Default trigger (runs micro-batch as soon as it can)
# As soon as the previous micro-batch finishes, all newly arrived data is processed as the next micro-batch
df.writeStream \
  .format("console") \
  .start()

# ProcessingTime trigger with two-seconds micro-batch interval
# On top of the default behavior:
#     1. If the previous micro-batch finishes within 2 seconds, the next one waits until the 2 seconds are up
#     2. If the previous micro-batch takes longer than 2 seconds, the next one starts immediately
#     3. If there is no new data, no micro-batch is started even after the 2 seconds have elapsed
df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

# One-time trigger
# Processes a single micro-batch and then stops
df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

# Continuous trigger with one-second checkpointing interval
df.writeStream \
  .format("console") \
  .trigger(continuous='1 second') \
  .start()

Continuous mode is experimental.
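
Rules 1 and 2 of the ProcessingTime trigger can be turned into a tiny scheduling simulation. This is a plain-Python sketch under stated assumptions: data is always available (so rule 3 is not modelled), times are in seconds, alignment to interval boundaries is ignored, and `schedule` is a made-up helper rather than anything from Spark.

```python
def schedule(batch_durations, interval):
    """Return the start time of each micro-batch for a processing-time trigger."""
    starts = []
    now = 0.0
    for duration in batch_durations:
        starts.append(now)
        # Rule 1: a fast batch waits for the interval to elapse.
        # Rule 2: a slow batch is followed immediately by the next one.
        now = max(now + interval, now + duration)
    return starts


starts = schedule([0.5, 3.0, 1.0], interval=2.0)
print(starts)  # [0.0, 2.0, 5.0]
```

The first batch takes 0.5 s, so the second one waits until t = 2.0; the second takes 3 s, so the third starts right when it ends, at t = 5.0.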

Streaming Queries Object

query = df.writeStream.format("console").start()   # get the query object

# In PySpark, id, runId, name, recentProgress and lastProgress are properties, so they take no parentheses

query.id          # get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       # get the unique id of this run of the query, which will be generated at every start/restart

query.name        # get the auto-generated or user-specified name of the query

query.explain()   # print detailed explanations of the query

query.stop()      # stop the query

query.awaitTermination()   # block until query is terminated, with stop() or with error

query.exception()       # the exception if the query has been terminated with error

query.recentProgress    # a list of the most recent progress updates for this query

query.lastProgress      # the most recent progress update of this streaming query


Monitoring Streaming Queries

query = ...  # a StreamingQuery
print(query.lastProgress)

Will print something like the following.

{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}

print(query.status)

Will print something like the following.

{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}

// Scala: attach a StreamingQueryListener to receive query events asynchronously
val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})

# Send the metrics to the configured sinks
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
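
A progress dictionary like the one printed by query.lastProgress earlier in this section can be condensed into a few health indicators. The helper below is only a sketch: `summarize_progress` is a made-up function, the sample dict copies a handful of fields from the output shown above, and treating "processed rate >= input rate" as "keeping up" is a rough heuristic, not an official definition.

```python
def summarize_progress(progress):
    """Extract a few health indicators from a streaming progress dict."""
    input_rate = progress["inputRowsPerSecond"]
    processed_rate = progress["processedRowsPerSecond"]
    return {
        "rows": progress["numInputRows"],
        # Rough heuristic: the query keeps up if it processes at least as fast as data arrives
        "keeping_up": processed_rate >= input_rate,
        "trigger_ms": progress["durationMs"]["triggerExecution"],
    }


sample = {
    "numInputRows": 10,
    "inputRowsPerSecond": 120.0,
    "processedRowsPerSecond": 200.0,
    "durationMs": {"getOffset": 2, "triggerExecution": 3},
}

summary = summarize_progress(sample)
print(summary)  # {'rows': 10, 'keeping_up': True, 'trigger_ms': 3}
```

Applied to the word-count log earlier in these notes (input 100.0 rows/s, processed about 0.06 rows/s), the same check would report that the query is not keeping up.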


