
Structured Streaming


import org.apache.spark.sql.types._

val pathA = "hdfs:/tpc-ds/data/store_sales"
val pathB = "hdfs:/tpc-ds/data/store/"

// For Spark 2.x use -> val df = spark.read.option("header", true).csv(path)

val A_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", "|")
  .load(pathA)

// Assign column names to the Store Sales dataframe
val storeSalesDF = A_df.select(
  A_df("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
  A_df("_c1").cast(IntegerType).as("SS_SOLD_TIME_SK"),
  A_df("_c2").cast(IntegerType).as("SS_ITEM_SK"),
  A_df("_c7").cast(IntegerType).as("SS_STORE_SK")
)

val B_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", "|")
  .load(pathB)

// Assign column names to the Store dataframe
val storeDF = B_df.select(
  B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
  B_df("_c1").cast(StringType).as("S_STORE_ID"),
  B_df("_c5").cast(StringType).as("S_STORE_NAME")
)

// Join store sales to stores on the store surrogate key
val joinedDF = storeSalesDF.join(storeDF,
  storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")
)

joinedDF.take(5)
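
For comparison, the same load in Spark 2.x can use the built-in CSV reader instead of the external com.databricks.spark.csv package, as the comment above suggests. A minimal sketch under that assumption (only pathA is shown; the A_df2 name is illustrative):

// Spark 2.x: csv() is built in, so no external package is required.
val A_df2 = spark.read
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", "|")
  .csv(pathA)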

What is the full routine of Structured Streaming? 
Let’s look at the code (the example is adapted from the Spark source code, with some edits of mine):
import scala.concurrent.duration._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.types._

val spark = SparkSession
  .builder
  .master("local[2]")
  .appName("StructuredNetworkWordCount")
  .getOrCreate()


val schemaExp = StructType(
  StructField("name", StringType, false) ::
  StructField("city", StringType, true) ::
  Nil
)

//The standard DataSource API; only read becomes readStream.
val words = spark.readStream.format("json").schema(schemaExp)
  .load("file:///tmp/dir")

//Ordinary DataFrame operations.
val wordCounts = words.groupBy("name").count()

//The standard DataSource write API; only write becomes writeStream.
val query = wordCounts.writeStream
  //Output modes: complete, append, update. Currently,
  //only the first two are supported.
  .outputMode("complete")
  //Sink types: console, parquet, memory, and foreach.
  .format("console")
  .trigger(ProcessingTime(5.seconds)) //Here is where the timer is set.
  .start()

    query.awaitTermination() 

This is the complete routine of Structured Streaming. 
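
The handle returned by start() is a StreamingQuery, so besides awaitTermination() the running stream can be inspected or stopped. A minimal sketch using standard StreamingQuery methods:

// The query runs in the background; isActive reports whether it is
// still running, and stop() terminates it gracefully.
if (query.isActive) {
  println(s"query ${query.id} is still running")
}
query.stop()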
Structured Streaming currently supports only the File and Socket sources. On the output side it supports the four sink types mentioned above, and the foreach sink can be extended without limit. For example: 
import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row}

val query = wordCounts.writeStream.trigger(ProcessingTime(5.seconds))
  .outputMode("complete")
  .foreach(new ForeachWriter[Row] {

    var fileWriter: FileWriter = _

    // Called once per partition per trigger; returning true accepts the data.
    override def open(partitionId: Long, version: Long): Boolean = {
      FileUtils.forceMkdir(new File(s"/tmp/example/${partitionId}"))
      fileWriter = new FileWriter(new File(s"/tmp/example/${partitionId}/temp"))
      true
    }

    // Called once for every row in the partition.
    override def process(value: Row): Unit = {
      fileWriter.append(value.toSeq.mkString(","))
    }

    // Called when the partition is done, or with the error on failure.
    override def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }
  }).start()
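
Since the socket source is supported as well, the same word count can read lines from a TCP socket instead of JSON files. A minimal sketch, assuming a text server such as "nc -lk 9999" is running; the host and port values are placeholders:

import spark.implicits._

// Each input row is one line of text from the socket.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split lines into words and count occurrences of each word.
val socketCounts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value").count()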

Original article: http://www.cnblogs.com/loveItLoveFaimly/p/7612069.html
