标签:hdfs for load htable leo creat 综合 tin simple
一、案例分析
常见避免数据热点问题的处理方式有:加盐、哈希、反转等方法结合预分区使用。
由于原始数据的第一个字段是时间戳,第二个字段是电话号码,若直接以其作为Rowkey存储,容易引起热点问题。因此通过加随机列、组合时间戳、字段反转等方式来设计Rowkey,使其既能支持高效查询,又能避免热点问题。(由于案例数据量较小,未进行预分区。)
二、代码部分
package beifeng.hadoop.hbase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.Text;

/**
 * MapReduce job that loads tab-separated log lines into an HBase table.
 *
 * <p>Rowkey design principles followed here:
 * <ol>
 *   <li>the rowkey must not be too long;</li>
 *   <li>it must be unique (e.g. by adding a random/MD5 component);</li>
 *   <li>it must avoid creating data hotspots;</li>
 *   <li>it should serve as many query scenarios as possible.</li>
 * </ol>
 *
 * @author Administrator
 */
public class LoadData extends Configured implements Tool {

    /** ZooKeeper quorum host used for every HBase connection made by this job. */
    private static final String ZOOKEEPER_QUORUM = "beifeng01";

    /** Name of the HBase table the job (re)creates and writes to. */
    private static final String TABLE_NAME = "phoneLog";

    /** The single column family of the target table. */
    private static final String COLUMN_FAMILY = "info";

    /** Input path used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT = "hdfs://hbase/data/input/HTTP_20130313143750.data";

    /** Counter group under which skipped (malformed) records are reported. */
    private static final String COUNTER_GROUP = "LoadData";

    /**
     * Builds the rowkey from the reversed phone number and the formatted time, which
     * together suit the intended query scenarios, and prepends it to the input line.
     *
     * <p>Output value layout: {@code <reversedPhone>|<formattedTime> TAB <original line>}.
     * The output key is the unchanged input key.
     *
     * @author Administrator
     */
    public static class LoadDataMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        // Converts the numeric timestamp into a fixed-width time string. The pattern
        // includes minutes and two-digit seconds so that records of the same phone
        // that differ only in the minute do not end up with the same rowkey.
        final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

        // Reused across map() calls to avoid allocating a Text per record.
        private final Text mapOutputValue = new Text();

        /**
         * Emits one record per well-formed input line.
         *
         * <p>Lines with fewer than two tab-separated fields, or whose first field is not
         * a valid long, are skipped and counted under the {@code LoadData} counter group.
         *
         * @param key     input key, forwarded unchanged as the output key
         * @param value   one tab-separated input line; field 0 is passed to
         *                {@link Date#Date(long)} (i.e. treated as epoch milliseconds),
         *                field 1 is the phone number
         * @param context task context used to emit the record and update counters
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            if (fields.length < 2) {
                context.getCounter(COUNTER_GROUP, "MAP_TOO_FEW_FIELDS").increment(1);
                return;
            }

            long timestamp;
            try {
                timestamp = Long.parseLong(fields[0].trim());
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, "MAP_BAD_TIMESTAMP").increment(1);
                return;
            }

            // Convert the first field into the formatted time string.
            String formattedTime = sdf.format(new Date(timestamp));
            // Reverse the phone number so that keys do not share a common prefix.
            String reversedPhone = new StringBuilder(fields[1]).reverse().toString();

            // reversed phone + "|" + time, followed by the whole original line.
            String rowKey = reversedPhone + "|" + formattedTime;
            mapOutputValue.set(rowKey + "\t" + line);
            context.write(key, mapOutputValue);
        }
    }

    /**
     * Turns every mapper output value into one HBase {@link Put}.
     *
     * <p>Field 0 of the value is the rowkey; the remaining fields are the original line,
     * so original field {@code i} sits at index {@code i + 1}.
     */
    public static class LoadDataReuducer extends TableReducer<LongWritable, Text, NullWritable> {

        // HBase column family, encoded once.
        private static final byte[] FAMILY = COLUMN_FAMILY.getBytes(StandardCharsets.UTF_8);

        // Highest field index read below is 11, so at least 12 fields are required.
        private static final int REQUIRED_FIELDS = 12;

        /**
         * Writes one {@link Put} per value. Values with fewer than 12 tab-separated
         * fields are skipped and counted under the {@code LoadData} counter group.
         *
         * @param key     mapper output key (not used for the row)
         * @param values  mapper output values that share {@code key}
         * @param context task context used to emit the puts and update counters
         */
        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                String[] fields = value.toString().split("\t");
                if (fields.length < REQUIRED_FIELDS) {
                    context.getCounter(COUNTER_GROUP, "REDUCE_TOO_FEW_FIELDS").increment(1);
                    continue;
                }

                Put put = new Put(fields[0].getBytes(StandardCharsets.UTF_8));
                // Index 2 (the phone number) is not stored as a column; it is only
                // present, reversed, inside the rowkey.
                addField(put, "reportTime", fields[1]);
                addField(put, "apmac", fields[3]);
                addField(put, "acmac", fields[4]);
                addField(put, "host", fields[5]);
                addField(put, "siteType", fields[6]);
                addField(put, "upPackNum", fields[7]);
                addField(put, "downPackNum", fields[8]);
                addField(put, "unPayLoad", fields[9]);
                addField(put, "downPayLoad", fields[10]);
                addField(put, "httpStatus", fields[11]);
                context.write(NullWritable.get(), put);
            }
        }

        /** Adds {@code value} to {@code put} under the job's column family and the given qualifier. */
        private static void addField(Put put, String qualifier, String value) {
            put.add(FAMILY,
                    qualifier.getBytes(StandardCharsets.UTF_8),
                    value.getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Creates {@code tableName} with the single column family {@code info}.
     *
     * <p>If the table already exists it is disabled and deleted first, so any existing
     * data in it is lost.
     *
     * @param tableName name of the table to (re)create
     * @throws MasterNotRunningException    declared by the HBase admin client
     * @throws ZooKeeperConnectionException declared by the HBase admin client
     * @throws IOException                  if an admin operation fails
     */
    public static void createTable(String tableName)
            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);

        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            TableName tName = TableName.valueOf(tableName);

            HTableDescriptor htd = new HTableDescriptor(tName);
            htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));

            if (admin.tableExists(tName)) {
                System.out.println(tableName+"is exist,trying to recrate the table");
                admin.disableTable(tName);
                admin.deleteTable(tName);
            }
            admin.createTable(htd);
            System.out.println("create new table"+ " " + tableName);
        } finally {
            // Release the admin connection even when an admin call throws.
            admin.close();
        }
    }

    /**
     * Recreates the target table, then configures and runs the load job.
     *
     * @param args {@code args[0]} is the input path; when absent, the built-in default
     *             input path is used
     * @return 0 if the job completed successfully, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        String input = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;

        Configuration conf = this.getConf();
        conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);

        createTable(TABLE_NAME);

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        job.setNumReduceTasks(1);

        // map class
        job.setMapperClass(LoadDataMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce class
        job.setReducerClass(LoadDataReuducer.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(input));

        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    /**
     * Entry point. Runs the job through {@link ToolRunner} and exits with its status.
     *
     * @param args optional; the first non-generic argument is the HDFS input path
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new LoadData(), args);
        System.exit(status);
    }
}
程序运行完成后,对表执行scan,可以看到Rowkey的效果与设计一致。
标签:hdfs for load htable leo creat 综合 tin simple
原文地址:https://www.cnblogs.com/perfectdata/p/10072433.html