码迷,mamicode.com
首页 > 其他好文 > 详细

How to implement connection pool in spark streaming

时间:2015-06-16 09:15:13      阅读:116      评论:0      收藏:0      [点我收藏+]

标签:

在spark streaming的文档里,有这么一段:

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

但是怎么让worker得到一个ConectionPool呢?简单的想法是在使用static变量指向一个ConnectionPool。但这里有一个讲究:怎么保证这个ConnectionPool是worker上的,而不是driver上的?

用pyhton为例:

在ConnectionPool.py里实现一个pool

#/usr/bin/python
#connection_pool.py
import
psycopg2 import settings from DBUtils.PooledDB import PooledDB pool = PooledDB(psycopg2, settings.connection_pool_size, host=settings.db_host, database=settings.database, user=settings.db_user, password=settings.db_password)
def getConnection():
return pool.connection()

假设stream的主代码在main.py里,提交spark

spark-submit --py-files connection_pool.py main.py

这样connection_pool.py将被发送到worker执行,main.py里的 sendPartition 在worker节点上执行的时候就可以获得ConnectionPool.getConnection()调用。

这里的关键是明白哪些代码在driver上跑,哪些在worker上跑。

 

How to implement connection pool in spark streaming

标签:

原文地址:http://www.cnblogs.com/englefly/p/4579863.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!