Tags:
Maven 3.3.9, JDK 1.7, Struts2 2.3.24.1, Hibernate 4.3.6, Spring 4.2.5, MySQL 5.1.34, JUnit 4, MyEclipse 2014;
Hadoop 2.6.4, HBase 1.1.2
Source code: https://github.com/fansy1990/ssh_v3/releases
Deployment reference: http://blog.csdn.net/fansy1990/article/details/51356583
Data download: http://download.csdn.net/detail/fansy1990/9540865 or http://pan.baidu.com/s/1dEVeJz7

public void submitJob() {
    Map<String, Object> jsonMap = new HashMap<String, Object>();
    if (HadoopUtils.getMrLock().equals(MRLock.NOTLOCKED)) { // not locked, so a new job may be submitted
        // acquire the lock first
        HadoopUtils.setMrLock(MRLock.LOCKED);
        // clear the MR job cache
        HadoopUtils.initMRCache();
        // submit the job
        new Thread(new Hdfs2HBaseRunnable(hdfsFile, tableName, colDescription,
                splitter, dateFormat)).start();
        jsonMap.put("flag", "true");
        jsonMap.put("jobId", HadoopUtils.getJobId());
    } else {
        jsonMap.put("flag", "false");
        jsonMap.put("msg", "An MR job is already running, please check!");
    }
    Utils.write2PrintWriter(JSON.toJSONString(jsonMap));
    return;
}

An MRLock is used here: once a job has been submitted and is still running, the lock keeps other requests from submitting a duplicate job (duplicate submissions would break the progress monitoring).
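The source of MRLock itself is not shown in this post; a minimal sketch of what it could look like, assuming a plain enum plus a synchronized holder in HadoopUtils (the field and method bodies below are illustrative; the real code is in the GitHub repository linked above):

// Hypothetical sketch -- see the linked repository for the actual MRLock/HadoopUtils code.
public enum MRLock {
    LOCKED,    // an MR job has been submitted and is still running
    NOTLOCKED  // no MR job is running; a new one may be submitted
}

public class HadoopUtils {
    private static volatile MRLock mrLock = MRLock.NOTLOCKED;

    public static synchronized MRLock getMrLock() { return mrLock; }
    public static synchronized void setMrLock(MRLock lock) { mrLock = lock; }
}

Note that submitJob() performs the check and the set as two separate calls, so two simultaneous requests could in principle both pass the check; an atomic compare-and-set (for example AtomicReference.compareAndSet) would close that window.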
ret = callByAJax("hadoop/hadoop_submitJob.action",
        {hdfsFile: hdfs, tableName: table, colDescription: colDescription,
         splitter: splitter, dateFormat: dateFormat});
if (ret.flag == "false") { // job submission failed
    $.messager.alert('Notice', ret.msg, 'warn');
    return;
}
$.messager.progress({
    title: 'Notice',
    msg: 'Importing data...',
    interval: 0 // disable auto update of the progress value
});
// the ret returned by hadoop_submitJob.action contains the jobId: ret.jobId
if (typeof(EventSource) !== "undefined") {
    console.info("jobId:" + ret.jobId);
    var source = new EventSource("hadoop/hadoop_getMRProgress.action" + "?jobId=" + ret.jobId);
    source.onmessage = function(event) {
        console.info(event.data);
        // if event.data contains "error", close the stream and report the failure
        // (TODO: parse the message after "error:" and display it)
        if (event.data.indexOf("error") > -1) {
            source.close();
            $.messager.progress('close');
            $.messager.alert('Notice', "Job failed!", 'warn');
        }
        // if event.data is "success", report success; otherwise event.data is the progress value
        if (event.data == "success") {
            source.close();
            $.messager.progress('close');
            $.messager.alert('Notice', "Job succeeded!", 'warn');
        }
        var bar = $.messager.progress('bar');
        bar.progressbar('setValue', event.data);
    };
}
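The server side of hadoop_getMRProgress.action is not shown in this post. For the EventSource above to work, the action must answer with Content-Type text/event-stream and emit frames of the form "data: ...\n\n". A minimal sketch of such a Struts2 action method, where jobId is the action property bound from the ?jobId= query parameter and HadoopUtils.getMRProgress() is an assumed helper (not the project's actual code) that reports a numeric progress value, "success", or an "error:..." string:

// Hedged sketch of the SSE endpoint the EventSource above subscribes to.
public void getMRProgress() throws IOException {
    HttpServletResponse response = ServletActionContext.getResponse();
    response.setContentType("text/event-stream"); // required for EventSource clients
    response.setCharacterEncoding("UTF-8");
    PrintWriter writer = response.getWriter();
    while (true) {
        // assumed helper: returns e.g. "42", "success", or "error:<message>"
        String progress = HadoopUtils.getMRProgress(jobId);
        writer.write("data: " + progress + "\n\n"); // one SSE frame: "data:" prefix, blank-line terminator
        writer.flush();
        if ("success".equals(progress) || progress.startsWith("error")) {
            break; // terminal state: stop streaming; the client calls source.close()
        }
        try {
            Thread.sleep(1000); // poll the running job roughly once per second
        } catch (InterruptedException e) {
            break;
        }
    }
}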
public static String getJobId() {
    long start = System.currentTimeMillis();
    while (noJobId()) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info(" Getting job id ...");
    }
    long end = System.currentTimeMillis();
    log.info("Got jobId, elapsed: " + (end - start) * 1.0 / 1000 + "s");
    return currJob.getJobID().toString();
}
private static boolean noJobId() {
    return currJob == null || currJob.getJobID() == null;
}
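getJobId() busy-waits until currJob has a JobID, which implies the worker thread assigns currJob as soon as the MapReduce job has been submitted. A plausible sketch of that handoff inside Hdfs2HBaseRunnable, assuming a static currJob field on HadoopUtils with a setCurrJob() setter and a getConf() helper (those names are assumptions; the real code is in the linked repository):

// Hypothetical sketch of the submission side that getJobId()/noJobId() poll against.
public class Hdfs2HBaseRunnable implements Runnable {
    // constructor and fields (hdfsFile, tableName, colDescription, ...) omitted

    @Override
    public void run() {
        try {
            Job job = Job.getInstance(HadoopUtils.getConf(), "hdfs2hbase");
            // ... configure mapper, input path, TableOutputFormat, etc. ...
            job.submit();                 // asynchronous: returns once the job is accepted
            HadoopUtils.setCurrJob(job);  // from here on, getJobId() can read job.getJobID()
            job.waitForCompletion(false); // block this worker thread until the MR job finishes
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            HadoopUtils.setMrLock(MRLock.NOTLOCKED); // release the lock for the next submission
        }
    }
}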
/**
 * Check whether the given banknote serial numbers (冠字号) include suspected counterfeit ones.
 *
 * @param stumbers comma-separated serial numbers
 * @return map with keys "exist" and "notExist"
 * @throws IllegalArgumentException
 * @throws IOException
 */
public Map<String, String> checkStumbersExist(String stumbers)
        throws IllegalArgumentException, IOException {
    String[] stumbersArr = StringUtils.split(stumbers, Utils.COMMA);
    Connection connection = HadoopUtils.getHBaseConnection();
    Table table = connection.getTable(TableName.valueOf(Utils.IDENTIFY_RMB_RECORDS));
    Map<String, String> map = new HashMap<>();
    Get get = null;
    try {
        List<Get> gets = new ArrayList<>();
        for (String stumber : stumbersArr) {
            get = new Get(stumber.trim().getBytes());
            gets.add(get);
        }
        Result[] results = table.get(gets);
        String exist;
        StringBuffer existStr = new StringBuffer();
        StringBuffer notExistStr = new StringBuffer();
        for (int i = 0; i < results.length; i++) {
            exist = new String(results[i].getValue(Utils.FAMILY, Utils.COL_EXIST));
            if ("1".equals(exist)) {
                existStr.append(stumbersArr[i]).append(Utils.COMMA);
            } else if ("0".equals(exist)) {
                notExistStr.append(stumbersArr[i]).append(Utils.COMMA);
            } else {
                log.info("Serial number " + stumbersArr[i] + ": abnormal value in the exist column!");
            }
        }
        if (existStr.length() > 0) {
            map.put("exist", existStr.substring(0, existStr.length() - 1));
        } else {
            map.put("exist", "nodata");
        }
        if (notExistStr.length() > 0) {
            map.put("notExist", notExistStr.substring(0, notExistStr.length() - 1));
        } else {
            map.put("notExist", "nodata");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return map;
}
/**
 * Query data by row keys, returning up to the given number of versions per cell.
 *
 * @param tableName
 * @param cfs
 * @param rowkeys  comma-separated row keys
 * @param versions maximum number of versions to return
 * @return
 * @throws IOException
 */
public List<HBaseTableData> getTableCertainRowKeyData(String tableName, String cfs,
        String rowkeys, int versions) throws IOException {
    String[] stumbersArr = StringUtils.split(rowkeys, Utils.COMMA);
    Connection connection = HadoopUtils.getHBaseConnection();
    Table table = connection.getTable(TableName.valueOf(tableName));
    List<HBaseTableData> list = new ArrayList<>();
    Get get = null;
    try {
        List<Get> gets = new ArrayList<>();
        for (String stumber : stumbersArr) {
            get = new Get(stumber.trim().getBytes());
            get.setMaxVersions(versions);
            gets.add(get);
        }
        Result[] results = table.get(gets);
        Cell[] cells;
        for (int i = 0; i < results.length; i++) {
            cells = results[i].rawCells();
            list.addAll(getHBaseTableDataListFromCells(cells));
        }
        return list;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}
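getHBaseTableDataListFromCells() is not listed in the post either; a hypothetical sketch of how the raw cells could be mapped onto HBaseTableData beans (the bean and its setters are assumptions about the project's model class):

// Hypothetical mapping from HBase cells to the project's HBaseTableData bean.
private List<HBaseTableData> getHBaseTableDataListFromCells(Cell[] cells) {
    List<HBaseTableData> list = new ArrayList<>();
    for (Cell cell : cells) {
        HBaseTableData data = new HBaseTableData();
        data.setRowKey(Bytes.toString(CellUtil.cloneRow(cell)));          // assumed setters
        data.setFamily(Bytes.toString(CellUtil.cloneFamily(cell)));
        data.setQualifier(Bytes.toString(CellUtil.cloneQualifier(cell)));
        data.setValue(Bytes.toString(CellUtil.cloneValue(cell)));
        data.setTimestamp(cell.getTimestamp()); // one entry per returned version
        list.add(data);
    }
    return list;
}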
1) Given the number of banknotes to withdraw, num, randomly look up num*3 records whose serial number (the rowkey) has an op_www:exist column value of 1;
2) Use HBase checkAndPut to update the op_www:exist value from 1 to 0, and return the rowkeys, i.e. the serial numbers, that were actually updated;
3) If fewer than num serial numbers were updated among the num*3 records, randomly look up more records with op_www:exist = 1 and update them the same way, repeating until num serial numbers have been returned (a hedged sketch of this loop follows the list).
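A hedged sketch of that loop, using the Table.checkAndPut() call named in step 2; randomExistRowKeys() stands in for the random lookup of step 1 and is an assumed helper, not code from the post:

// Withdraw num banknotes: atomically flip op_www:exist from "1" to "0".
public List<String> withdraw(Table table, int num) throws IOException {
    List<String> taken = new ArrayList<>();
    while (taken.size() < num) {
        // step 1: fetch num*3 candidate serial numbers whose op_www:exist is "1"
        List<String> candidates = randomExistRowKeys(table, num * 3); // assumed helper
        if (candidates.isEmpty()) {
            break; // no banknotes left to withdraw
        }
        for (String rowkey : candidates) {
            if (taken.size() >= num) {
                break;
            }
            // step 2: checkAndPut succeeds only if exist is still "1", so two
            // concurrent withdrawals can never take the same banknote
            Put put = new Put(Bytes.toBytes(rowkey));
            put.addColumn(Bytes.toBytes("op_www"), Bytes.toBytes("exist"), Bytes.toBytes("0"));
            boolean updated = table.checkAndPut(Bytes.toBytes(rowkey),
                    Bytes.toBytes("op_www"), Bytes.toBytes("exist"),
                    Bytes.toBytes("1"), put);
            if (updated) {
                taken.add(rowkey);
            }
        }
        // step 3: the while condition loops back for more candidates until num are taken
    }
    return taken;
}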
package stumer;

import java.io.IOException;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;

public class ReadTest {

    public static void main(String[] args) throws IOException {
        long size = 10000;
        get(Utils.getConn(), Utils.generateRowKey(size));
    }

    public static void get(Connection connection, List<byte[]> rowkeys) throws IOException {
        System.out.println(new Date() + ": start reading records...");
        long start = System.currentTimeMillis();
        Table table = connection.getTable(TableName.valueOf(Utils.TABLE));
        Get get = null;
        long count = 0;
        try {
            for (byte[] rowkey : rowkeys) {
                count++;
                get = new Get(rowkey);
                table.get(get);
                if (count % 1000 == 0) {
                    System.out.println("count:" + count);
                }
            }
            long end = System.currentTimeMillis();
            System.out.println(new Date() + ":" + rowkeys.size() + " records, read in "
                    + (end - start) * 1.0 / 1000 + "s");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            table.close();
        }
    }
}
package stumer;

import java.util.Date;
import java.util.List;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;

public class ReadThread implements Runnable {
    private List<byte[]> rks;
    private Table table;

    public ReadThread(Table table, List<byte[]> rks) {
        this.table = table;
        this.rks = rks;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " " + new Date() + ": start reading records...");
        long start = System.currentTimeMillis();
        Get get = null;
        long count = 0;
        try {
            for (byte[] rowkey : rks) {
                count++;
                get = new Get(rowkey);
                table.get(get);
                if (count % 1000 == 0) {
                    System.out.println(Thread.currentThread().getName() + " count:" + count);
                }
            }
            long end = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + " " + new Date() + ":" + rks.size()
                    + " records, read in " + (end - start) * 1.0 / 1000 + "s");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package stumer;

import java.io.IOException;

public class ReadThreadTest {
    public static void main(String[] args) throws IOException {
        long dataSize = 500;
        int threadSize = 20;
        for (int i = 0; i < threadSize; i++) {
            new Thread(new ReadThread(Utils.getTable(), Utils.generateRowKey(dataSize))).start();
        }
    }
}
package stumer;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class Utils {
    public static String TABLE = "records";
    private static DecimalFormat df = new DecimalFormat("0000");
    public static String[] crownSizePrefixes = null;
    static Random random = new Random();
    static {
        crownSizePrefixes = new String[26 * 2];
        for (int i = 0; i < crownSizePrefixes.length / 2; i++) {
            crownSizePrefixes[i] = "AAA" + (char) (65 + i);
            crownSizePrefixes[i + 26] = "AAB" + (char) (65 + i);
        }
    }

    /**
     * Format 0~9999 as 0000~9999.
     */
    public static String formatCrownSizeSuffix(int num) {
        return df.format(num);
    }

    public static Table getTable() throws IOException {
        return getConn().getTable(TableName.valueOf(TABLE));
    }

    public static String getRandomCrownSize() {
        return crownSizePrefixes[random.nextInt(crownSizePrefixes.length)]
                + formatCrownSizeSuffix(random.nextInt(10000));
    }

    public static Connection getConn() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.master", "node2:16000"); // HMaster address
        conf.set("hbase.rootdir", "hdfs://node1:8020/hbase"); // HBase storage path on HDFS
        conf.set("hbase.zookeeper.quorum", "node2,node3,node4"); // ZooKeeper quorum
        conf.set("hbase.zookeeper.property.clientPort", "2181"); // ZooKeeper client port
        return ConnectionFactory.createConnection(conf);
    }

    public static List<byte[]> generateRowKey(long size) {
        System.out.println(new Date() + ": start generating " + size + " row keys...");
        long start = System.currentTimeMillis();
        List<byte[]> rowkeys = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            rowkeys.add(Bytes.toBytes(Utils.getRandomCrownSize()));
        }
        long end = System.currentTimeMillis();
        System.out.println(new Date() + ":" + rowkeys.size() + " row keys generated in "
                + (end - start) * 1.0 / 1000 + "s");
        return rowkeys;
    }
}
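One caveat about the multithreaded test above: Utils.getTable() opens a brand-new Connection on every call, so ReadThreadTest creates 20 connections. In the HBase 1.x client, Connection is heavyweight and thread-safe while Table is lightweight and not thread-safe, so the usual pattern is one shared Connection with a separate Table per thread, e.g.:

// Share one Connection across threads; give each thread its own Table instance.
Connection conn = Utils.getConn(); // heavyweight, thread-safe: create once
for (int i = 0; i < threadSize; i++) {
    new Thread(new ReadThread(conn.getTable(TableName.valueOf(Utils.TABLE)),
            Utils.generateRowKey(dataSize))).start();
}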
Original article: http://blog.csdn.net/fansy1990/article/details/51583401