When scanning data, HBase uses a scanner (a table scanner).
HTable hands back a scanner via getScanner(scan), which takes a Scan instance on which you can set the start and stop rows of the scan as well as other filter conditions.
The query results come back through an iterator. The interface is not particularly convenient, but it is not complicated either.
One point is easy to overlook, though: under the default configuration, every call to next() on the returned scanner makes a round trip to a RegionServer to fetch the next record. When the network is poor, this hurts performance badly, so configuring the scanner cache is recommended.
The hbase.client.scanner.caching property controls how many rows a scanner fetches from the server in one round trip; by default it fetches one row at a time. Setting it to a reasonable value cuts the time spent in next() calls during a scan, at the cost of the client-side memory needed to hold the cached rows.
There are three places where it can be configured:
1. In the HBase configuration file (hbase-site.xml).
2. By calling HTable.setScannerCaching(int scannerCaching).
3. By calling Scan.setCaching(int caching).
Each level takes precedence over the one before it, so a value set on the Scan wins. A minimal sketch of the three levels follows.
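As a rough sketch (using the same legacy, pre-1.0 client API as the examples in this post; the table name "students" and the numeric values are just for illustration):

Configuration conf = HBaseConfiguration.create();
// 1. Client-wide default, normally set in hbase-site.xml; it can also
//    be set programmatically on the Configuration:
conf.setInt("hbase.client.scanner.caching", 100);

// 2. Per-table override:
HTable table = new HTable(conf, "students");
table.setScannerCaching(500);

// 3. Per-scan override -- this is the value that actually applies:
Scan scan = new Scan();
scan.setCaching(1000);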
package Scanner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class Scanner {
    private String rootDir;
    private String zkServer;
    private String port;
    private Configuration conf;
    private HConnection hConn = null;

    private Scanner(String rootDir, String zkServer, String port) throws IOException {
        this.rootDir = rootDir;
        this.zkServer = zkServer;
        this.port = port;

        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);
        hConn = HConnectionManager.createConnection(conf);
    }

    public void scanTable(String tablename) {
        Scan scan = new Scan();
        // Set the scanner cache: fetch 1000 rows per round trip
        scan.setCaching(1000);
        try {
            HTableInterface table = hConn.getTable(tablename);
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                format(result);
            }
            scanner.close();
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void format(Result result) {
        // Row key
        String rowkey = Bytes.toString(result.getRow());
        // Returns the cells of a Result as an array of KeyValues
        KeyValue[] kvs = result.raw();
        for (KeyValue kv : kvs) {
            // Column family name
            String family = Bytes.toString(kv.getFamily());
            // Column qualifier
            String qualifier = Bytes.toString(kv.getQualifier());
            String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)));
            System.out.println("rowkey->" + rowkey + ", family->" + family + ", qualifier->" + qualifier);
            System.out.println("value->" + value);
        }
    }

    // HBase shell equivalent: scan 'students'
    public static void main(String[] args) throws IOException {
        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";
        // Initialize the connection
        Scanner conn = new Scanner(rootDir, zkServer, port);
        conn.scanTable("students");
    }
}
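The Scan above reads the whole table. As mentioned at the top, a scan can also be bounded by start and stop rows; a minimal sketch (the row keys "A" and "Tom" and the column names "basicinfo"/"age" are hypothetical):

Scan scan = new Scan();
scan.setCaching(1000);
// Scan the row-key range [startRow, stopRow): the start row is
// inclusive, the stop row is exclusive.
scan.setStartRow(Bytes.toBytes("A"));
scan.setStopRow(Bytes.toBytes("Tom"));
// Optionally restrict the scan to a single column:
scan.addColumn(Bytes.toBytes("basicinfo"), Bytes.toBytes("age"));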
1. Filters make reads against a table more efficient. Both of HBase's read operations, get() and scan(), support filters. On their own they only support direct access by row key and access by a start/stop row-key range, and they lack fine-grained selection (such as filtering row keys or values by regular expression); filters fill that gap. (A get() example follows this list.)
2. You can use the predefined filters or implement your own custom filter.
3. A filter is created on the client, shipped to the server over RPC, and executed on the server side; only the data that passes the filter is returned to the client.
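Since get() supports filters too but the demo below only uses them with scan(), here is a minimal sketch of attaching a filter to a Get (the row key "Tom" and qualifier "age" are hypothetical; table is an HTableInterface obtained via hConn.getTable(...) as in the demos, and the filter classes come from org.apache.hadoop.hbase.filter):

// Fetch the row "Tom", keeping only cells whose qualifier equals "age"
Get get = new Get(Bytes.toBytes("Tom"));
get.setFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes("age"))));
Result result = table.get(get);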
1. Comparison Filters
RowFilter
FamilyFilter
QualifierFilter
ValueFilter
DependentColumnFilter
2. Dedicated Filters
SingleColumnValueFilter
SingleColumnValueExcludeFilter
PrefixFilter
PageFilter
KeyOnlyFilter
FirstKeyOnlyFilter
TimestampsFilter
RandomRowFilter
3. Decorating Filters (both are shown in the sketch after this list)
SkipFilter
WhileMatchFilter
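The demo below only exercises RowFilter and PageFilter, so here is a rough sketch of the two decorating filters (same imports as the demo, plus SkipFilter, WhileMatchFilter, and ValueFilter from org.apache.hadoop.hbase.filter; the value "0" and the row key "Tom" are hypothetical):

// SkipFilter decorates another filter: if the wrapped filter rejects
// any cell of a row, the entire row is skipped.
Scan scan1 = new Scan();
scan1.setFilter(new SkipFilter(new ValueFilter(
        CompareFilter.CompareOp.NOT_EQUAL,
        new BinaryComparator(Bytes.toBytes("0")))));

// WhileMatchFilter aborts the whole scan the first time the wrapped
// filter rejects a row, e.g. stop at the first row key >= "Tom".
Scan scan2 = new Scan();
scan2.setFilter(new WhileMatchFilter(new RowFilter(
        CompareFilter.CompareOp.LESS,
        new BinaryComparator(Bytes.toBytes("Tom")))));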
package Filter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class FilterDemo {
    private String rootDir;
    private String zkServer;
    private String port;
    private Configuration conf;
    private HConnection hConn = null;

    private FilterDemo(String rootDir, String zkServer, String port) throws IOException {
        this.rootDir = rootDir;
        this.zkServer = zkServer;
        this.port = port;

        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);
        hConn = HConnectionManager.createConnection(conf);
    }

    // Comparison filter: exact match on the row key
    public void filterTable(String tablename) {
        Scan scan = new Scan();
        scan.setCaching(1000);
        RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("Tom")));
        scan.setFilter(filter);
        try {
            HTableInterface table = hConn.getTable(tablename);
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                format(result);
            }
            scanner.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Comparison filter: regular-expression match on the row key
    public void filterTableRegex(String tablename) {
        Scan scan = new Scan();
        scan.setCaching(1000);
        RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new RegexStringComparator("T\\w+"));
        scan.setFilter(filter);
        try {
            HTableInterface table = hConn.getTable(tablename);
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                format(result);
            }
            scanner.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Dedicated filter: paging
    public void filterTablePage(String tablename) {
        // Page size 4: display 3 rows per page and reserve one extra row,
        // whose key becomes the (inclusive) start row of the next scan
        PageFilter pageFilter = new PageFilter(4);
        byte[] lastRow = null; // last row key read; seeds the next scan
        int pageCount = 0;     // current page number
        try {
            HTableInterface table = hConn.getTable(tablename);
            while (++pageCount > 0) {
                System.out.println("pageCount = " + pageCount);
                Scan scan = new Scan();
                scan.setFilter(pageFilter);
                if (lastRow != null) {
                    scan.setStartRow(lastRow);
                }
                ResultScanner resultScanner = table.getScanner(scan);
                int count = 0;
                for (Result result : resultScanner) {
                    lastRow = result.getRow();
                    if (++count > 3) {
                        break; // hold the 4th row back as the next start row
                    }
                    format(result);
                }
                resultScanner.close();
                if (count < 4) { // fewer than 4 rows returned: table exhausted
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void format(Result result) {
        // Row key
        String rowkey = Bytes.toString(result.getRow());
        // Returns the cells of a Result as an array of KeyValues
        KeyValue[] kvs = result.raw();
        for (KeyValue kv : kvs) {
            // Column family name
            String family = Bytes.toString(kv.getFamily());
            // Column qualifier
            String qualifier = Bytes.toString(kv.getQualifier());
            String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)));
            System.out.println("rowkey->" + rowkey + ", family->" + family + ", qualifier->" + qualifier);
            System.out.println("value->" + value);
        }
    }

    public static void main(String[] args) throws IOException {
        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";
        // Initialize the connection
        FilterDemo filterDemo = new FilterDemo(rootDir, zkServer, port);
        //filterDemo.filterTable("students");
        //filterDemo.filterTableRegex("students");
        filterDemo.filterTablePage("students");
    }
}
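One design note on filterTablePage(): setStartRow() is inclusive, which is why each page reserves its 4th row rather than printing it; that row is intentionally re-read as the first row of the next page. (The termination check is accordingly count < 4: a scan returning fewer than 4 rows means the table is exhausted; the original count < 3 would re-print the last row of a page with exactly 3 results.) An alternative pattern is to print every returned row and make the next start row exclusive by appending a zero byte to the last key, a minimal sketch under that assumption with PageFilter(3):

// The smallest row key strictly greater than lastRow is lastRow plus
// a trailing zero byte, so the next scan starts *after* lastRow.
scan.setStartRow(Bytes.add(lastRow, new byte[] { 0 }));

Also note that PageFilter is applied independently on each region server, so a scan spanning multiple regions can return more rows than the page size; the client-side count check in the demo is what actually enforces the page boundary.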
Original article: http://blog.csdn.net/scgaliguodong123_/article/details/46699153