码迷,mamicode.com
首页 > 编程语言 > 详细

通过线程池,从hbase中拿数据

时间:2019-11-25 14:57:06      阅读:82      评论:0      收藏:0      [点我收藏+]

标签:demo   cli   get   apache   error   else   nec   cto   err   

1.线程池类HbasePool

package com.example.demospringboothbase.common;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class HbasePool {
    private Logger log = Logger.getLogger(HbasePool.class);



    //代理类主要用于获取连接
    public class HbaseProxy {

        private String zk;
        private String zknode;
        private Connection connection;

        public HbaseProxy(String zk, String zknode) {
            this.zk = zk;
            this.zknode = zknode;
            init();
        }

        public void init() {
            Configuration entries = HBaseConfiguration.create();
            entries.set("hbase.zookeeper.quorum",zk);
            entries.set("zookeeper.znode.parent",zknode);
            try {
                this.connection = ConnectionFactory.createConnection(entries);
            } catch (IOException e) {
                log.error("获取连接失败!");
                e.printStackTrace();
            }

        }

        public Connection getConnection(){
            return this.connection;
        }

        public void close(){
            if(this.connection !=null){
                try {
                    this.connection.close();
                } catch (IOException e) {
                    log.error("链接关闭失败~");
                    e.printStackTrace();
                }
            }
        }

    }


    public class HbasePoolFactary extends BasePooledObjectFactory<HbaseProxy>{
        private String zk;
        private String zknode;

        public HbasePoolFactary(String zk, String zknode) {
            this.zk = zk;
            this.zknode = zknode;
        }

        @Override
        public HbaseProxy create() throws Exception {
            return new HbaseProxy(this.zk,this.zknode);
        }

        @Override
        public PooledObject<HbaseProxy> wrap(HbaseProxy hbaseProxy) {
            return new DefaultPooledObject<HbaseProxy>(hbaseProxy);
        }

        @Override
        public void destroyObject(PooledObject<HbaseProxy> p) throws Exception {
            HbaseProxy object = p.getObject();
            object.close();
            super.destroyObject(p);
        }
    }

    private  static HbasePool pool;
    //开始编写我们的单例池子
    private HbasePool(){}

    public static HbasePool getPool(){
        if(pool ==null){
            pool = new HbasePool();
        }
        return pool;
    }

    //还得写一个构造池子的单例方法。用通用的池子对象来进行构造
    private GenericObjectPool<HbaseProxy> gop;

    public GenericObjectPool<HbaseProxy> getGop(String zk,String zknode){
        if(gop ==null){
            HbasePoolFactary hbasePoolFactary = new HbasePoolFactary(zk, zknode);
            gop = new GenericObjectPool<HbaseProxy>(hbasePoolFactary);
            gop.setMaxTotal(10);
        }
        return gop;
    }
}

2.通过get来拿自己hbase中的数据

这里将逻辑类和测试类写一块了。


package com.example.demospringboothbase.serverce;

import com.alibaba.fastjson.JSON;
import com.example.demospringboothbase.common.HbasePool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class test {
//从连接池中拿链接、
private HbasePool hbasePool = HbasePool.getPool();
//客户给定一个表名、rowkey、rowkey的规则、哪些列、列的规则、列簇
//输出的结果格式如下[{},{},{}]
public List<Map> resultByRowkey(String tableName, List<String> rowkey, String rowkeyAttr,
List<String> column, String columnAttr, String columnFamily) throws Exception {
//先定义一个List
ArrayList<Map> list = new ArrayList<>();
Table table = null;
//get操作是基于表名和rowkey来进行的
ArrayList<Get> gets = new ArrayList<>();
//这里将rowkey都放到gets中
for (String rk:rowkey){
Get get = null;
if (rowkeyAttr.equals("rowkey")){
get = new Get(rk.getBytes());
}
//在这里要指定列,因为只有指定列才会按照列输出,不指定列某人输出的是全部列
if (columnAttr.equals("column")){
for(String cl:column){
get.addColumn(columnFamily.getBytes(),cl.getBytes());
}
}
gets.add(get);
}
//和hbase取的联系
GenericObjectPool<HbasePool.HbaseProxy> gop = hbasePool.getGop("server3:2181", "/hbase-unsecure");
//从连接池中拿一个连接
HbasePool.HbaseProxy hbaseProxy = gop.borrowObject();
//指定表
table = hbaseProxy.getConnection().getTable(TableName.valueOf(tableName));
Result[] results = table.get(gets);
if (results!=null){
for (Result r:results){
HashMap map = new HashMap();
while (r.advance()){
Cell current = r.current();
String q = Bytes.toString(CellUtil.cloneQualifier(current));
String p = Bytes.toString(CellUtil.cloneValue(current));
map.put(q,p);
}
String rowkey1 = Bytes.toString(r.getRow());
map.put("rowkey",rowkey1);
list.add(map);
}
}else{
return list;
}
return list;
}
//测试是否成功
public static void main(String[] args) throws Exception {
test ceshi = new test();
ArrayList<String> rowkey = new ArrayList();
ArrayList<String> colum = new ArrayList();
rowkey.add("000080fd3eaf6b381e33868ec6459c49_20111230222603");
// rowkey.add("000080fd3eaf6b381e33868ec6459c49_20111230222802");
rowkey.add("0001b04bf9473458af40acb4c13f1476_20111230002114");
colum.add("click");
colum.add("url");
colum.add("serch");
List<Map> maps = ceshi.resultByRowkey("sogo3", rowkey, "rowkey", colum, "colum", "oo");

System.out.println(JSON.toJSONString(maps));
}
}
输出结果:[{"serch":"福彩3d单选一注法","rank":"10","rowkey":"000080fd3eaf6b381e33868ec6459c49_20111230222603","click":"5","url":"http://www.18888.com/read-htm-tid-6069520.html"},{"serch":"淫淫网","rank":"1","rowkey":"0001b04bf9473458af40acb4c13f1476_20111230002114","click":"1","url":"http://www.244uu.com/"}]

通过线程池,从hbase中拿数据

标签:demo   cli   get   apache   error   else   nec   cto   err   

原文地址:https://www.cnblogs.com/shiji7/p/11927583.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!