1. Prerequisites
(1) The HBase deployment used in this article is a cluster of three servers, and ZooKeeper runs as a cluster on the same three machines. The cluster IPs are 192.168.183.101, 192.168.183.102, and 192.168.183.103. 102 is the master node (HMaster); 101 and 103 are HRegionServers.
(2) The HBase version installed for this test is hbase-0.99.2-bin.tar.
(3) The Maven dependency for the Java API is as follows:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.2</version>
</dependency>
(4) Configure the local hosts file (map the cluster IPs to their hostnames on the local machine).
The ZooKeeper cluster sometimes addresses its members internally by hostname. If the hosts file is not configured, the program hangs inside ConnectionFactory.createConnection when obtaining the Connection shown below; in my test it was still stuck after three minutes.
The entries in C:\Windows\System32\drivers\etc\hosts are:
192.168.183.101 mini01
192.168.183.102 mini02
192.168.183.103 mini03
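Before wiring up the client, it can save those three minutes to verify that each quorum hostname actually resolves through the local resolver (which consults the hosts file). A minimal stdlib sketch; the hostname list and the resolvable helper are illustrative, not part of the HBase API:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostsCheck {
    // Returns true if the local resolver (including the hosts file)
    // can map the hostname to an IP address.
    static boolean resolvable(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (String host : new String[] { "mini01", "mini02", "mini03" }) {
            System.out.println(host + " resolvable: " + resolvable(host));
        }
    }
}
```

If any hostname prints false, fix the hosts file before attempting to create the Connection.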
2. Obtaining a Connection Object
After adding the Maven dependency, first obtain the Connection object that the Java API uses to operate on HBase. Pass in the ZooKeeper quorum addresses and the ZooKeeper client port; ConnectionFactory then returns the HBase Connection.
/**
 * Obtain the Connection object
 */
static Configuration config = null;
private Connection connection = null;
private Table table = null;
Logger LOG = LoggerFactory.getLogger(HbaseGetImpl.class);

@Before
public void init() throws Exception {
    config = HBaseConfiguration.create();                         // configuration
    config.set("hbase.zookeeper.quorum", "mini01,mini02,mini03"); // ZooKeeper quorum
    config.set("hbase.zookeeper.property.clientPort", "2181");    // ZooKeeper client port
    connection = ConnectionFactory.createConnection(config);
}
Now let's look at the Javadoc of ConnectionFactory.createConnection(config):
/**
 * Create a new Connection instance using the passed <code>conf</code> instance. Connection
 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
 * created from returned connection share zookeeper connection, meta cache, and connections
 * to region servers and masters.
 * <br>
 * The caller is responsible for calling {@link Connection#close()} on the returned
 * connection instance.
 * (In other words, after obtaining the connection, the caller is responsible for
 * calling Connection.close() later in the code.)
 * Typical usage:
 * <pre>
 * Connection connection = ConnectionFactory.createConnection(conf);
 * Table table = connection.getTable(TableName.valueOf("table1"));
 * try {
 *   table.get(...);
 *   ...
 * } finally {
 *   table.close();
 *   connection.close();
 * }
 * </pre>
 *
 * @param conf configuration
 * @param user the user the connection is for
 * @param pool the thread pool to use for batch operations
 * @return Connection object for <code>conf</code>
 */
public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
    throws IOException {
  if (user == null) {
    UserProvider provider = UserProvider.instantiate(conf);
    user = provider.getCurrent();
  }
  return createConnection(conf, false, pool, user);
}
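In HBase 1.0+, Connection and Table implement AutoCloseable, so the close discipline from the Javadoc can also be expressed with try-with-resources, which closes resources in reverse order of acquisition (table first, then connection). A stdlib sketch of that ordering, using a stand-in Resource class instead of real HBase objects:

```java
public class CloseOrder {
    static final StringBuilder LOG = new StringBuilder();

    // Stand-in for Connection/Table: records the order in which close() runs.
    static class Resource implements AutoCloseable {
        final String name;
        Resource(String name) { this.name = name; }
        @Override
        public void close() { LOG.append("close ").append(name).append("; "); }
    }

    public static void main(String[] args) {
        // Mirrors: try (Connection connection = ...; Table table = ...) { ... }
        try (Resource connection = new Resource("connection");
             Resource table = new Resource("table")) {
            LOG.append("work; ");
        }
        // Resources close in reverse declaration order: table before connection.
        System.out.println(LOG); // work; close table; close connection;
    }
}
```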
3. Creating an HBase Table
With the connection object you can specify an HBase table's name and column families, then call admin.createTable(htd, splits) to create the table. Note the following points:
(1) hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) specifies the data block encoding for the column family; the available encodings include DIFF, FAST_DIFF, and PREFIX (and PREFIX_TREE in versions that support it).
(2) hcd.setCompressionType specifies the compression algorithm, such as GZ or SNAPPY. Note that on my local cluster, choosing SNAPPY caused testCreateTable to hang and the table could not be created; after commenting out that line, table creation succeeded. I suspect my local HBase installation lacks the native Snappy libraries matching this version.
(3) When creating the table, admin.createTable(htd, splits) accepts a splits parameter to pre-split the table's regions. Here four split keys are supplied, which pre-creates five regions (N split keys yield N + 1 regions).
(4) After creating the table, close the Admin object; the Connection itself should also be closed once all work is done.
@Test
public void testCreateTable() {
    LOG.info("Entering testCreateTable.");
    // Specify the table descriptor.
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("student2"));
    // Set the column family name to info.
    HColumnDescriptor hcd = new HColumnDescriptor("info");
    // Set the data block encoding; HBase provides DIFF, FAST_DIFF, PREFIX
    // and PREFIX_TREE.
    hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
    // Set the compression method; HBase provides two default algorithms: GZ and SNAPPY.
    // GZ has the highest compression ratio but low compression/decompression
    // efficiency, and fits cold data.
    // SNAPPY has a lower compression ratio but high compression/decompression
    // efficiency, and fits hot data.
    // SNAPPY is generally advised.
    // hcd.setCompressionType(Compression.Algorithm.SNAPPY);
    htd.addFamily(hcd);

    Admin admin = null;
    try {
        // Instantiate an Admin object.
        admin = connection.getAdmin();
        if (!admin.tableExists(TableName.valueOf("student2"))) {
            LOG.info("Creating table...");
            // admin.createTable(htd);
            // Create a table with pre-split regions.
            byte[][] splits = new byte[4][];
            splits[0] = Bytes.toBytes("A");
            splits[1] = Bytes.toBytes("H");
            splits[2] = Bytes.toBytes("O");
            splits[3] = Bytes.toBytes("U");
            admin.createTable(htd, splits);
            LOG.info(String.valueOf(admin.getClusterStatus()));
            LOG.info(String.valueOf(admin.listNamespaceDescriptors()));
            LOG.info("Table created successfully.");
        } else {
            LOG.warn("table already exists");
        }
    } catch (IOException e) {
        LOG.error("Create table failed ", e);
    } finally {
        if (admin != null) {
            try {
                // Close the Admin object.
                admin.close();
            } catch (IOException e) {
                LOG.error("Failed to close admin ", e);
            }
        }
    }
    LOG.info("Exiting testCreateTable.");
}
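The four split keys above divide the rowkey space into five regions by lexicographic comparison: (-∞, "A"), ["A", "H"), ["H", "O"), ["O", "U"), and ["U", +∞). A minimal sketch of that assignment logic (regionIndex is a hypothetical helper for illustration, not an HBase API; HBase compares raw bytes, which matches String.compareTo for ASCII keys):

```java
public class SplitDemo {
    static final String[] SPLITS = { "A", "H", "O", "U" };

    // Returns the index (0..SPLITS.length) of the region holding the rowkey.
    // Region i covers the half-open range [SPLITS[i-1], SPLITS[i]);
    // region 0 is unbounded below, the last region unbounded above.
    static int regionIndex(String rowkey) {
        int i = 0;
        while (i < SPLITS.length && rowkey.compareTo(SPLITS[i]) >= 0) {
            i++;
        }
        return i;
    }

    public static void main(String[] args) {
        System.out.println(regionIndex("42"));    // digits sort before "A" -> region 0
        System.out.println(regionIndex("Alice")); // >= "A" and < "H"       -> region 1
        System.out.println(regionIndex("Zoe"));   // >= "U"                 -> region 4
    }
}
```

This is why split keys should be chosen to match the distribution of your actual rowkeys; the UUID rowkeys used in the put example below start with hex digits and letters, so they spread across these regions.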
4. Putting Data into the HBase Table
(1) First obtain a Table object from the connection and the table name.
(2) Then construct a Put object: Put put = new Put(rowkey), where the constructor argument is the table's unique row key (comparable to the primary key id in MySQL). You also need to specify two further parameters: familyName, the column family name, and qualifiers, the column names. Note that the table name and column family are fixed when the table is created, whereas HBase qualifiers differ from MySQL columns: MySQL columns are defined at table-creation time, but qualifiers are not predefined and can be named freely at insert time.
(3) HBase stores data without types; everything is bytes. So before inserting into an HBase table, call Bytes.toBytes() to convert each value to bytes.
@Test
public void testPut() {
    // Specify the column family name.
    byte[] familyName = Bytes.toBytes("info");
    // Specify the column name.
    byte[][] qualifiers = {
        Bytes.toBytes("colu1")
        // Bytes.toBytes("l2"),
        // Bytes.toBytes("l3")
    };
    Table table = null;
    try {
        // Instantiate a Table object.
        table = connection.getTable(TableName.valueOf("student2"));
        List<Put> puts = new ArrayList<Put>();
        for (int i = 1; i < 10; i = i + 1) {
            // Instantiate a Put object with a random row key.
            String rowkey = UUID.randomUUID().toString();
            Put put = new Put(Bytes.toBytes(rowkey));
            put.addColumn(familyName, qualifiers[0], Bytes.toBytes("tommy1"));
            // put.addColumn(familyName, qualifiers[0], Bytes.toBytes(80));
            // put.addColumn(familyName, qualifiers[0], Bytes.toBytes(UUID.randomUUID().toString()));
            puts.add(put);
        }
        // Submit the put requests.
        table.put(puts);
        LOG.info("Put successfully.");
    } catch (IOException e) {
        LOG.error("Put failed ", e);
    } finally {
        if (table != null) {
            try {
                // Close the Table object.
                table.close();
            } catch (IOException e) {
                LOG.error("Close table failed ", e);
            }
        }
    }
    LOG.info("Exiting testPut.");
}
5. Getting Data from HBase
There are two ways to get data:
(1) Fetch a row by its unique key, the rowkey, via the get method.
Get get = new Get(rowkey) constructs a Get object from the specified rowkey; you can additionally restrict which columns to fetch with get.addColumn(familyName, qualifier).
The point to watch here: after obtaining the cells (for (Cell cell : result.rawCells())), the bytes HBase stored must be converted back to the original data type. CellUtil.cloneValue(cell) returns raw bytes, which must be decoded with Bytes.toString(), Bytes.toInt(), or whichever method matches the original type.
For example, if the inserted value was an int, you must call Bytes.toInt() to restore it; if it was a long, you must call Bytes.toLong(). In short, the type you read out must match the type you wrote in; otherwise the value you get back is garbage.
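To see why the decoder must match the encoder, here is a stdlib sketch using ByteBuffer; intToBytes below mimics the 4-byte big-endian layout that HBase's Bytes.toBytes(int) produces (the helper names are illustrative, not HBase APIs):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TypedBytes {
    // Same layout as HBase's Bytes.toBytes(int): 4 bytes, big-endian.
    static byte[] intToBytes(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    static int bytesToInt(byte[] b) {
        return ByteBuffer.wrap(b).getInt();
    }

    public static void main(String[] args) {
        byte[] stored = intToBytes(80);
        // The matching decoder restores the original value.
        System.out.println(bytesToInt(stored)); // 80
        // A mismatched decoder interprets the same 4 bytes {0,0,0,80} as
        // UTF-8 text, yielding control characters rather than "80".
        System.out.println(new String(stored, StandardCharsets.UTF_8).equals("80")); // false
    }
}
```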
@Test
public void testGet() {
    LOG.info("Entering testGet.");
    // Specify the column family name.
    byte[] familyName = Bytes.toBytes("f1");
    // Specify the column name.
    byte[][] qualifier = { Bytes.toBytes("l1") };
    // Specify the row key.
    byte[] rowKey = Bytes.toBytes("105f1fd2-7048-4fd3-8c7a-65cf04542be2");
    Table table = null;
    try {
        table = connection.getTable(TableName.valueOf("table"));
        // Instantiate a Get object.
        Get get = new Get(rowKey);
        // Set the column family name and column name.
        get.addColumn(familyName, qualifier[0]);
        // Submit a get request.
        Result result = table.get(get);
        // Print query results.
        for (Cell cell : result.rawCells()) {
            LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
                + Bytes.toString(CellUtil.cloneFamily(cell)) + ","
                + Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
                + Bytes.toString(CellUtil.cloneValue(cell)));
                // + Bytes.toLong(CellUtil.cloneValue(cell));
                // + Bytes.toInt(CellUtil.cloneValue(cell));
                // + Bytes.toBoolean(CellUtil.cloneValue(cell));
        }
        LOG.info("Get data successfully.");
    } catch (IOException e) {
        LOG.error("Get data failed ", e);
    } finally {
        if (table != null) {
            try {
                // Close the Table object.
                table.close();
            } catch (IOException e) {
                LOG.error("Close table failed ", e);
            }
        }
    }
    LOG.info("Exiting testGet.");
}
(2) Scanning the table with a simple Scan object
You can also build a simple Scan object and fetch data via a table scan; the values returned are again bytes. A Scan can further be configured with a start rowkey, a stop rowkey, a caching size, and filters such as rowkey filters and column filters; those will be covered in the next article.
@Test
public void testScanData() {
    logger.info("Entering testScanData.");
    Table table = null;
    // Instantiate a ResultScanner object.
    ResultScanner rScanner = null;
    try {
        table = connection.getTable(TableName.valueOf(TABLENAME));
        // Instantiate a Scan object.
        Scan scan = new Scan();
        // scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("l1"));
        // scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("l2"));
        // Set the cache size.
        scan.setCaching(1000);
        // Submit a scan request.
        rScanner = table.getScanner(scan);
        // Print query results.
        for (Result r = rScanner.next(); r != null; r = rScanner.next()) {
            for (Cell cell : r.rawCells()) {
                logger.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":\""
                    + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\":\""
                    + Bytes.toString(CellUtil.cloneValue(cell)) + "\"");
            }
        }
        logger.info("Scan data successfully.");
    } catch (IOException e) {
        logger.error("Scan data failed ", e);
    } finally {
        if (rScanner != null) {
            // Close the scanner object.
            rScanner.close();
        }
        if (table != null) {
            try {
                // Close the Table object.
                table.close();
            } catch (IOException e) {
                logger.error("Close table failed ", e);
            }
        }
    }
    logger.info("Exiting testScanData.");
}
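For reference, the start/stop rowkeys mentioned above restrict a scan to the half-open range [startRow, stopRow): the start row is included, the stop row excluded, again by lexicographic comparison. A stdlib sketch of that range semantics over an in-memory sorted "table" (the scan helper is a hypothetical illustration, not the HBase API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class ScanRange {
    // Returns the rowkeys in [startRow, stopRow), mirroring a Scan's
    // half-open range: startRow inclusive, stopRow exclusive.
    static List<String> scan(TreeSet<String> rows, String startRow, String stopRow) {
        return new ArrayList<>(rows.subSet(startRow, true, stopRow, false));
    }

    public static void main(String[] args) {
        TreeSet<String> rows = new TreeSet<>();
        rows.add("alice");
        rows.add("bob");
        rows.add("carol");
        rows.add("dave");
        // "bob" is included (start inclusive), "dave" excluded (stop exclusive).
        System.out.println(scan(rows, "bob", "dave")); // [bob, carol]
    }
}
```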