
The Spark Streaming Programming Model

Date: 2017-09-08 00:22:53


Dependency Management

Basic Workflow

DStream Input Sources: Input DStreams

DStream Input Sources: Receivers

Built-in Input DStreams: Basic Sources

Built-in Input DStreams: Advanced Sources

DStream Input Sources: Multiple Input DStreams

DStream Input Sources: Custom Receivers

Official reference: http://spark.apache.org/docs/1.6.2/streaming-custom-receivers.html

Scala reference template

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.Logging // Spark 1.6.x location; moved to an internal package in Spark 2.x
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
      userInput = reader.readLine()
      while (!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

 

 

Java reference template

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class JavaCustomReceiver extends Receiver<String> {

  private final String host;
  private final int port;

  public JavaCustomReceiver(String host_, int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  @Override
  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread() {
      @Override public void run() {
        receive();
      }
    }.start();
  }

  @Override
  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      // Decode as UTF-8 explicitly, matching the Scala template
      BufferedReader reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data '" + userInput + "'");
        store(userInput);
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch (ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch (Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}

 

 

 

Stateless Transformations

Stateful Transformations 1: updateStateByKey
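
The idea behind updateStateByKey can be sketched in plain Java, without any Spark dependency: for each key, an update function receives the values that arrived in the current batch together with the previous state, and returns the new state. Everything named here (the class RunningCountSketch, the applyBatch helper, the UPDATE function and the sample batches) is hypothetical and only simulates the semantics; it is not Spark's API. The sketch assumes, as the Spark Streaming guide describes, that the update function also runs for keys that already have state but received no new data in the batch, and that returning an empty Optional would drop the key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

final class RunningCountSketch {

    // Update function: (new values for a key in this batch, previous state) -> new state.
    static final BiFunction<List<Integer>, Optional<Integer>, Optional<Integer>> UPDATE =
        (newValues, oldState) -> {
            int sum = oldState.orElse(0);
            for (int v : newValues) {
                sum += v;
            }
            return Optional.of(sum);
        };

    // Simulates one batch interval: folds a batch of words into the per-key state.
    static Map<String, Integer> applyBatch(Map<String, Integer> state, List<String> batch) {
        // Turn the batch into (word, 1) pairs grouped by key.
        Map<String, List<Integer>> grouped = new HashMap<>();
        for (String word : batch) {
            grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }
        // Keys with existing state are updated too, with an empty list of new values.
        for (String key : state.keySet()) {
            grouped.putIfAbsent(key, new ArrayList<>());
        }
        Map<String, Integer> next = new HashMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            Optional<Integer> old = Optional.ofNullable(state.get(e.getKey()));
            // An empty result would remove the key from the state.
            UPDATE.apply(e.getValue(), old).ifPresent(v -> next.put(e.getKey(), v));
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state = applyBatch(state, Arrays.asList("spark", "streaming", "spark"));
        state = applyBatch(state, Arrays.asList("spark", "dstream"));
        System.out.println(state); // running counts across both batches
    }
}
```

After the second batch the count for "spark" is 3, because the state from the first batch is carried forward instead of being recomputed.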

 

 

Stateful Transformations 2: window

 

Stateful Transformations 2: window, plain reduction vs. incremental reduction

 

Understanding incremental reduction
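
The difference between plain and incremental window reduction can be illustrated with a small plain-Java simulation, with no Spark dependency. It assumes a window of three batches that slides by one batch, and treats each batch as an already-reduced sum. The class WindowReductionSketch and the methods plain and incremental are hypothetical names for this sketch. Plain reduction re-sums every batch in the window; incremental reduction reuses the previous window's result, adds the batch that enters and subtracts the batch that leaves. This only works when the reduce function has an inverse, as addition does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WindowReductionSketch {

    // Plain reduction: reduce every batch inside each window from scratch.
    static List<Integer> plain(List<Integer> batchSums, int windowLen) {
        List<Integer> out = new ArrayList<>();
        for (int end = 0; end < batchSums.size(); end++) {
            int start = Math.max(0, end - windowLen + 1);
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += batchSums.get(i);
            }
            out.add(sum);
        }
        return out;
    }

    // Incremental reduction: previous window + entering batch - leaving batch.
    static List<Integer> incremental(List<Integer> batchSums, int windowLen) {
        List<Integer> out = new ArrayList<>();
        int window = 0;
        for (int end = 0; end < batchSums.size(); end++) {
            window += batchSums.get(end);          // batch entering the window
            int leaving = end - windowLen;         // index of the batch that just left
            if (leaving >= 0) {
                window -= batchSums.get(leaving);  // "inverse reduce" the leaving batch
            }
            out.add(window);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> batchSums = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(plain(batchSums, 3));       // prints [1, 3, 6, 9, 12]
        System.out.println(incremental(batchSums, 3)); // prints [1, 3, 6, 9, 12]
    }
}
```

Both methods give the same result, but the incremental version does a constant amount of work per slide regardless of window length. In Spark Streaming the corresponding variant is reduceByKeyAndWindow with an inverse reduce function, which, as far as the official guide describes, requires checkpointing to be enabled.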

 

Output Operations

DStream output

 

Persistence Operations

 


Original article: http://www.cnblogs.com/braveym/p/7489016.html
