标签:封装 依据 窗口 键值对 解决方案 原子操作 多用户 复制 key值
目录
本文实际上是《HBase不睡觉书》的重点归纳。该书不涉及:集群备份、ACL权限控制、REST客户端等。而对于一些不算常用的技术,书中也提示可略过(“集群搭建”除外),本文便以“略”表示。
关系型数据库在大数据情况下受到并发和关联等复杂查询的影响而导致性能下降。
非关系型数据库放弃线性一致性,仅满足最终一致性。
线性一致性:让一个系统看起来好像只有一个数据副本,而且所有的操作都是原子性的。
HBase采用KV存储,意味着随着数据量增大,查询性能也不会下降多少。同时HBase作为列式存储,可以把字段分开存储,从而分散负载。但是这样复杂的存储结构和分布式导致HBase很慢,只是在大数据环境下慢得不明显。
HBase除了不适用于小数据,如单表不超过千万,还不适用于数据分析,如做报表。只有当单表超千万,且并发高,或者数据分析相当简单且非实时。
Master(维护表结构信息)和RegionServer服务器(存储数据,HDFS上)。客户端直接通过RS获取数据,所以Master挂点不影响查询,但不能再新建表了。
ZK管理所有RS的信息,包括具体的数据段存放在哪个RS。HBase还自带了ZK,进程名为HQuirumPeer(多了H),在生产环境下要修改设置,防止其自带的ZK启动。
Region是一段数据(多个行)的集合,HBase的表一般拥有一个到多个Region。(相当于关系型数据库中的分区)
RegionServer
存放Region的容器,直观上是服务器上的一个服务。通常一个服务器只会安装一个RS。客户端通过ZK获取RS地址后,直接从RS获取数据。
Master
负责启动的时候分配Region到具体的RegionServer, 执行各种管理操作, 比如Region的分割和合并、建表、删表、移动Region、修改列族配置等跨RS操作。一般的查询、存储、删除数据都不需要。
列:是基本单位,一列或者多列组成一行。各行的列数可以不等,不同行数据或者同一行数据可以存储在不同的机器上。
每行都有唯一的航键来标定一个行的唯一性。每个列都有多个版本,多个版本的值存储在单元格中。
若干个列可以被归为列族。
rowkey:Hbase中只能根据rowkey来排序,无法根据某个col来排序。规则为字典序,例如row11在row2前面。如果向HBase插入相同的rowkey,那会更新rowkey值,旧值需要带上版本参数才能找到。一个列上可以有多个版本的单元格,它是存储的最小单位。
列族:需要一开始就设置好(列的修改就灵活很多),包括过期时间、数据块缓存以及是否压缩等,所以一个列如果不存在与列族将失去这些属性而失去意义,也因此在命名上,列名前面始终有列族名。列族的存在让同列族的列尽量存储在一台机器上。列族在满足需求的前提下尽量少,这样能减少性能的损失,也少一点BUG。
单元格:确定一条数据的表达为rowkey:column family:column:version(optional)
HBase的写入必须一行一行来。
《HBase不睡觉书》中Hadoop、Zookeeper和HBase的集群环境搭建是我见到过的最易懂且完整的教程了,连集群的开机启动脚本都逐步介绍了。
HA模式下不能用
namenode主机:端口
来访问Hadoop集群,因为端口ip已经不是固定的了,需要采用serviceid访问,它存储在ZK中。
put ‘mytable‘, ‘row1‘, ‘mycf:name‘, ‘apple‘
# 启动(练习模式)
zkServer.sh start
start-dfs.sh
start-hbase.sh
hbase shell
create ‘table_name‘, ‘col_family‘
alter ‘table_name‘, ‘col_family‘ # 新建列族 (先disable这个表,因为会影响所有拥有这个表的RS)
list # 查看有哪些表
describe ‘table_name‘ # 看表信息
scan ‘table‘,{STAETORW=>‘row3‘,ENDROW=>‘row4‘} # 看表数据,start和end可选,但生产中必须用上。如果在创建table时设置了VERSIONS参数大于1,那么scan也是可以看到历史版本记录的。
get ‘table_name‘, ‘row_key‘, ‘col_family:col_name‘ # 数据量大时比scan快不少
get ‘table_name‘, ‘row_key‘, {COLUMN=>‘col_family:col_name‘,VERSIONS=>5}
put ‘table_name‘, ‘row_key‘, ‘col_family:col_name‘, ‘value‘,timestamp(optional) # 新增,默认version为1
alter ‘table_name‘,{col_name=>‘col_name2‘,VERSIONS=>timestamp} # 修改数据,这里timestamp可以是任意数字
delete ‘table_name‘,‘row_key‘,‘col_family:col_name‘,ts # 注意,这是删除ts版本之前的所有版本。delete只是做标记,查询一般无法查到被标记的数据,即便重新put,除非用下面语句。HBase会定期清理这些标记的数据。
scan ‘table_name‘, {RAW=>true,VERSION=>5}
deleteall ‘table_name‘,‘row_key‘ # delete必须加上列
disable ‘table_name‘ # 删除表前需要将表下线。执行速度取决于该表当前的负载
drop ‘table_name‘ #删除表
没有列定义,某行数据有属性A才有了A列,如果都没有属性A就没有A列。
没有列属性,几乎没有表属性,有列族属性。
public static void main(String[] args) throws URISyntaxException, IOException {
Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path(
ClassLoader.getSystemResource("hbase-site.xml").toURI()));
conf.addResource(new Path(
ClassLoader.getSystemResource("core-site.xml").toURI()));
// 建立连接
try (Connection conn = ConnectionFactory.createConnection(conf);
// 负责管理建表、 改表、 删表等元数据操作的接口
Admin admin = conn.getAdmin()) {
// 定义表
TableName tn = TableName.valueOf("mytable");
HTableDescriptor table = new HTableDescriptor(tn);
// 定义和添加列族
HColumnDescriptor mycf = new HColumnDescriptor("mycf");
table.addFamily(mycf);
createOrOverwriteTable(admin, table);
// 修改cf属性
mycf.setCompressionType(Compression.Algorithm.GZ);
mycf.setMaxVersions(HConstants.ALL_VERSIONS); // Integer.MAX_VALUE
table.modifyFamily(mycf);
admin.modifyTable(tn, table); // 此时才真正修改
// 往 table 里添加 newcf 列族
HColumnDescriptor newCol = new HColumnDescriptor("newcf");
admin.addColumn(tn, newCol);
// 删除表
admin.disableTable(tn);
admin.deleteColumn(tn, "mycf".getBytes("UTF-8"));
admin.disableTable(tn);
}
}
// 下面只是封装后的函数,可忽略。
private static void deleteSchema(Configuration conf) throws IOException {
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin()) {
TableName tn = TableName.valueOf("mytable");
// 删除表
admin.disableTable(tn);
admin.deleteColumn(tn, "mycf".getBytes("UTF-8"));
admin.disableTable(tn);
}
}
private static void modifySchema(Configuration conf) throws IOException {
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin()) {
TableName tn = TableName.valueOf("mytable");
if (!admin.tableExists(tn)) {
System.out.println("Table does not exist.");
System.exit(-1);
}
// 往 table 里添加 newcf 列族
HColumnDescriptor newCol = new HColumnDescriptor("newcf");
admin.addColumn(tn, newCol);
// 修改cf属性
HTableDescriptor table = admin.getTableDescriptor(tn);
HColumnDescriptor mycf = new HColumnDescriptor("mycf");
mycf.setCompressionType(Compression.Algorithm.GZ);
mycf.setMaxVersions(HConstants.ALL_VERSIONS); // Integer.MAX_VALUE
table.modifyFamily(mycf);
admin.modifyTable(tn, table); // 此时才真正修改
}
}
private static void createSchemaTables(Configuration conf) throws IOException {
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin()) {
HTableDescriptor table = new HTableDescriptor(TableName.valueOf("mytable"));
// 定义和添加列族
table.addFamily(new HColumnDescriptor("mycf")
.setCompressionType(Compression.Algorithm.GZ));
createOrOverwriteTable(admin, table);
}
}
private static void createOrOverwriteTable(Admin admin, HTableDescriptor table) throws IOException {
if (admin.tableExists(table.getTableName())) {
admin.disableTable(table.getTableName());
admin.deleteTable(table.getTableName());
}
admin.createTable(table);
}
包括:put、checkAndPut、has、append、increment、get、exists、delete、checkAndDelete、mutateRow、batch、put(ArrayList)、get(ArrayList)、 delete(ArrayList)、getScanner
public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
// conf 相关的同上,故省略
// 建立连接,早期直接通过conf获取table的方法已经被废除
try (Connection conn = ConnectionFactory.createConnection(conf)) {
Table table = conn.getTable(TableName.valueOf("mytable"));
// 在HBase中,所有数据都是bytes
Put put = new Put(Bytes.toBytes("row1"));
put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"),
Bytes.toBytes("value1")) // 早期使用add,已经废除
.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"),
Bytes.toBytes("value2"));
// 执行
table.put(put);
// CheckAndPut能防止check之后插入数据之前修改数据,它有两个实现:
// 第一个调用方式是在put操作之前先把指定的value跟即将写入的行
// 中的指定列族和指定列当前的value进行比较, 如果是一致的则进行put操作并返回true。
// 第二个调用方式是第一个调用方式的增强版, 可以传入CompareOp来进行更详细的比较
// checkAndPut最后一个参数put中的rowkey必须跟第一个参数的row一致
Put put2 = new Put(Bytes.toBytes("row2"));
put2.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"),
Bytes.toBytes("ted"));
// 将旧值为 jack 的数据修改为 row2 。如果旧值已经被修改,res1 为 false。
// 如果将 row2 改为 null,则判断该数据是否存在,不存在就会 put。
boolean res1 = table.checkAndPut(Bytes.toBytes("row2"), Bytes.toBytes("mycf"),
Bytes.toBytes("name"), Bytes.toBytes("jack"), put2);
// LESS 表示传入的数如果小于当前值就 put
boolean res2 = table.checkAndPut(Bytes.toBytes("row2"), Bytes.toBytes("mycf"),
Bytes.toBytes("name"), CompareFilter.CompareOp.LESS,
Bytes.toBytes("jack"), put2);
// has
boolean has = put2.has(Bytes.toBytes("mycf"), Bytes.toBytes("name"));
// append数据
Append append = new Append(Bytes.toBytes("row2")); // 可以设置 offset 和 len 来切割 "row2"
append.add(Bytes.toBytes("mycf"), Bytes.toBytes("name"),
Bytes.toBytes("Wang"));
table.append(append);
// increment
Increment inc = new Increment(Bytes.toBytes("row3"));
// 要保证 inc.addColumn 中的 col 是 long 型
inc.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), 10L);
table.increment(inc);
// get
Get get = new Get(Bytes.toBytes("row1"));
get.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"))
.setTimeRange(0L, 1L)
.setMaxVersions(); // 默认 Integer.MAX_VALUE, 不调用这个函数就为 1
Result res3 = table.get(get);
byte[] name = res3.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name"));
System.out.println(Bytes.toString(name));
// 上面 res.getValue() 只能获取最新版本数据,如果想获取多个版本的,就需要 Cell
get.setMaxVersions(10);
Result res4 = table.get(get);
List<Cell> cells = res4.getColumnCells(Bytes.toBytes("mycf"), Bytes.toBytes("name"));
for (Cell c : cells) {
// getValue 内部 也是使用 cloneValue 的
byte[] cValue = CellUtil.cloneValue(c);
System.out.println(Bytes.toString(cValue));
}
// 节省网络开销,get 一个比较大的列时能缩短传输时间
boolean exists = table.exists(get);
// delete
Delete del = new Delete(Bytes.toBytes("row1"));
del.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"));
table.delete(del);
// 如果当前存储的不是 tim ,则不会删除。如果传 null,则 col 不存在时删除
boolean res = table.checkAndDelete(Bytes.toBytes("row1"), Bytes.toBytes("mycf"),
Bytes.toBytes("name"), Bytes.toBytes("tim"), del);
// mutation 大原子操作,例如增加一列并删除另一列。
// 实际名字叫 mutateRow,强调针对一行进行操作,rowkey不同会报错
// 删除 mycf:age 列
Delete delete = new Delete(Bytes.toBytes("row3"));
delete.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"));
// 修改 mycf:name 为 chris
Put edit = new Put(Bytes.toBytes("row3"));
edit.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"),
Bytes.toBytes("chris"));
// 新增列
Put put3 = new Put(Bytes.toBytes("row3"));
put3.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("job"),
Bytes.toBytes("engineer"));
RowMutations rowMutations = new RowMutations(Bytes.toBytes("row3"));
rowMutations.add(delete);
rowMutations.add(edit);
rowMutations.add(put);
table.mutateRow(rowMutations);
// 只会在一开始check一次给出的value跟数据库中现有的value是否一致
table.checkAndMutate(Bytes.toBytes("row3"), Bytes.toBytes("mycf"),
Bytes.toBytes("age"), CompareFilter.CompareOp.LESS,
Bytes.toBytes("5"), rowMutations);
// 批量操作,在一个 actions 中不要同时放针对一个单元格的 put 和 delete,因执行顺序不定
List<Row> actions = new ArrayList<>();
Get get1 = new Get(Bytes.toBytes("row2"));
actions.add(get1);
Put put1 = new Put(Bytes.toBytes("row3"));
put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"),
Bytes.toBytes("lily"));
actions.add(put1);
Delete delete1 = new Delete(Bytes.toBytes("row1"));
actions.add(delete1);
Object[] res5 = new Object[actions.size()];
table.batch(actions, res5); // 早期有不需要传递 res5 的方法,但不安全,操作失败时没有返回
// res5 的结果可能是:
// null: 操作与服务器通信失败
// EmptyResult: Put和Delete操作成功后的返回结果
// Result: Get 操作成功后的结果,没有匹配就是一个空的Result
// Throwable
byte[] oneRes = ((Result) res5[0]).getValue(Bytes.toBytes("mycf"),
Bytes.toBytes("name"));
System.out.println(Bytes.toString(oneRes));
// 批量 put,不是原子,可能有的成功有的不成功。
// 不成功会重试,除非 NoSuchColumnFamilyException。
// 失败数据会被放到写缓冲区,等下一次插入数据时重试
table.put(new ArrayList<Put>());
// 批量 get,如果有一个失败有会整体失败,如果需要部分返回,则用batch
table.get(new ArrayList<Get>());
// 批量删除,如果全部删除成功,那么 List 的长度变为0
try {
table.delete(new ArrayList<Delete>());
} catch (RetriesExhaustedException e) {
// 可以获取异常的原因和参数
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
// Scan,获取多条数据。通常加上始末位置。另外,如果addColumn,那么没有添加的就不会被扫描,所以,如果想实现SELECT name from A where age > 10的时候,需要把name和age都添加进去,这样针对age的过滤器(高级API介绍)才会起作用。
Scan scan = new Scan(Bytes.toBytes("row1")) // 通常还会加上Filter,暂时忽略
.setCaching(100); // 一次 RPC 请求返回多少条数据
try (ResultScanner res6 = table.getScanner(scan);) {
// 获取结果的方法与 get 不同,因为 get 的结果要等 table.get() 执行完才能获得
// 而 scan 则是在遍历 ResultScanner 时才执行 scan
for (Result r : res6) {
String name1 = Bytes.toString(r.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name")));
System.out.println(name1);
}
}
}
}
简单的预告与补充
HBase不支持表关联,部分支持ACID。
Namespace
把相同业务的表分成同一组,从而方便配额管理、安全管理等。目前只能通过shell调用,在创建表时加上。
create ‘myns:table1‘,‘mycf1‘
alter_namespace ‘myns‘,{METHOD=>‘set‘,‘PROPERTY_NAME‘=>‘PROPERTY_VALUE‘}
alter_namespace ‘ns1‘,{METHOD=>‘unset‘,NAME=>‘PROPERTY_VALUE‘}
HBase中有两个保留表空间是预先定义好的:
RegionServer
Mutation的子类,如Put、Append、Increment、Delete都可以通过setDurability(Durability.SKIP_WAL)
设置WAL相关。也可设置异步,并设定时间间隔,但数据有可能丢失,时间间隔权衡可能丢失的量。
WAL滚动:WAL的检查间隔由hbase.regionserver.logroll.period定义,默认值为1小时。检查的内容是把当前WAL中的操作跟实际持久化到HDFS上的操作比较,看哪些操作已经被持久化了,被持久化的操作就会被移动到.oldlogs文件夹内(这个文件夹也是在HDFS上的)。一个WAL实例包含有多个WAL文件。WAL文件的最大数量通过
hbase.regionserver.maxlogs(默认是32)参数来定义。
其他触发条件:
hbase.regionserver.hlog.blocksize * hbase.regionserver.logroll.multiplier
),前者把它设置为HDFS块大小,后者默认0.95,则如果WAL文件所占的空间大于或者等于95%的块大小,则这个WAL文件就会被归档到.oldlogs文件夹内。Master会负责定期地去清理.oldlogs文件夹,如果WAL文件没有任何引用指向。这些指向有两个:
MemStore:每个Store中有一个MemStore实例。数据写入WAL之后就会被放入MemStore。MemStore是内存的存储对象,只有当MemStore满了的时候才会将数据刷写(flush)到HFile中。
设计目的
数据是先写入WAL(HDFS,数据到达的顺序),再被放入Memstore(内存,数据整理后的顺序),最后被持久化到HFile(HDFS,数据整理后的顺序)中。HDFS中的文件是不可修改的,只能创建、追加、删除。
HFile:在Store中有多个HFile。当MemStore满了之后HBase就会在HDFS上生成一个新的HFile,然后把MemStore中的内容写到这个HFile中。HFile直接跟HDFS打交道,它是数据的存储实体。
HFile(StoreFile,HFile的抽象类)是由一个一个的块组成的。在HBase中一个块的大小默认为64KB,由列族上的BLOCKSIZE属性定义。
Data数据快组成:BlockType, Cell(KeyValue), Cell, Cell …。一行的一列代表一个KV,遍历的时候也是一个一个KV地遍历。
Data数据块的第一位存储的是块的类型,后面存储的是多个KeyValue键值对,也就是单元格(Cell)的实现类。块的类型一直在增加,例如上面提及的DATA、META、FILE_INFO、ROOT_INDEX等。
KeyValue:如下图,每个KV都有列族和列名等信息,所以如果他们太大,会很占空间。如果数据主要是归档数据,不太要求读写性能,那么建议使用压缩。
增删查改:实际上,HBase几乎总是在做新增操作。新增/修改/删除单元格时,HBase都会在HDFS新增数据,只是写上不同的cell,如版本号、类型为DELETE且值为null。
这种方式必然会破环数据的连续性和顺序性,这促使HBase需要定期进行数据的合并,在major compaction时,一旦检测到delete标记的记录就会忽略,从而实现删除(后面有稍详细的介绍)。
一个KeyValue在从客户端被发送出来到被持久化进HBase或者从HBase持久化层被读出到客户端的过程。
写入:WAL - MemStore - HFile
读取:先BlockCache再到Memstore+HFile。
墓碑标记和数据不在一个地方,所以有可能先读到数据后读到墓碑。要知道数据是否已被删除,HBase的Scan操作会继续往下扫描,直到被扫描的数据大于给出的限定条件为止,这样它才能知道哪些数据应该被返回给用户,而哪些应该被舍弃。所以你增加过滤条件也无法减少Scan遍历的行数,只有缩小STARTROW和ENDROW之间的行键范围才可以明显地加快扫描的速度。
在Scan扫描的时候store会创建StoreScanner实例。StoreScanner会把MemStore和HFile结合起来扫描,所以具体从MemStore还是HFile中读取数据,外部的调用者都不需要知道具体的细节。当StoreScanner打开的时候,会先定位到起始行键(STARTROW)上,然后开始往下扫描,一个一个KV地扫描。
宏观层面读取
0.96版本之后使用二层查询架构。
过滤器相当于SQL中的Where语句。HBase中的过滤器被用户创建出来后会被序列化为
可以网络传输的格式,然后被分发到各个RegionServer。在RegionServer中Filter被还原出来。这样在Scan的遍历和Get的过程中,不满足过滤器条件的结果就不会被返回客户端。
由于分发到不同机器,所以并不知道各个scanner的结果数量,如果需要限制数量,只能在scanner返回结果时对结果进行处理。
所有的过滤器都要实现Filter接口。HBase同时还提供了FilterBase抽象类,它提供了Filter接口的默认实现,这样大家就不必把Filter接口的每一个方法都写上自己的实现了。
// 和基础API一样,下面代码是包裹在try{}里面的
Table table = conn.getTable(TableName.valueOf("mytable"));
Scan scan = new Scan();
// 选出 value 为 ‘%apple%‘ 的数据。注意是针对 value ,不区分列
ValueFilter filter1 = new ValueFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator("apple"));
// 选出 value 为 ‘apple‘
// new BinaryComparator("apple")
// 数值比较,下面选出等于10的
// new BinaryComparator(Bytes.toBytes(10))
scan.setFilter(filter1);
try (ResultScanner res = table.getScanner(scan);){
for (Result r : res) {
String name = Bytes.toString(r.getValue(Bytes.toBytes("mycf"),
Bytes.toBytes("name")));
// 如果匹配的(无论哪一列)就会有 Result 返回,但这里打印时选择 name,所以非 name 列的
// 的 name 变量会是null
System.out.println(name);
}
}
// 选出 name 列的 value 为 ‘%apple%‘ 的数据。
// 务必保证每行记录都包含有将要比较的列,否则没有比较列的行会整行被放入结果集。
// 如果无法保证,第一种方案:则在遍历结果集的时候再次判断结果是否包含所需的列,没有的话会像上面那样返回null。
// 第二种方案:使用过滤器列表 FilterList(后面介绍)。这种方法比较慢,但不会像第一种方法那样返回过多数据。
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("mycf"),
Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL, new SubstringComparator("apple"));
// 分页过滤器,如果放到 FilterList 中,一般都是放在最后
PageFilter pageFilter = new PageFilter(2L);
// 翻页
PageFilter pageFilter1 = new PageFilter(2L);
scan.setFilter(filter1);
byte[] lastRowkey = null;
try(ResultScanner rs1 = table.getScanner(scan)){
lastRowkey = printResult(rs1); // 代码看下面
}
byte[] startRowkey = Bytes.add(lastRowkey, new byte[1]); // 加上一个0字节来排出上一次结果的最后一行
scan.setStartRow(startRowkey);
try(ResultScanner rs2 = table.getScanner(scan)){
printResult(rs2);
}
// FilterList 方案,实现 (xiamen OR shanghai) AND active=‘1‘
FilterList innerFilterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
Filter xiamenFilter = new SingleColumnValueFilter(Bytes.toBytes("mycf"),
Bytes.toBytes("city"), CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("xiamen")));
innerFilterList.addFilter(xiamenFilter);
Filter shanghaiFilter = new SingleColumnValueFilter(Bytes.toBytes("mycf"),
Bytes.toBytes("city"), CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("shanghai")));
innerFilterList.addFilter(shanghaiFilter);
FilterList outerFilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
outerFilterList.addFilter(outerFilterList);
Filter activeFilter = new SingleColumnValueFilter(Bytes.toBytes("mycf"),
Bytes.toBytes("active"), CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("1")));
outerFilterList.addFilter(activeFilter);
scan.setFilter(outerFilterList);
// printResult
private static byte[] printResult(ResultScanner res){
byte[] lastRowKey = null;
for (Result r : res) {
byte[] rowkey = r.getRow();
String name = Bytes.toString(r.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name")));
int age = Bytes.toInt(r.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("age")));
System.out.println(Bytes.toString(rowkey) + ": name=" + name + " age=" + age);
lastRowKey = rowkey;
}
return lastRowKey;
}
行过滤器RowFilter:根据rowkey,配合比较器来过滤,这样比Get或Scan配合STARTROW和ENDROW更灵活。
多范围过滤器MultiRowRangeFilter:把不同的范围RowRange加入到数组,然后放入MultiRowRangeFilter实现。
行键前缀过滤器PrefixFilter:使用时要配合scan.setStartRow(startRowkey),避免从头开始遍历。当遍历时发现rowkey大于规定的前缀就会停止扫描。
模糊行键过滤器FuzzyRowFilter:根据处在中间或者结尾的关键词来过滤行键。
FuzzyRowFilter filter = new FuzzyRowFilter(Arrays.asList(
new Pair<>(
Bytes.toBytesBinary("2016_??_??_4567"),
new byte[] {0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 0, 0, 0}
)
));
包含结尾过滤器InclusiveStopFilter:也可以通过scan.setEndRow(endRowkey),这里endRowkey = Bytes.add(endRowkey, new byte[1])
随机行过滤器RandomRowFilter:用于采样,设置一个0.0~1.0的数,相当于每个样本被采集概率。
FamilyFilter
QualifierFilter
依赖列过滤器DependentColumnFilter:表中需要设置一个依赖列,然后DependentColumnFilter以该依赖列的时间戳去过滤其他的列,凡是时间戳比依赖列的时间戳大的列都会被过滤掉。这是为了解决高并发下多用户修改数据,导致scan出来的数据是部分更新的脏数据。如下图所示,client2可以在最后才更新依赖列来避免脏读。通常加上dropDependentColumn参数为true,因为依赖列始终是有结果返回的,但如果其他列都被过滤了,那么只有依赖列的结果通常也是无意义的。有需要的话还可以加上比较器。
setBatch和DependentColumnFilter不能同时使用,因为使用了setBatch(int n)后scan每遍历n个单元格都会停下来把结果返回给客户端。这样就有可能出现,某行数据读取到一半,但是还没有读取到依赖列,就满足了batch的条件,并将结果集返回给了客户端。这种情况下,依赖列过滤器就无法工作了。
列前缀过滤器ColumnPrefixFilter
多列前缀过滤器
byte[][] filter_prefix = new byte[2][];
filter_prefix[0] = Bytes.toBytes("a");
filter_prefix[2] = Bytes.toBytes("b");
MultipleColumnPrefixFilter prefixFilter = new MultipleColumnPrefixFilter(filter_prefix);
列键过滤器(KeyOnlyFilter):在不需要value,只需要列名时使用。注意从Result中获取key的方法可参考下面代码
for (Result r : res) {
List<Cell> cells = r.listCells();
List<String> sb = new ArrayList<>();
byte[] rowkey = r.getRow();
sb.add("row=" + Bytes.toString(rowkey));
for (Cell cell : cells) {
sb.add("column=" + new String(CellUtil.cloneQualifier(cell)));
}
System.out.println(StringUtils.join(sb, ", "));
}
首次列键过滤器FirstKeyOnlyFIlter:只检索第一列后立马跳到下一行,一般用于count。
int count = 0;
for (Result r : res) {
count++;
}
列名范围过滤器ColumnRangeFilter
列数量过滤器ColumnCountGetFilter:针对Get,只选择前n个列返回。(略)
列翻页过滤器ColumnPaginationFilter:针对Get。(略)
单元格过滤器
装饰过滤器
跳转过滤器:当被包装的过滤器判断当前的KeyValue需要被跳过的时候,整行都会被跳过
Filter vf = new ValueFilter(CompareFilter.CompareOp.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("north")));
Filter skipFilter = new SkipFilter(vf);
全匹配过滤器WhileMatchFilter:当遇到需要过滤的目标时停止scan。
自定义过滤器(略)
除了BinaryComparator和SubstringComparator,HBase还有下列比较器:
// 列数据生存时间。Put方法也有TTL,是单元格的TTL
newCol.setTimeToLive(10);
// 某个单元格的数据存储达到了最大版本数的数据的时候,再插入新数据会将旧数据删除
newCol.setMaxVersions(100);
// BloomFilter默认开启,且采用行模式。介绍看下面。
setBloomFilterType(BloomType bt);
//每次写入的时候是否更新布隆过滤器。默认为false
setCacheBloomsOnWrite(boolean value);
// 默认为开启
setBlockCacheEnabled(boolean blockCacheEnabled);
// 默认关闭
setMobEnabled(boolean isMobEnabled);
setMobThreshold(long threshold);
布隆过滤器
在之前的介绍可知,HFile中可有块索引,让HBase去扫描后再到data块去找,但速度还不够快,便引入布隆过滤器。如下图所示,如果BloomFilter认为某HFile中不存在所需数据,那么HBase就不会去扫描。
BloomFilter默认开启,且采用行模式,即针对行进行过滤。也有行列(ROWCOL)模式,针对列进行过滤,但如果查询中会遍历很多列,那么就不必要使用。这种行列模式需要存储行和列的信息,所以很占空间。
大字段
MOB(Medium Object):大于100KB小于10MB。HBase存储MOB字段的时候其实也是把该文件直接存储到HDFS上,而在表中只存储了该文件的链接。
该特性只在HFile版本3以上才有,所以使用该特性之前先打开hbase-site.xml确认一下你的HFile版本至少大于等于3
// 默认10GB,超过就会触发Region拆分。如果设置为null,则无限大。
setMaxFileSize();
setReadOnly();
setMemStoreFlushSize
// 增、改、删列族
// close region, serverName 为"服务器,服务器端口,服务器启动码",中间是英文逗号
admin.closeRegion(String regionname, String serverName);
// 重新上线
admin.assign(byte[] regionname);
// 获取region列表
admin.getOnlineRegions(ServerName sn);
// ServerName通过ServerName.valueOf(hostAndPort, startCode)获取
// 获取host, port, startCode
admin.getClusterStatus().getServers();
// 获取RegionServer列表,并从中获取素有Regions
Collection<ServerName> serverNames = admin.getClusterStatus().getServers();
Iterator<ServerName> iter = serverNames.iterator();
while (iter.hasNext()) {
ServerName serverName = iter.next();
List<HRegionInfo> regions = admin.getOnlineRegions(serverName);
for (HRegionInfo region : regions) {
System.out.println(region.getRegionNameAsString());
}
}
管理region的其他方法(略)
将某个表恢复到某个时刻的结构和数据,而且不需要担心创建和恢复的过程会很缓慢。这个过程实际上并没有复制数据,而是保存一份文件列表,通过修改表所链接的文件来改变表的数据。这样不但速度快,而且不额外占用磁盘空间。
使用:
// 修改配置,开启快照功能(默认开启)
// 创建快照
admin.snapshot("snapshot_name", TableName.valueOf("table"));
// 获取快照,如果这个集群之前没有创造过快照,那么得到的是空列表。
List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
// 使用快照前要将表下线
admin.disableTable(TableName.valueOf("table_name"));
while (true) {
Thread.sleep(1000);
if (admin.isTableDisabled(TableName.valueOf("table_name"))) {
admin.restoreSnapshot("snapshotName");
admin.enableTable(TableName.valueOf("table_name"));
break;
}
}
均衡器
移动Region到不同的RegionServer上。HBase使用StochasticLoadBalancer来实现,它考虑了下面5个因素:
相关参数:
影响Region的拆分/合并的参数:
average + (average * slop)
个region,那么就进行rebalance。规整器
当某个Region太大了,或者太小了就称其为不标准的Region。规整器就是为了调整region大小的。
步骤:
目录管理器
目录指的就是hbase:meta表中存储的Region信息。当HBase在拆分(Split)或者合并(merge)的时候,为了确保数据不丢失都会保留原来的Region信息。等拆分或者合并过程结束后,再使用目录管理器(catalog janitor)来清理这些旧的Region信息。
拆分过程:创建两个子Region,将数据复制到子Region中,删除父Region。
通过Admin的getClusterStatus方法可以获取集群状态(ClusterStatus)类。该类可以做很多事情,比如可以获取当前活着的RegionServer的数量、当前所有Region的数量、当前集群中的请求TPS等。
服务器负载对象
通过服务器负载对象(ServerLoad),大家可以获取当前服务器的负载信息,比如内存使用情况、磁盘使用情况、请求数量等信息。不过更丰富的信息需要从UI,即<servername>:16030/jmx获取
实现简单的权限控制。开启该功能前确保HBase使用的HFile版本达到3以上,同时在配置中开启VisibilityController。
说到权限控制,在shell中可以用whoami来得知自己的身份和用户组。在Java调用API中,利用System.getProperty("user.name")
来获知。
// 列出所有系统标签
VisibilityLabelsProtos.ListLabelsResponse resp = VisibilityClient.listLabels(conn, ".*");
List<ByteString> lables = resp.getLabelList();
for (ByteString lable : lables) {
System.out.println(lable.toStringUtf8());
}
// 添加标签,需要超级用户才能操作。超级用户可在配置文件中设置。
String[] labels2 = {"manager", "developer"};
VisibilityClient.addLabels(conn, labels2);
测试(详细看书)
内存分配
由于默认的RegionServer的内存才1GB,而Memstore默认是占40%,所以分配给Memstore的才400MB,在实际场景下,很容易就写阻塞了。在hbase-env.sh调大Master和RS各自的堆大小。
ambari例子:对于16GB的机器
如果没有MapReduce的话,RegionServer可以调整到大概一半的服务器内存。
GC调优
Full GC时间过长可能导致RS自杀(被ZK判断为宕机)。JVM提供下面4种回收器:
HBase比较合适的方案是ParallelGC和CMS的组合。
对于RS内存大于32GB的,G1GC比较适合。因为CMS在下面两种情况都会出发Full GC
XX:CMSInitiatingOccupancyFraction=70
缓解,代表堆内存占用了百分之几时启动回收。G1GC策略通过把堆内存划分为多个Region,然后对各个Region单独进行GC,这样整体的Full GC可以被最大限度地避免(Full GC还是不可避免的,我们只是尽力延迟Full GC的到来时间),而且这种策略还可以通过手动指定MaxGCPauseMillis参数来控制一旦发生Full GC的时候的最大暂停时间,避免时间太长造成RegionServer自杀。
如果内存在4~32GB之间,则需要试验,试验期间记得加上下列参数
-XX:PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
Justin Kestelyn在文章Tuning Java Garbage Collection for HBase中的调优参考结果看书。
Memstore的专属JVM策略MSLAB(MSLAB是在G1前发明的,G1与其类似,但两者可配合使用)
解决这个问题不能完全靠JVM的GC回收策略,最好的解决方案是从应用本身入手,自己来管好自己的内存空间。HBase的MSLAB机制完全沿袭了TLAB的实现思路:
这种机制会降低内存利用率,因为就算chunk里面只放1KB的数据,这个chunk也要占2MB的大小。不过可以完全避免Full GC。
相关参数(略)
拆分
为了避免传统关系型数据库把大量数据存储到同一磁盘而导致读取速度下降的问题。
拆分分为自动和手动:
自动:
Math.min(tableRegionsCount^3 * initialSize, defaultRegionMaxFileSize)
。如果设置了hbase.increasing.policy.initial.size
,initialSize就用它,否则为hbase.hregion.memstore.flush.size * 2
。defaultRegionMaxFileSize=默认10GB
。假设flush.size为128MB,那么只有一个文件时,上限为256MB,两个2GB,三个6.75Gb,4个10GB。手动:
预拆分:在建表的时候就定义好了拆分点。Linux命令为hbase org.apache.hadoop.hbase.util.RegionSplitter split_table splitAlg -c 10 -f mycf
。
splitAlg:
create ‘table‘, ‘mycf‘, SPLITS=>[‘aaa‘,‘bbb‘,‘fff‘]
强制拆分:用split命令
推荐:
合并
分为offline和online:
BlockCache的原理:读请求到HBase之后先尝试查询BlockCache,如果获取不到就去HFile(StoreFile)和Memstore中去获取。如果获取到了则在返回数据的同时把Block块缓存到BlockCache中。这些block有多种,参考HFile中的块种类。
LRUBlockCache:将Cache分为三个区域/阶段。BlockCache目前的堆内内存方案只有这个,而且无法关闭,只能调整大小。默认0.4。LRUBlockCache的问题在于可能引发Full GC。
BlockCache配置和Memstore配置的联动影响:Memstore + BlockCache的内存占用比例不能超过0.8,为必须要留20%作为机动空间。而Memstore也是默认0.4,所以调大Memstore或BlockCache时都需要调小另一个。
BucketCache(阿里员工提出):堆外缓存,默认开启。一上来就分配了14种(可自定义)区域,如4KB、8KB、16KB…,每种区域的大小都等于最大规格的4倍。缓存位置可以是堆(heap)、堆外(offheap)、文件(file,针对SSD硬盘)。BucketCache的长处是它自己来划分内存空间、自己来管理内存空间,Block放进去的时候是考虑到offset偏移量的(具体可以看源码的BucketAllocator),所以内存碎片少
每一种类型的Bucket至少要有一个Bucket,否则报错。
看缓存命中率:hadoop metrics,查看hbase.regionserver.blockCacheHitRatio
Memstore fush的五种情况:
单个memstore大小达到阈值hbase.hregion.memstore.flush.size
默认128MB:因为刷写是定期检查的,所以无法及时地在数据到达阈值时触发刷写。如果数据增加到阈值的好几倍(这个倍数通过hbase.hregion.memstore.block.multiplier
设置,默认4,所以默认阻塞机制的阈值为512MB)就会触发flush,但这个flush会阻塞所有写入该Store的写请求。HBase的这个机制本身的目的是应对如果数据再继续急速增长会带来更可怕的灾难性后果,所以不能因此而盲目增大阻塞阈值。
整个RegionServer的memstore总和达到阀值:globalMemStoreLimitLowMarkPercent * globalMemStoreSize
,前者通过hbase.regionserver.global.memstore.size.lower.limit
设置,默认0.95;后者通过hbase_heapsize(RegionServer占用的堆内存大小)* hbase.regionserver.global.memstore.size默认0.4
。一旦达到这个global.memstore.size便同样会触发一次阻塞写入的flush。
例如16G堆内存,上面lower.limit默认0.95,memstore.size默认0.4,那么触发flush的阈值是16 x 0.4 x 0.95 = 6.08,而触发阻塞的阈值为16 x 0.4 = 6.4。
WAL的数量大于maxLogs:这个flush不会阻塞,只是开启滚动,并创造新的memstore内存空间来加载WAL中的数据
Memstore达到刷写时间间隔:如果以上的所有条件都没有被触发到,HBase还是会按照一个特定的频率来flush。hbase.regionserver.optionalcacheflushinterval
,默认值为3600000,即1个小时。如果设定为0,则意味着关闭定时自动刷写。
也有手动flush(略)
合并的原因想尽量减少碎片文件,进而减少查询数据时的寻址。HFile合并操作就是在一个Store里面找到需要合并的HFile,然后把他们合并起来,最后把之前的碎文件移除。
合并策略
Minor:将Store中多个HFile合并为一个HFile。在这个过程中达到TTL的数据会被移除,但是被手动删除的数据不会被移除。这种合并触发频率较高。
0.96之后的合并策略ExploringCompactionPolicy:待合并文件挑选条件:文件 < (所有文件大小总和 - 该文件大小) * 比例因子
和文件大小小于最小合并大小hbase.hstore.compaction.min.size
(没设置就用hbase.hregion.memstore.flush.size
)都会进入待合并列表。从待合并文件中挑出多个文件进行穷举组合,组合要处于hbase.hstore.compaction.min/max
之间。挑选完组合后,比较哪个文件组合包含的文件更多,就合并哪个组合。如果出现平局,就挑选那个文件尺寸总和更小的组合。
FIFOCompactionPolicy:效果是过期的块被整个删除掉了。没过期的块完全没有操作。所以它实际上是一个删除策略。有些情况下HFile是没必要合并,比如TTL特别短,有些表只是业务中间表(而且一般不会出现跨几个HFile读取数据);BlockCache够大,可以把整个RegionServer上存储的数据都放进去。这个策略不适合TTL很长,或者MIN_VERSIONS > 0的情况。
MIN_VERSIONS就是当TTL到来的时候单元格需要保存的最小版本数。当版本达到TTL需要被删除的时候会先看一下单元格里面的版本数是不是等于MIN_VERSIONS,如果是的话就放弃删除操作。注意MIN_VERSIONS只是针对TTL,其他delete操作可以使版本数小于MIN_VERSIONS。
DateTieredCompactionPolicy:
ExploringCompactionPolicy
,也可以设置FIFO。最后一个参数是最老的层次时间。StripeCompactionPolicy:
目的:提高查询速度的稳定性,让Major Compaction可以只针对部分数据,从而克服了Major Compaction因涉及HFile文件过多而造成的IO不稳定。
Major:合并Store中的所有HFile为一个HFile。在这个过程中被手动删除的数据会被真正地移除。同时被删除的还有单元格内超过MaxVersions的版本数据。这种合并触发频率较低,默认为7天一次。不过由于Major Compaction消耗的性能较大,你不会想让它发生在业务高峰期,建议手动控制MajorCompaction的时机。
标签:封装 依据 窗口 键值对 解决方案 原子操作 多用户 复制 key值
原文地址:https://www.cnblogs.com/code2one/p/10405838.html