首页 > 其他好文 > 详细

HBase with MapReduce (MultiTable Read)

时间:2015-09-18 20:26:57      阅读:154      评论:0      收藏:0      [点我收藏+]




package com.datacenter.HbaseMapReduce.MultiReadTable;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.datacenter.HbaseMapReduce.Read.ReadHbase;

public class MuliTableReadmapper extends TableMapper<Text, LongWritable> {

	private ResultScanner rs=null; 
	protected void map(ImmutableBytesWritable key, Result value, Context context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		Result temp=rs.next();//这个结果只是一个单元的结果,所谓一个单元可以理解成是一行的数据

	protected void setup(Context context) throws IOException,
			InterruptedException {
		// TODO Auto-generated method stub
		HConnection hconn = MultiReadTableMain.HbaseUtil(
				MultiReadTableMain.rootdir, MultiReadTableMain.zkServer,

		HTableInterface ht = hconn.getTable("test");

		Scan scan = new Scan();
		scan.setCaching(500); // 1 is the default in Scan, which will be bad for
								// MapReduce jobs
		scan.setCacheBlocks(false); // don‘t set to true for MR jobs

		rs = ht.getScanner(scan);

	// 按顺序输出
	public void printResult(Result rs) {

		if (rs.isEmpty()) {
			System.out.println("result is empty!");

		NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temps = rs
		String rowkey = Bytes.toString(rs.getRow()); // actain rowkey
		System.out.println("rowkey->" + rowkey);
		for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temp : temps
				.entrySet()) {
			System.out.print("\tfamily->" + Bytes.toString(temp.getKey()));
			for (Entry<byte[], NavigableMap<Long, byte[]>> value : temp
					.getValue().entrySet()) {
				System.out.print("\tcol->" + Bytes.toString(value.getKey()));
				for (Entry<Long, byte[]> va : value.getValue().entrySet()) {
					System.out.print("\tvesion->" + va.getKey());
							+ Bytes.toString(va.getValue()));



package com.datacenter.HbaseMapReduce.MultiReadTable;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import com.datacenter.HbaseMapReduce.Read.ReadHbase;
import com.datacenter.HbaseMapReduce.Read.ReadHbaseMapper;

public class MultiReadTableMain {
	static public String rootdir = "hdfs://hadoop3:8020/hbase";
	static public String zkServer = "hadoop3";
	static public String port = "2181";

	private static Configuration conf;
	private static HConnection hConn = null;

	public static HConnection HbaseUtil(String rootDir, String zkServer, String port) {

		conf = HBaseConfiguration.create();// 获取默认配置信息
		conf.set("hbase.rootdir", rootDir);
		conf.set("hbase.zookeeper.quorum", zkServer);
		conf.set("hbase.zookeeper.property.clientPort", port);

		try {
			hConn = HConnectionManager.createConnection(conf);
		} catch (IOException e) {
			// TODO Auto-generated catch block
		return hConn;

	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		HbaseUtil(rootdir, zkServer, port);

		// Configuration config = HBaseConfiguration.create();

		Job job = new Job(conf, "ExampleRead");
		job.setJarByClass(ReadHbase.class); // class that contains mapper

		Scan scan = new Scan();
		scan.setCaching(500); // 1 is the default in Scan, which will be bad for
								// MapReduce jobs
		scan.setCacheBlocks(false); // don‘t set to true for MR jobs
		// set other scan attrs

		TableMapReduceUtil.initTableMapperJob("score", // input HBase table name
				scan, // Scan instance to control CF and attribute selection
				MuliTableReadmapper.class, // mapper
				null, // mapper output key
				null, // mapper output value
		job.setOutputFormatClass(NullOutputFormat.class); // because we aren‘t
															// emitting anything
															// from mapper

		boolean b = job.waitForCompletion(true);
		if (!b) {
			throw new IOException("error with job!");



HBase with MapReduce (MultiTable Read)



评论 一句话评论(0)
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com