码迷,mamicode.com
首页 > 其他好文 > 详细

MapReduce on HBase使用与集成

时间:2015-07-03 22:07:15      阅读:249      评论:0      收藏:0      [点我收藏+]

标签:hbase   mapreduce   

为什么需要MapReduce on HBase?

HBase本身并没有提供很好的二级索引方式。如果直接使用HBase提供的scan扫描方式,在数据量很大的情况下就会非常慢。

可以使用MapReduce的方法操作HBase数据库。Hadoop MapReduce提供相关API,可以与HBase数据库无缝连接。
API链接: http://hbase.apache.org/devapidocs/index.html

HBase与Hadoop的API对比

技术分享

相关类

TableMapper

package org.apache.hadoop.hbase.mapreduce;

/**
 * Extends the base <code>Mapper</code> class to add the required input key
 * and value classes.
 * <p>
 * The input key type is fixed to {@code ImmutableBytesWritable} and the input
 * value type to {@code Result}; subclasses choose only the output key and
 * value types. The class declares no members of its own.
 *
 * @param <KEYOUT>  The type of the output key emitted by the mapper.
 * @param <VALUEOUT>  The type of the output value emitted by the mapper.
 * @see org.apache.hadoop.mapreduce.Mapper
 */
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}

TableReducer

package org.apache.hadoop.hbase.mapreduce;
/**
 * Extends the basic <code>Reducer</code> class to add the required key and
 * value input/output classes. While the input key and value as well as the
 * output key can be anything handed in from the previous map phase the output
 * value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put}
 * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
 * using the {@link TableOutputFormat} class.
 * <p>
 * This class is extended by {@link IdentityTableReducer} but can also be
 * subclassed to implement similar features or any custom code needed. It has
 * the advantage to enforce the output value to a specific basic type: the
 * fourth type argument of <code>Reducer</code> is fixed to
 * <code>Mutation</code>, so subclasses choose only the input key, input value
 * and output key types. The class declares no members of its own.
 *
 * @param <KEYIN>  The type of the input key.
 * @param <VALUEIN>  The type of the input value.
 * @param <KEYOUT>  The type of the output key.
 * @see org.apache.hadoop.mapreduce.Reducer
 */
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}

TableMapReduceUtil

/**
 * Static helpers for configuring a MapReduce {@code Job} that reads from
 * and/or writes to a table.
 * <p>
 * NOTE(review): this is an excerpt. Both methods shown here only forward to
 * longer overloads that are not part of this listing.
 */
public class TableMapReduceUtil {
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   * <p>
   * Convenience overload: forwards every argument to the seven-argument
   * overload, passing {@code true} as the last argument. The meaning of that
   * flag is defined by the overload, which is not shown in this excerpt
   * (presumably it controls shipping dependency jars with the job -- confirm
   * against the full source).
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the given {@code Job}.
   * <p>
   * Convenience overload: forwards to the four-argument overload, passing
   * {@code null} as the last argument. What that argument selects is defined
   * by the overload, which is not shown in this excerpt (presumably a custom
   * partitioner class -- confirm against the full source).
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

}

Demo

package MapReduceHbase;

import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;



/**
 * Driver for a MapReduce job that scans the {@code students} table and writes
 * its output into the {@code students_age} table, using {@link MyMapper} and
 * {@link MyReducer}.
 */
public class HbaseMR {

    /** Configuration carrying the HBase root directory and ZooKeeper settings. */
    private final Configuration conf;

    /**
     * Builds the HBase configuration used by the job.
     *
     * <p>No client connection is opened here: the job setup only needs the
     * {@link Configuration}, and a connection that is created but never
     * closed would leak for the lifetime of the driver.
     *
     * @param rootDir  value for {@code hbase.rootdir}
     * @param zkServer value for {@code hbase.zookeeper.quorum}
     * @param port     value for {@code hbase.zookeeper.property.clientPort}
     */
    private HbaseMR(String rootDir, String zkServer, String port) {
        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);
    }

    /**
     * Configures and runs the job, then exits with status 0 on success and 1
     * on failure so that calling scripts can detect a failed run.
     *
     * @param args unused
     * @throws IOException            if job setup or submission fails
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";

        HbaseMR conn = new HbaseMR(rootDir, zkServer, port);

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conn.conf, "MapReduce on HBase");
        job.setJarByClass(HbaseMR.class);

        Scan scan = new Scan();
        // Number of rows fetched ahead per scanner round trip.
        scan.setCaching(1000);
        // A full-table MapReduce scan reads each block once; keeping those
        // blocks in the region server block cache only evicts useful data.
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob("students", scan,
                MyMapper.class, Text.class, Text.class, job);

        TableMapReduceUtil.initTableReducerJob("students_age", MyReducer.class, job);

        // Propagate the job result instead of always exiting successfully.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


/**
 * Emits one {@code (age, row key)} pair per scanned row, where the age is the
 * value of the {@code basicInfo:age} cell. Rows without a usable age are
 * skipped.
 */
class MyMapper extends TableMapper<Text, Text>{

    /** Column family holding the age cell. */
    private static final byte[] FAMILY = Bytes.toBytes("basicInfo");
    /** Column qualifier of the age cell. */
    private static final byte[] QUALIFIER = Bytes.toBytes("age");

    /**
     * @param key     the key of the current row (used as the person's name)
     * @param value   the cells read for the current row
     * @param context the task context used to emit output
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        byte[] age = value.getValue(FAMILY, QUALIFIER);
        // getValue returns null when the row has no basicInfo:age cell, and
        // new Text(null) would throw a NullPointerException. An empty age is
        // skipped as well because the reducer uses the age as a row key.
        if (age == null || age.length == 0) {
            return;
        }
        // get() exposes the whole backing array, so honour offset and length
        // rather than assuming the key fills the array.
        Text name = new Text(Bytes.toString(key.get(), key.getOffset(), key.getLength()));
        // Output order is (age, name).
        context.write(new Text(age), name);
    }
}

/**
 * Collects all names that share one age into a single {@code Put}: the age
 * is the row key, and each name becomes a cell in family {@code f1} whose
 * qualifier and value are both the name.
 */
class MyReducer extends TableReducer<Text,Text,ImmutableBytesWritable>{

    /**
     * @param k2      the age emitted by the mapper; used as the output row key
     * @param v2s     the names emitted for that age
     * @param context the task context used to emit the {@code Put}
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text k2, Iterable<Text> v2s, Context context)
            throws IOException, InterruptedException {
        final byte[] rowKey = Bytes.toBytes(k2.toString());
        final byte[] family = Bytes.toBytes("f1");

        final Put row = new Put(rowKey);
        // One cell per name: family, qualifier, value.
        for (Text name : v2s) {
            final String text = name.toString();
            row.add(family, Bytes.toBytes(text), Bytes.toBytes(text));
        }

        // The output key is passed as null; only the Put is supplied.
        context.write(null, row);
    }
}

运行之前先创建一个students_age表,列族为f1。
运行结果:
技术分享

版权声明:本文为博主原创文章,未经博主允许不得转载。

MapReduce on HBase使用与集成

标签:hbase   mapreduce   

原文地址:http://blog.csdn.net/scgaliguodong123_/article/details/46745375

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!