码迷,mamicode.com
首页 > 其他好文 > 详细

sparkStreaming向hbase写数据

时间:2016-08-24 11:04:59      阅读:138      评论:0      收藏:0      [点我收藏+]

标签:

在SparkStreaming中统计了数据之后,我们需要将结果写入外部文件系统。

本文,以向Hbase中写数据,为例,说一下,SparkStreaming怎么向Hbase中写数据。

首先,需要说一下,下面的这个方法。

foreachRDD(func)

最通用的输出操作,把func作用于从stream生成的每一个RDD。

注意:这个函数是在 运行streaming程序的driver进程 中执行的。

下面跟着思路,看一下,怎么优雅的向Hbase中写入数据

向外部写数据 常见的错误:

向外部数据库写数据,通常会建立连接,使用连接发送数据(也就是保存数据)。

开发者可能 在driver中创建连接,而在spark worker 中保存数据

例如:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // 这个会在driver中执行
  rdd.foreach { record =>
    connection.send(record) //这个会在 worker中执行
  }
}

上面这种写法是错误的!上面的写法,需要connection 对象被序列化,然后从driver发送到worker。

这样的connection是很少在机器之间传输的。知道这个问题后,我们可以写出以下的,修改后的代码:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

这种写法也是不对的。这会导致,对于每条数据,都创建一个connection(创建connection是消耗资源的)。

下面的方法会好一些:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

上面的方法,使用 rdd.foreachPartition 创建一个connection 对象, 一个RDD分区中的所有数据,都使用这一个connection。

更优的方法

在多个RDD之间,connection对象是可以重用的,所以可以创建一个连接池。如下

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool是一个静态的,延迟初始化的连接池
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // 返回到池中 以便别人使用  }
}

连接池中的连接应该是,应需求而延迟创建,并且,如果一段时间没用,就超时了(也就是关闭该连接)

 

sparkStreaming向hbase写数据

标签:

原文地址:http://www.cnblogs.com/CWX21/p/5801998.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!