码迷,mamicode.com
首页 > 编程语言 > 详细

HBase 常用java api获得客户端,创建表,查询,删除

时间:2018-03-01 19:58:52      阅读:567      评论:0      收藏:0      [点我收藏+]

标签:mysq   catch   caching   snappy   server   tar   windows   student   java api   

1,前期准备

(1) 本文采用的hbase是采用三台服务器搭建的集群,zookeeper也是相同服务器搭建的集群,集群ip分别是192.168.183.101; 192.168.183.102; 192.168.183.103。其中102是主节点(HMaster),101以及103都是HRegionServer

(2) 这次测试安装的hbase的版本是 hbase-0.99.2.-bin.tar

(3)java api引用的maven依赖路径如下

 <dependency>
     <groupId>org.apache.hbase</groupId>
     <artifactId>hbase-client</artifactId>
      <version>1.0.2</version>
</dependency>

(4)配置本地的hosts文件(在本地配置集群的ip与主机名的映射关系)

zookeeper集群的内部有时候通过主机名来进行寻址。如果不在配置hosts文件,在获得下面的Connection连接时,程序一直会在createConenction这个方法上,测试时等待了3分钟程序一直卡着。

C:\Windows\System32\drivers\etc\hosts的配置如下:

192.168.183.101 mini01
192.168.183.102 mini02
192.168.183.103 mini03

 

2,获取Connection对象

引入maven后,先获取hbase的java操作HBase的Connection对象,传入zookeeper的地址以及zookeeper的端口号zookeeper,通过ConnectionFactory可以获取hbase的连接Connection.

    /**
     * 获取Connection对象 
     */
    static Configuration config = null;
    private Connection connection = null;
    private Table table = null;

    Logger LOG = LoggerFactory.getLogger(HbaseGetImpl.class);
    @Before
    public void init() throws Exception {
        config = HBaseConfiguration.create();// 配置
        config.set("hbase.zookeeper.quorum", "mini01,mini02,mini03");// zookeeper地址
        config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
        connection = ConnectionFactory.createConnection(config);
    }

下面看一下ConnectionFactory.createConnection(config)的方法注解

/**

 * Create a new Connection instance using the passed <code>conf</code> instance. Connection
 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
 * created from returned connection share zookeeper connection, meta cache, and connections
 * to region servers and masters.
 * <br>
 * The caller is responsible for calling {@link Connection#close()} on the returned
 * connection instance.
 *  调用方在调用拿到connection之后,有责任在随后的代码中调用Connection.close()方法来关闭连接
 * Typical usage:
   典型用法如下:
 * <pre>
 * Connection connection = ConnectionFactory.createConnection(conf);
 * Table table = connection.getTable(TableName.valueOf("table1"));
 * try {
 *   table.get(...);
 *   ...
 * } finally {
 *   table.close();
 *   connection.close();
 * }
 * </pre>
 *
 * @param conf configuration
 * @param user the user the connection is for
 * @param pool the thread pool to use for batch operations
 * @return Connection object for <code>conf</code>
 */
public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
throws IOException {
  if (user == null) {
    UserProvider provider = UserProvider.instantiate(conf);
    user = provider.getCurrent();
  }

  return createConnection(conf, false, pool, user);
}

 

3,创建hbase的表

利用connection对象,可以指定HBase表的表名以及表的列族,可以调用admin.createTable(htd, splits);的方法来创建Hbase的表。下面说明以下三点注意事项:

(1)hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)这个方法可以指定hbase数据块的编码类型,可以选择的是DIFF,FAST_DIFF,PREFIX这几种的编码类型。

(2)hcd.setCompressionType的方法,可以指定数据压缩算法,分别是GZ和SNAPPY。值得注意的是在我的本机集群测试时,当我选择SNAPPY作为压缩算法时,在调用testCreateTable时会卡住,并且不能创建表。当我把这行代码注释掉后,创建hbase表顺利。怀疑是我本机的hbase版本与Snappy的配置项不匹配。

(3)在创建表时,admin.createTable(htd, splits)方法可以指定指定splits的参数,预定义表的region分区,这边预定了4个region分区。

(4)在创建hbase表结束后,需要将connection关闭。

@Test
    public void testCreateTable() {

        LOG.info("Entering testCreateTable.");
// Specify the table descriptor.
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("student2"));
// Set the column family name to info.
        HColumnDescriptor hcd = new HColumnDescriptor("info");
// Set data encoding methods, HBase provides DIFF,FAST_DIFF,PREFIX
// and PREFIX_TREE
        hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
// Set compression methods, HBase provides two default compression
// methods:GZ and SNAPPY
// GZ has the highest compression rate,but low compression and
// decompression effeciency,fit for cold data
// SNAPPY has low compression rate, but high compression and
// decompression effeciency,fit for hot data.
// it is advised to use SNAANPPY
//   hcd.setCompressionType(Compression.Algorithm.SNAPPY);
        htd.addFamily(hcd);
        Admin admin = null; //注[2]
        try {
// Instantiate an Admin object.
            admin = connection.getAdmin();
            if (!admin.tableExists(TableName.valueOf("student2"))) {
                LOG.info("Creating table...");
//                admin.createTable(htd);
                // 创建一个预划分region的表
                byte[][] splits = new byte[4][];
                splits[0] = Bytes.toBytes("A");
                splits[1] = Bytes.toBytes("H");
                splits[2] = Bytes.toBytes("O");
                splits[3] = Bytes.toBytes("U");
                admin.createTable(htd, splits);
                LOG.info(String.valueOf(admin.getClusterStatus()));
                LOG.info(String.valueOf(admin.listNamespaceDescriptors()));
                LOG.info("Table created successfully.");
            } else {
                LOG.warn("table already exists");
            }
        } catch (IOException e) {
            LOG.error("Create table failed " ,e);
        } finally {
            if (admin != null) {
                try {
// Close the Admin object.
                    admin.close();
                } catch (IOException e) {
                    LOG.error("Failed to close admin " ,e);
                }
            }
        }
        LOG.info("Exiting testCreateTable.");
    }

2,测试往hbase表中put数据

(1)先通过connnection以及tableName获得Table对象

(2)再构造Put对象时,Put put = new Put(rowkey),在构造方法里面的是hbase表的唯一键rowkey(相当于mysql中的id)。 此外还要指定两个参数,分别是familyName 列族的名字 ,qulifiers列名。这边需要注意的是tableName以及familyName是在创建表时就已经确定好了的。而hbase中的列名qulifers不同于mysql中的列名,mysql中的列名是在创建mysql表时就已经确定的。qulfiers可以在插入输入时随意指定名称,不是预定义的。

(3)hbase存储的数据是不带数据类型的,全是Bytes。所以在插入hbase表前,需要调用Byte.toBytes()方法,将数据转成Bytes后再进行插入。

 @Test
    public void testPut() {
        // Specify the column family name.
        byte[] familyName = Bytes.toBytes("info");
//        byte[] familyName1 = Bytes.toBytes("");
        // Specify the column name.
        byte[][] qualifiers = {
                Bytes.toBytes("colu1")
//                Bytes.toBytes("l2"),
//                Bytes.toBytes("l3")
        };
        Table table = null;
        try {
            // Instantiate an HTable object.
            table = connection.getTable(TableName.valueOf("student2"));
            List<Put> puts = new ArrayList<Put>();
            for(int i=1;i<10;i=i+1){
                // Instantiate a Put object.
               String rowkey = UUID.randomUUID().toString(); 
Put put = new Put(Bytes.toBytes(rowkey)); //high_risk put.addColumn(familyName, qualifiers[0], Bytes.toBytes("tommy1")); // put.addColumn(familyName, qualifiers[0], Bytes.toBytes(80)); // put.addColumn(familyName, qualifiers[0], Bytes.toBytes(UUID.randomUUID().toString())); //temporary_plate puts.add(put); } // Submit a put request. table.put(puts); LOG.info("Put successfully."); } catch (IOException e) { LOG.error("Put failed ", e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed ", e); } } } LOG.info("Exiting testPut."); }

3,从hbase中获取数据

获取数据的方式一共有两种:

(1)用hbase中的唯一键:rowkey,通过get方法获取数据

Get get = new  Get(rowkey) ,通过指定的rowkey来构造Get对象,另外在还可以通过Get对象指定你需要获取的列名get.addColumn(familyName,qulifiers)    

这个方法中需要注意的地方在: 获得cell对象后:(for(Cell cell: results.rawCells()) ) ,从cell中取出数据时,需要将hbase的存储的数据类型Bytes还原成原来的数据类型。  CellUtil.cloneValue(cell)拿到的是Bytes类型的数据,需要通过Bytes.toString() 或Bytes.toInt()等不同的方法转为原来的类型。

例如:插入时原始数据是Int,那么必须调用Bytes.toInt()的方法还原回int,初始数据如果是long类型,必须调用Bytes.toLong()的方法还原回Long, 总结来说就是插入hbase之前是什么样的,出来必须与插入前是相同的数据类型。(否则的话,取出来的数据是乱码的)

@Test
    public void testGet() {
        LOG.info("Entering testGet.");
        // Specify the column family name.
        byte[] familyName = Bytes.toBytes("f1");
        // Specify the column name.
        byte[][] qualifier = { Bytes.toBytes("l1")};
        // Specify RowKey.
        byte[] rowKey = Bytes.toBytes("105f1fd2-7048-4fd3-8c7a-65cf04542be2");
        Table table = null;
        try {
            // Create the Configuration instance.
            table = connection.getTable(TableName.valueOf("table"));
            // Instantiate a Get object.
            Get get = new Get(rowKey);
            // Set the column family name and column name.
            get.addColumn(familyName, qualifier[0]);
            // Submit a get request.
            Result result = table.get(get);
            // Print query results.
            for (Cell cell : result.rawCells()) {
                LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + ","
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
                        + Bytes.toString(CellUtil.cloneValue(cell)));
// + Bytes.toLong(CellUtil.cloneValue(cell));
// + Bytes.toInt(CellUtil.cloneValue(cell));
// + Bytes.toBoolean(CellUtil.cloneValue(cell));
} LOG.info(
"Get data successfully."); } catch (IOException e) { LOG.error("Get data failed ", e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed ", e); } } } LOG.info("Exiting testGet."); }

 

(2)构建简单的scan对象扫描获取数据

这边也可以构建简单的scan对象,通过表扫描的来获取数据,获得数据的数据类型也是Bytes类型的。关于scan对象我们还可以设置scan的开始rowkey, scan的结束rowkey,scan可以设置缓存大小,可以设置rowkey过滤器,column过滤器等等过滤器。这些会在下一篇章记录。

@Test
    public void testScanData() {
        logger.info("Entering testScanData.");
        Table table = null;
        // Instantiate a ResultScanner object.
        ResultScanner rScanner = null;
        try {
            // Create the Configuration instance.
            table = connection.getTable(TableName.valueOf(TABLENAME));
            // Instantiate a Get object.
            Scan scan = new Scan();
//            scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("l1"));
//            scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("l2"));
            // Set the cache size.
            scan.setCaching(1000);
            // Submit a scan request.
            rScanner = table.getScanner(scan);
            // Print query results.
            for (Result r = rScanner.next(); r != null; r = rScanner.next()) {
                for (Cell cell : r.rawCells()) {
                    logger.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
                            + "\"" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\":"+"\""
                            + Bytes.toString(CellUtil.cloneValue(cell))+"\"");
                }
            }
            logger.info("Scan data successfully.");
        } catch (IOException e) {
            logger.error("Scan data failed ", e);
        } finally {
            if (rScanner != null) {
                // Close the scanner object.
                rScanner.close();
            }
            if (table != null) {
                try {
                    // Close the HTable object.
                    table.close();
                } catch (IOException e) {
                    logger.error("Close table failed ", e);
                }
            }
        }
        logger.info("Exiting testScanData.");
    }

 








HBase 常用java api获得客户端,创建表,查询,删除

标签:mysq   catch   caching   snappy   server   tar   windows   student   java api   

原文地址:https://www.cnblogs.com/yanyuechao/p/8489610.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!