I think this splits into two cases. First, when the data volume is small and the business pulls all of it on every request, sorting can be done on the client side. Second, when the data volume is large and pagination is needed, client-side sorting is clearly a bad fit: you would have to pull all the data from the server, sort it, keep one page, and throw the rest away, which wastes a lot of resources. The recommended approach is to exploit a core property of HBase rowkeys: rows are stored in lexicographic rowkey order. If the sort field never changes, fold it into the rowkey, and the data naturally comes back already sorted.
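For instance, suppose a feed is always displayed newest-first: the create time can be folded into the rowkey so that a plain scan already returns rows in display order. A minimal sketch (the feed table, field names, and the reverse-timestamp trick are illustrative assumptions, not something the original prescribes):

import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyBuilder {
    // HBase sorts rowkeys ascending, so storing (Long.MAX_VALUE - timestamp)
    // makes the most recent row sort first within a user's feed.
    public static byte[] buildFeedRowKey(String userId, long createTimeMillis, String itemId) {
        long reversedTs = Long.MAX_VALUE - createTimeMillis;
        return Bytes.add(
                Bytes.toBytes(userId),     // prefix: groups one user's rows together
                Bytes.toBytes(reversedTs), // sort field embedded in the key
                Bytes.toBytes(itemId));    // suffix: keeps the key unique
    }
}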
Now suppose the business has a query whose sort field can change. Putting it into the rowkey is no longer appropriate, because a changing sort field would mean a changing rowkey, which is not a recommended practice. One workable implementation is to maintain a Redis zset as an index: the score is the sort field and the value is the rowkey of the corresponding record. Whenever the sort field changes, update the corresponding zset entry. A query then becomes two steps: fetch the rowkey list from Redis first, then batch-get those rowkeys from HBase.
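A minimal sketch of maintaining such an index with Jedis (the client choice, key name, and method names are assumptions; the original only describes the idea):

import redis.clients.jedis.Jedis;

public class SortIndex {
    private final Jedis jedis = new Jedis("localhost", 6379);

    // Called whenever the record is written or its sort field changes:
    // score = sort field, member = the HBase rowkey of the record.
    // ZADD simply overwrites the old score for an existing member.
    public void upsert(String indexKey, double sortValue, String rowKey) {
        jedis.zadd(indexKey, sortValue, rowKey);
    }

    // Keep the index in sync when the record is deleted.
    public void remove(String indexKey, String rowKey) {
        jedis.zrem(indexKey, rowKey);
    }
}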
There are two common pagination styles. One is a "load more" button, which does not support jumping to an arbitrary page; the other lets the user jump straight to a specific page. The two differ in how they are implemented on HBase, and each is covered below.
The "load more" style cannot jump to an arbitrary page; it can only advance to the next page again and again, and HBase supports it in a fairly simple way. The server tells the app or web client the parameters for the next page's request. Say a page holds 20 records: the server fetches 21, and the rowkey of the 21st record becomes the startRowKey of the next scan. Returning it to the web or app client as part of the response implements pagination, as sketched below. Someone may object: if new data is inserted after the next page's startRowKey has been returned to the front end, won't that cause problems? In practice this is fine. Internet applications are mostly read-heavy, so the odds of a list changing while you browse it are small to begin with. Even if it happens, the new data is inserted at the head of the list according to the sort order; as long as you don't refresh the first screen, the only effect is that the newly added items are not shown yet, and nothing else is affected. As soon as you refresh the first screen, the new content appears.
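A sketch of that flow (the class and parameter names are assumptions; only the fetch-one-extra-row trick comes from the text above):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class CursorPage {
    public final List<Result> rows = new ArrayList<>();
    public String nextStartRowKey; // null when there is no next page

    // startRowKey is null on the first request, otherwise the cursor the
    // server handed back together with the previous page.
    public static CursorPage scanPage(Table table, String startRowKey, int pageSize)
            throws IOException {
        Scan scan = new Scan();
        if (startRowKey != null) {
            scan.setStartRow(Bytes.toBytes(startRowKey));
        }
        scan.setFilter(new PageFilter(pageSize + 1)); // fetch one extra row
        CursorPage page = new CursorPage();
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result r : scanner) {
                if (page.rows.size() < pageSize) {
                    page.rows.add(r);
                } else {
                    // row pageSize+1: its rowkey is the next page's start row
                    page.nextStartRowKey = Bytes.toString(r.getRow());
                    break;
                }
            }
        }
        return page;
    }
}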
What if pagination must support jumping directly to a given page? This is admittedly awkward to implement with HBase, since a scan naturally works on a rowkey range plus a limit. The implementation that comes to mind is again a zset: score = sort field, value = rowkey. It does not have to be Redis; the point is that something maintains the query index, and HBase is then queried directly by rowkey, as sketched below.
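Under the same assumed zset index, a page jump is an offset read on the zset followed by a batch get from HBase (again a hedged sketch with hypothetical names):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import redis.clients.jedis.Jedis;

public class PageJumpQuery {
    // pageNo is 1-based; the zset holds score = sort field, member = rowkey.
    public static Result[] queryPage(Jedis jedis, Table table, String indexKey,
                                     int pageNo, int pageSize) throws IOException {
        long start = (long) (pageNo - 1) * pageSize;
        // ZREVRANGE: highest score first, i.e. descending sort order
        List<String> rowKeys =
                new ArrayList<>(jedis.zrevrange(indexKey, start, start + pageSize - 1));
        List<Get> gets = new ArrayList<>(rowKeys.size());
        for (String rk : rowKeys) {
            gets.add(new Get(Bytes.toBytes(rk)));
        }
        return table.get(gets); // one batched round trip to HBase
    }
}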
A simple HBase pagination scheme
A fellow developer sent me the scheme below; I can't vouch for how well it works:
Most pagination schemes found online are either server-side or client-side. Server-side pagination relies mainly on the PageFilter filter, which is firstly rather complex and secondly not very compatible across cluster versions. The author combines server-side and client-side pagination into a simple, workable middle-ground scheme.
The core of the scheme in code: a PageFilter caps the server-side scan at the first pageSize * pageNo rows, and the client then keeps only the slice belonging to the requested page.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;

public class PagingDemo {
    public static void main(String[] args) throws IOException {
        int pageNo = 1;    // 1-based page number to fetch
        int pageSize = 20; // rows per page
        Connection connection = HBaseClientUtil.getConnection();
        Table table = connection.getTable(TableName.valueOf(ProcessLogUtil.HB_TB_NAME));
        Scan scan = new Scan();
        // Server side: stop the scan after pageSize * pageNo rows in total
        Filter pageFilter = new PageFilter(pageSize * pageNo);
        scan.setFilter(pageFilter);
        ResultScanner scanner = table.getScanner(scan);
        List<Result> resultList = new ArrayList<>();
        // Client side: keep only the rows in [firstIndex, endIndex)
        int firstIndex = (pageNo - 1) * pageSize;
        int endIndex = firstIndex + pageSize;
        int i = 0;
        for (Result rs : scanner) {
            if (!rs.isEmpty() && i >= firstIndex && i < endIndex) {
                resultList.add(rs);
            }
            if (resultList.size() == pageSize) {
                break;
            }
            i++;
        }
    }
}
package cp.app.service.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;

import cp.app.batch.utils.ConfigUtil;
import cp.app.comm.CpConstants;
import cp.app.service.HBaseService;

/**
 * HBase query and insert utility.
 */
// Registered as a Spring bean; HBaseService is the author's own query
// interface and can be dropped if you don't use injection.
@Service
public class HBaseServiceImpl implements HBaseService {

    private static Logger log = Logger.getLogger(HBaseServiceImpl.class.getName());

    static ConfigUtil util = new ConfigUtil("conf/zookeeper.properties");
    private static final String HBASE_ZOOKEEPER_QUORUM = util.getString("hbase_zookeeper_quorum");
    private static final String ZOOKEEPER_ZNODE_PARENT = util.getString("zookeeper_znode_parent");

    private static Configuration conf = HBaseConfiguration.create();
    static {
        conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM);
        conf.set("zookeeper.znode.parent", ZOOKEEPER_ZNODE_PARENT);
    }

    /**
     * Create a table.
     *
     * @param tableName    table name
     * @param columnFamily column family list
     * @return true on success, false on failure
     */
    @SuppressWarnings("resource")
    public boolean createTable(String tableName, List<String> columnFamily) {
        try {
            if (StringUtils.isBlank(tableName) || columnFamily == null || columnFamily.isEmpty()) {
                log.error("===Parameters tableName|columnFamily should not be null, please check!===");
                return false;
            }
            HBaseAdmin admin = new HBaseAdmin(conf);
            if (admin.tableExists(tableName)) {
                return true;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            for (String cf : columnFamily) {
                tableDescriptor.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(tableDescriptor);
            log.info("===Create table " + tableName + " success! columnFamily: "
                    + columnFamily.toString() + "===");
        } catch (IOException e) {
            // MasterNotRunningException and ZooKeeperConnectionException are IOExceptions too
            log.error(e);
            return false;
        }
        return true;
    }

    /**
     * Query a single record.
     *
     * @param tableName table name
     * @param rowKey    rowkey value
     * @return the single matching record
     */
    public List<Map<String, String>> selectOneByRowKey(String tableName, String rowKey) {
        if (StringUtils.isBlank(rowKey) || StringUtils.isBlank(tableName)) {
            log.error("===Parameters tableName|rowKey should not be blank, please check!===");
            return null;
        }
        List<Map<String, String>> rowList = new ArrayList<Map<String, String>>();
        try {
            Get get = new Get(Bytes.toBytes(rowKey));
            HTableInterface hTable = getHTable(tableName);
            if (hTable != null) {
                Result result = hTable.get(get);
                rowList.add(getRowByResult(result));
                hTable.close();
            }
        } catch (IOException e) {
            log.error(e);
        }
        return rowList;
    }

    /**
     * Query table data page by page.
     *
     * @param tableName  table name
     * @param ddate      data date
     * @param pageSize   page size
     * @param lastrowKey rowkey to start after (CpConstants.ROWKEY_FIRST for the first page)
     * @return result set for the requested page
     */
    public List<Map<String, String>> selectAllByPage(String tableName, String ddate,
            int pageSize, String lastrowKey) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || StringUtils.isBlank(pageSize + "") || StringUtils.isBlank(lastrowKey)) {
            log.error("===Parameters tableName|ddate|pageSize|rowKey should not be blank, please check!===");
            return null;
        }
        HTable hTable = (HTable) getHTable(tableName);
        Scan scan = new Scan();
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        // Keep only rows whose rowkey contains the data date
        Filter rowFilter1 = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new SubstringComparator(ddate));
        Filter pageFilter = new PageFilter(pageSize);
        filterList.addFilter(rowFilter1);
        filterList.addFilter(pageFilter);
        if (!CpConstants.ROWKEY_FIRST.equals(lastrowKey)) {
            // Resume strictly after the last rowkey of the previous page
            Filter rowFilter2 = new RowFilter(CompareFilter.CompareOp.GREATER,
                    new BinaryComparator(Bytes.toBytes(lastrowKey)));
            filterList.addFilter(rowFilter2);
        }
        scan.setFilter(filterList);
        List<Map<String, String>> lists = new ArrayList<Map<String, String>>();
        try {
            ResultScanner rs = hTable.getScanner(scan);
            for (Result result : rs) {
                lists.add(getRowByResult(result));
            }
            hTable.close();
        } catch (IOException e) {
            log.error(e);
        }
        return lists;
    }

    /**
     * Query table data page by page, filtered by send status.
     *
     * @param tableName  table name
     * @param ddate      data date
     * @param pageSize   page size
     * @param lastrowKey rowkey to start after
     * @param status     send status
     * @return result set for the requested page
     */
    public List<Map<String, String>> selectAllByPageStatus(String tableName, String ddate,
            int pageSize, String lastrowKey, String status) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || StringUtils.isBlank(pageSize + "") || StringUtils.isBlank(lastrowKey)) {
            log.error("===Parameters tableName|ddate|pageSize|rowKey should not be blank, please check!===");
            return null;
        }
        HTable hTable = (HTable) getHTable(tableName);
        Scan scan = new Scan();
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"),
                Bytes.toBytes("status"), CompareOp.EQUAL, Bytes.toBytes(status)));
        Filter rowFilter1 = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new SubstringComparator(ddate));
        Filter pageFilter = new PageFilter(pageSize);
        filterList.addFilter(rowFilter1);
        filterList.addFilter(pageFilter);
        if (!CpConstants.ROWKEY_FIRST.equals(lastrowKey)) {
            Filter rowFilter2 = new RowFilter(CompareFilter.CompareOp.GREATER,
                    new BinaryComparator(Bytes.toBytes(lastrowKey)));
            filterList.addFilter(rowFilter2);
        }
        scan.setFilter(filterList);
        List<Map<String, String>> lists = new ArrayList<Map<String, String>>();
        try {
            ResultScanner rs = hTable.getScanner(scan);
            for (Result result : rs) {
                lists.add(getRowByResult(result));
            }
            hTable.close();
        } catch (IOException e) {
            log.error(e);
        }
        return lists;
    }

    /**
     * Get the number of pages.
     *
     * @param tableName table name
     * @param ddate     data date
     * @param pageSize  page size
     * @return page count
     */
    public int getPages(String tableName, String ddate, int pageSize) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || StringUtils.isBlank(pageSize + "")) {
            log.error("===Parameters tableName|ddate|pageSize should not be blank, please check!===");
            return 0;
        }
        enableAggregation(tableName);
        int total = 0;
        try {
            HTable hTable = (HTable) getHTable(tableName);
            Scan scan = new Scan();
            Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                    new SubstringComparator(ddate));
            scan.setFilter(rowFilter);
            // Count rows server-side via the aggregation coprocessor
            AggregationClient aggregation = new AggregationClient(conf);
            Long count = aggregation.rowCount(hTable, new LongColumnInterpreter(), scan);
            total = count.intValue();
            hTable.close();
        } catch (Throwable e) {
            log.error(e);
        }
        return (total % pageSize == 0) ? total / pageSize : (total / pageSize) + 1;
    }

    /**
     * Get the number of pages for a given send status.
     *
     * @param tableName table name
     * @param ddate     data date
     * @param pageSize  page size
     * @param status    send status
     * @return page count
     */
    public int getPagesByStatus(String tableName, String ddate, int pageSize, String status) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || StringUtils.isBlank(pageSize + "") || StringUtils.isBlank(status)) {
            log.error("===Parameters tableName|ddate|pageSize|status should not be blank, please check!===");
            return 0;
        }
        enableAggregation(tableName);
        int total = 0;
        try {
            HTable hTable = (HTable) getHTable(tableName);
            Scan scan = new Scan();
            FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
            Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                    new SubstringComparator(ddate));
            filterList.addFilter(rowFilter);
            filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"),
                    Bytes.toBytes("status"), CompareOp.EQUAL, Bytes.toBytes(status)));
            scan.setFilter(filterList);
            AggregationClient aggregation = new AggregationClient(conf);
            Long count = aggregation.rowCount(hTable, new LongColumnInterpreter(), scan);
            total = count.intValue();
            hTable.close();
        } catch (Throwable e) {
            log.error(e);
        }
        return (total % pageSize == 0) ? total / pageSize : (total / pageSize) + 1;
    }

    /**
     * Collect all cells under one rowkey into a map.
     *
     * @param result result set
     * @return map of rowkey, column family and qualifier/value pairs
     */
    private Map<String, String> getRowByResult(Result result) {
        if (result == null) {
            log.error("===Parameter |result| should not be null, please check!===");
            return null;
        }
        Map<String, String> cellMap = new HashMap<String, String>();
        for (Cell cell : result.listCells()) {
            String rowkey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(),
                    cell.getRowLength());
            String cf = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(),
                    cell.getFamilyLength());
            String qf = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
                    cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
                    cell.getValueLength());
            cellMap.put(CpConstants.HBASE_TABLE_PROP_ROWKEY, rowkey);
            cellMap.put(CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY, cf);
            cellMap.put(qf, value);
        }
        return cellMap;
    }

    /**
     * Get an HTableInterface.
     *
     * @param tableName table name
     * @return HTableInterface instance
     */
    private HTableInterface getHTable(String tableName) {
        if (StringUtils.isBlank(tableName)) {
            log.error("===Parameter |tableName| should not be blank, please check!===");
            return null;
        }
        HTableInterface hTable = null;
        try {
            HConnection conn = HConnectionManager.createConnection(conf);
            hTable = conn.getTable(Bytes.toBytes(tableName));
        } catch (IOException e) {
            log.error(e);
            return null;
        }
        return hTable;
    }

    /**
     * Batch insert or update.
     *
     * @param tableName table name
     * @param paraList  parameter maps assembled from json or xml (one map per row)
     * @return true on success, false on failure
     */
    public boolean batchPut(String tableName, List<Map<String, String>> paraList) {
        try {
            List<Put> puts = new ArrayList<Put>();
            for (Map<String, String> map : paraList) {
                puts.add(getPutByMap(map));
            }
            HTable hTable = (HTable) getHTable(tableName);
            hTable.put(puts);
            hTable.close();
        } catch (IOException e) {
            // RetriesExhaustedWithDetailsException and InterruptedIOException are IOExceptions too
            log.error(e);
            return false;
        }
        return true;
    }

    /**
     * Build a Put from a parameter map.
     *
     * @param paraMap parameter map
     * @return the Put
     */
    private Put getPutByMap(Map<String, String> paraMap) {
        if (paraMap == null) {
            log.error("===Parameter |paraMap| should not be null, please check!===");
            return null;
        }
        Set<Entry<String, String>> set = paraMap.entrySet();
        Iterator<Entry<String, String>> it = set.iterator();
        byte[] rowkey = Bytes.toBytes(paraMap.get(CpConstants.HBASE_TABLE_PROP_ROWKEY));
        byte[] columnfamily = Bytes.toBytes(paraMap.get(CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY));
        Put put = new Put(rowkey);
        while (it.hasNext()) {
            Entry<String, String> entry = it.next();
            String key = entry.getKey();
            // Everything except the rowkey/columnfamily markers is a qualifier/value pair
            if (!CpConstants.HBASE_TABLE_PROP_ROWKEY.equals(key)
                    && !CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY.equals(key)) {
                put.add(columnfamily, Bytes.toBytes(key), Bytes.toBytes(entry.getValue()));
            }
        }
        return put;
    }

    /**
     * Enable the aggregation coprocessor on a table (needed for rowCount).
     *
     * @param tableName table name
     */
    @SuppressWarnings("resource")
    private void enableAggregation(String tableName) {
        String coprocessorName = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        try {
            HBaseAdmin admin = new HBaseAdmin(conf);
            HTableDescriptor htd = admin.getTableDescriptor(Bytes.toBytes(tableName));
            List<String> coprocessors = htd.getCoprocessors();
            if (coprocessors != null && coprocessors.size() > 0) {
                return; // already enabled
            }
            admin.disableTable(tableName);
            htd.addCoprocessor(coprocessorName);
            admin.modifyTable(tableName, htd);
            admin.enableTable(tableName);
        } catch (IOException e) {
            // TableNotFoundException, MasterNotRunningException and
            // ZooKeeperConnectionException are IOExceptions too
            log.error(e);
        }
    }
}
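A hypothetical usage sketch of the pagination methods above, assuming a table name and date format that do not appear in the original: the first call passes CpConstants.ROWKEY_FIRST, and each later call passes the last rowkey of the previous page.

HBaseService service = new HBaseServiceImpl();
int pages = service.getPages("cp_log", "20191011", 20);  // hypothetical table/date
String cursor = CpConstants.ROWKEY_FIRST;                // marker meaning "first page"
for (int p = 0; p < pages; p++) {
    List<Map<String, String>> rows = service.selectAllByPage("cp_log", "20191011", 20, cursor);
    if (rows == null || rows.isEmpty()) {
        break;
    }
    // the last row's rowkey drives the RowFilter(GREATER, ...) of the next page
    cursor = rows.get(rows.size() - 1).get(CpConstants.HBASE_TABLE_PROP_ROWKEY);
}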
Original article: https://www.cnblogs.com/tesla-turing/p/11670662.html