标签:action factory write job protected 程序打包 ble art 情况
http://www.cnblogs.com/MOBIN/p/5559575.html
摘要
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public class GenerateHFile extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] items = line.split( "\t" ); String ROWKEY = items[ 1 ] + items[ 2 ] + items[ 3 ]; ImmutableBytesWritable rowkey = new ImmutableBytesWritable(ROWKEY.getBytes()); Put put = new Put(ROWKEY.getBytes()); //ROWKEY put.addColumn( "INFO" .getBytes(), "URL" .getBytes(), items[ 0 ].getBytes()); put.addColumn( "INFO" .getBytes(), "SP" .getBytes(), items[ 1 ].getBytes()); //出发点 put.addColumn( "INFO" .getBytes(), "EP" .getBytes(), items[ 2 ].getBytes()); //目的地 put.addColumn( "INFO" .getBytes(), "ST" .getBytes(), items[ 3 ].getBytes()); //出发时间 put.addColumn( "INFO" .getBytes(), "PRICE" .getBytes(), Bytes.toBytes(Integer.valueOf(items[ 4 ]))); //价格 put.addColumn( "INFO" .getBytes(), "TRAFFIC" .getBytes(), items[ 5 ].getBytes()); //交通方式 put.addColumn( "INFO" .getBytes(), "HOTEL" .getBytes(), items[ 6 ].getBytes()); //酒店 context.write(rowkey, put); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public class GenerateHFileMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { final String INPUT_PATH= "hdfs://master:9000/INFO/Input" ; final String OUTPUT_PATH= "hdfs://master:9000/HFILE/Output" ; Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "TRAVEL" ); Job job=Job.getInstance(conf); job.getConfiguration().set( "mapred.jar" , "/home/hadoop/TravelProject/out/artifacts/Travel/Travel.jar" ); //预先将程序打包再将jar分发到集群上 job.setJarByClass(GenerateHFileMain. class ); job.setMapperClass(GenerateHFile. class ); job.setMapOutputKeyClass(ImmutableBytesWritable. class ); job.setMapOutputValueClass(Put. class ); job.setOutputFormatClass(HFileOutputFormat2. class ); HFileOutputFormat2.configureIncrementalLoad(job,table,table.getRegionLocator()); FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion( true )? 0 : 1 ); } } |
1
2
3
4
5
6
7
8
|
public class LoadIncrementalHFileToHBase { public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(configuration); LoadIncrementalHFiles loder = new LoadIncrementalHFiles(configuration); loder.doBulkLoad( new Path( "hdfs://master:9000/HFILE/OutPut" ), new HTable(conf, "TRAVEL" )); } } |
标签:action factory write job protected 程序打包 ble art 情况
原文地址:http://www.cnblogs.com/yaohaitao/p/6995278.html