标签:
maven:3.3.9,jdk:1.7 ,Struts2:2.3.24.1,hibernate:4.3.6,spring:4.2.5,MySQL:5.1.34,Junit:4,Myeclipse:2014;
Hadoop2.6.4,HBase1.1.2
源码下载:https://github.com/fansy1990/ssh_v3/releases
部署参考:http://blog.csdn.net/fansy1990/article/details/51356583
数据下载:http://download.csdn.net/detail/fansy1990/9540865 或 http://pan.baidu.com/s/1dEVeJz7本系统就是使用客观的方法来验证伪钞。本系统采用的方案是基于冠字号的,每张人民币的冠字号是唯一的,如果有一个大表可以把所有的人民币以及人民币对应的操作(在什么时间、什么地点存入或获取)记录下来,这样在进行存取时就可以根据冠字号先查询一下,看当前冠字号对应的纸币在大表中的保存的情况,这样就可以确定当前冠字号对应的纸币是否是伪钞了(这里假设在大表中的所有冠字号对应的钞票都是真钞)。
下面对应存储场景:
| 存/取 | 最近状态(表中有无) | 真钞/伪钞 |
场景1 | 存 | 有 | 伪钞 |
场景2 | 存 | 无 | 真钞 |
场景3 | 取 | 有(此时没有无状态) | 真钞 |
目前,基于传统数据库存储数据一般在千万级别(受限于查询等性能),但是如果要存储所有钞票的信息以及其被存储或获取的记录信息,那么传统数据库肯定是不能胜任的。所以本系统是基于HBase的。
? 存储万级用户信息;
? 存储百万级别钞票信息;
? 支持前端业务每秒500+实时查询请求;
? 数据存储和计算能够可扩展;
? 提供统一接口,支持前端相关查询业务;
? 数据层:包括基础数据MySQL、文档、Web数据等;
? 数据处理层:主要是数据的加载,包括MR加载方式、Java API加载模式、Sqoop加载模式等;
? 数据存储层:主要是HBase存储,包括钞票的所有信息以及用户信息等;
? 数据服务层:主要是对外提供查询、存储等接口服务;
? 数据应用层:存取钞系统,在存钞时设计到伪钞识别;其他应用系统;
create ‘records‘,{NAME=>‘info‘,VERSIONS=>1000},SPLITS =>[‘AAAM9999‘,‘AAAZ9999‘,‘AABM9999‘]
主键/列簇 | 字段名称 | 字段含义 | 字段值举例 | 备注 |
rowkey | - | 表主键(钞票冠字号) | AAAA0000 |
|
timestamp | - | 时间戳 | 1414939140000 | long型(可以存储用户操作的时间) |
info | - | 列簇 | - | who、when、where做了哪些操作 |
exist | 是否存在 | 1 | 如果用户是存储行为,那么在行为结束后,该值为1 | |
uid | 用户ID | 4113281991XXXX9919 |
| |
bank | 存取钞银行 | SPDBCNSH | 银行编号 |
create ‘user‘,{NAME=>‘info‘},SPLITS =>[‘4113281990XXXX0000‘,‘4113281991XXXX0000‘,‘4113281992XXXX0000‘]
主键/列簇 | 字段名称 | 字段含义 | 字段值举例 | 备注 |
Rowkey | - | 用户主键(身份证号) | 4113281991XXXX9919 |
|
Timestamp | - | 时间戳 | 1414939140000 | long型 |
info | - | 列簇 | - | 用户信息 |
name | 用户名 | JACO |
| |
gender | 用户性别 | femail |
| |
bank | 用户注册银行 | SPDBCNSH | 银行编号 | |
address | 用户住址 | EXX-O94-1319151759 |
| |
| birthday | 用户出生年月 | 1981-10-20 09:12 |
|
package ssh.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.Tool; import ssh.util.HadoopUtils; /** * Job Driver驱动类 * * @author fansy * */ public class ImportToHBase extends Configured implements Tool { public static final String SPLITTER = "SPLITTER"; public static final String COLSFAMILY = "COLSFAMILY"; public static final String DATEFORMAT = "DATEFORMAT"; @Override public int run(String[] args) throws Exception { if (args.length != 5) { System.err .println("Usage:\n demo.job.ImportToHBase <input> <tableName> <splitter> <rk,ts,col1:q1,col2:q1,col2:q2> <date_format>"); return -1; } if (args[3] == null || args[3].length() < 1) { System.err.println("column family can‘t be null!"); return -1; } Configuration conf = getConf(); conf.set(SPLITTER, args[2]); conf.set(COLSFAMILY, args[3]); conf.set(DATEFORMAT, args[4]); TableName tableName = TableName.valueOf(args[1]); Path inputDir = new Path(args[0]); String jobName = "Import to " + tableName.getNameAsString(); Job job = Job.getInstance(conf, jobName); job.setJarByClass(ImportMapper.class); FileInputFormat.setInputPaths(job, inputDir); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(ImportMapper.class); TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job); job.setNumReduceTasks(0); HadoopUtils.setCurrJob(job);// 设置外部静态Job return job.waitForCompletion(true) ? 0 : 1; } }主类的run方法中使用的是传统的MR导入HBase的代码,只是设置了额外的参数,这里主类参数意思解释如下:
package ssh.mr; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * Mapper类,接收HDFS数据,写入到HBase表中 * @author fansy * */ public class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{ private static final String COMMA = ","; private static final String COLON=":"; private String splitter = null; // private String colsStr = null; private int rkIndex =0; // rowkey 下标 private int tsIndex =1; // timestamp下标 private boolean hasTs = false; // 原始数据是否有timestamp private SimpleDateFormat sf = null; private ArrayList<byte[][]> colsFamily= null; private Put put =null; ImmutableBytesWritable rowkey = new ImmutableBytesWritable(); @Override protected void setup(Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException { splitter = context.getConfiguration().get(ImportToHBase.SPLITTER,","); String colsStr = context.getConfiguration().get(ImportToHBase.COLSFAMILY,null); sf = context.getConfiguration().get(ImportToHBase.DATEFORMAT,null)==null ? new SimpleDateFormat("yyyy-MM-dd HH:mm") :new SimpleDateFormat(context.getConfiguration().get(ImportToHBase.DATEFORMAT)); String[] cols = colsStr.split(COMMA, -1); colsFamily =new ArrayList<>(); for(int i=0;i< cols.length;i++){ if("rk".equals(cols[i])){ rkIndex= i; colsFamily.add(null); continue; } if("ts".equals(cols[i])){ tsIndex = i; colsFamily.add(null); hasTs = true; // 原始数据包括ts continue; } colsFamily.add(getCol(cols[i])); } } /** * 获取 family:qualifier byte数组 * @param col * @return */ private byte[][] getCol(String col) { byte[][] fam_qua = new byte[2][]; String[] fam_quaStr = col.split(COLON, -1); fam_qua[0]= Bytes.toBytes(fam_quaStr[0]); fam_qua[1]= Bytes.toBytes(fam_quaStr[1]); return fam_qua; } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException { String[] words = value.toString().split(splitter, -1); if(words.length!=colsFamily.size()){ System.out.println("line:"+value.toString()+" does not compatible!"); return ; } rowkey.set(getRowKey(words[rkIndex])); put = getValue(words,colsFamily,rowkey.copyBytes()); context.write(rowkey, put); } /** * 获取Put值 * @param words * @param colsFamily * @param bs * @return */ private Put getValue(String[] words, ArrayList<byte[][]> colsFamily, byte[] bs) { Put put = new Put(bs); for(int i=0;i<colsFamily.size();i++){ if(colsFamily.get(i)==null){// rk 或ts continue;// 下一个 列 } if(words[i]==null || words[i].length()==0) { // 不添加,直接往下一个value continue; } // 日期异常的记录同样添加 if(hasTs){// 插入包含时间的数据 put.addColumn(colsFamily.get(i)[0], colsFamily.get(i)[1], getLongFromDate(words[tsIndex]), Bytes.toBytes(words[i])); }else{// 不包含时间的数据 put.addColumn(colsFamily.get(i)[0], colsFamily.get(i)[1], Bytes.toBytes(words[i])); } } return put; } private long getLongFromDate(String dateStr) { try{ return sf.parse(dateStr).getTime(); }catch(ParseException e){ System.out.println(dateStr+" 转换失败!"); return 0; } } /** * 获取rowkey byte数组 * @param rowKey * @return */ private byte[] getRowKey(String rowKey) { return Bytes.toBytes(rowKey); } }
Mapper是整个流程的核心,主要负责进行数据解析、并从HDFS导入到HBase表中的工作,其各个部分功能如下:
? setup():获取输入数据字段分隔符,获取列簇、列名,获取rowkey列标,获取ts格式及列标(如果没有的话,就按照插入数据的时间设置);
? map():解析、过滤并提取数据(需要的字段数据),生成Put对象,写入HBase;
使用Java API来操作HBase数据库,完成实时HBase数据库更新,包括冠字号查询、存取款等功能。
分享,成长,快乐
脚踏实地,专注
转载请注明blog地址:http://blog.csdn.net/fansy1990
标签:
原文地址:http://blog.csdn.net/fansy1990/article/details/51583080