HBase with MapReduce (MultiTable Read)
HBase has no built-in join between tables. To query two tables together, or to access a second table while querying the first, you can implement the lookup with MapReduce. Since the query is purely a map-side operation, no reduce phase needs to be designed. The implementation is as follows:
(1) Implementing the mapper
package com.datacenter.HbaseMapReduce.MultiReadTable;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class MuliTableReadmapper extends TableMapper<Text, LongWritable> {

    private HTableInterface table = null;
    private ResultScanner rs = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Open the second table ("test") so it can be read alongside the
        // table that the job itself is scanning.
        HConnection hconn = MultiReadTableMain.HbaseUtil(
                MultiReadTableMain.rootdir, MultiReadTableMain.zkServer,
                MultiReadTableMain.port);
        table = hconn.getTable("test");
        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        rs = table.getScanner(scan);
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Print the row from the table being scanned by the job.
        printResult(value);

        // Print the contents of the second table. Each Result is one unit,
        // which can be understood as one row of data. Note that the scanner
        // is shared across map() calls, so the first call exhausts it and
        // later calls print nothing from the second table.
        Result temp = rs.next();
        while (temp != null) {
            printResult(temp);
            temp = rs.next();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Release the scanner and table opened in setup().
        if (rs != null) {
            rs.close();
        }
        if (table != null) {
            table.close();
        }
    }

    // Print a Result in order: rowkey, then family, column, version and value.
    public void printResult(Result rs) {
        if (rs.isEmpty()) {
            System.out.println("result is empty!");
            return;
        }
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temps = rs.getMap();
        String rowkey = Bytes.toString(rs.getRow()); // obtain the rowkey
        System.out.println("rowkey->" + rowkey);
        for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temp : temps.entrySet()) {
            System.out.print("\tfamily->" + Bytes.toString(temp.getKey()));
            for (Entry<byte[], NavigableMap<Long, byte[]>> value : temp.getValue().entrySet()) {
                System.out.print("\tcol->" + Bytes.toString(value.getKey()));
                for (Entry<Long, byte[]> va : value.getValue().entrySet()) {
                    System.out.print("\tversion->" + va.getKey());
                    System.out.print("\tvalue->" + Bytes.toString(va.getValue()));
                    System.out.println();
                }
            }
        }
    }
}
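A caveat with the scanner opened in setup(): it is consumed on the first map() call, so the second table is dumped only once for the whole mapper, not once per input row. If the goal is a genuine two-table lookup and the second table is small enough to fit in memory, a common variation is to cache it during setup() and join by rowkey inside map(). The sketch below is a minimal illustration of that idea and is not part of the original post; the class name CachedJoinMapper and the choice of joining on identical rowkeys are assumptions.

package com.datacenter.HbaseMapReduce.MultiReadTable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Illustrative sketch only: assumes the second table ("test") is small
// enough to cache in memory and that both tables share the same rowkeys.
public class CachedJoinMapper extends TableMapper<Text, LongWritable> {

    // rowkey -> row from the second table, loaded once per mapper
    private Map<String, Result> lookup = new HashMap<String, Result>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        HConnection hconn = MultiReadTableMain.HbaseUtil(
                MultiReadTableMain.rootdir, MultiReadTableMain.zkServer,
                MultiReadTableMain.port);
        HTableInterface table = hconn.getTable("test");
        ResultScanner scanner = table.getScanner(new Scan());
        try {
            for (Result r : scanner) {
                lookup.put(Bytes.toString(r.getRow()), r);
            }
        } finally {
            scanner.close();
            table.close();
        }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Join by rowkey: look up the matching row in the cached second table.
        Result matched = lookup.get(Bytes.toString(value.getRow()));
        if (matched != null) {
            System.out.println("joined rowkey->" + Bytes.toString(value.getRow()));
        }
    }
}

With this layout, each input row costs one in-memory lookup instead of a rescan of the second table, which is why map-side joins of this kind are usually preferred when one side is small.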
(2) Implementing the main class
package com.datacenter.HbaseMapReduce.MultiReadTable;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MultiReadTableMain {

    static public String rootdir = "hdfs://hadoop3:8020/hbase";
    static public String zkServer = "hadoop3";
    static public String port = "2181";

    private static Configuration conf;
    private static HConnection hConn = null;

    public static HConnection HbaseUtil(String rootDir, String zkServer, String port) {
        conf = HBaseConfiguration.create(); // load the default configuration
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);
        try {
            hConn = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return hConn;
    }

    public static void main(String[] args) throws Exception {
        HbaseUtil(rootdir, zkServer, port);

        Job job = new Job(conf, "ExampleRead");
        job.setJarByClass(MultiReadTableMain.class); // class that contains mapper

        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "score",                   // input HBase table name
                scan,                      // Scan instance to control CF and attribute selection
                MuliTableReadmapper.class, // mapper
                null,                      // mapper output key
                null,                      // mapper output value
                job);
        job.setNumReduceTasks(0); // map-only job: no reduce phase is needed
        job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from the mapper

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }
}
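A side note on the client API: HConnection, HConnectionManager and HTableInterface as used above were deprecated in HBase 1.0 in favor of Connection, ConnectionFactory and Table, and were removed in later releases. A minimal sketch of the equivalent connection setup on the newer API, assuming the same ZooKeeper quorum and client port as above:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

// Sketch of the HBase 1.0+ connection API; not code from the original post.
public class NewApiConnection {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop3");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        Connection conn = ConnectionFactory.createConnection(conf);
        try {
            Table table = conn.getTable(TableName.valueOf("test"));
            // ... use the table ...
            table.close();
        } finally {
            conn.close();
        }
    }
}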
Original source: http://www.cnblogs.com/ljy2013/p/4820076.html