码迷,mamicode.com
首页 > 其他好文 > 详细

MapReduce批量操作HBase

时间:2015-09-04 14:27:39      阅读:266      评论:0      收藏:0      [点我收藏+]

标签:mapreduce批量操作hbase

欢迎访问:鲁春利的工作笔记,学习是一种信仰,让时间考验坚持的力量。



环境:

    hadoop-2.6.0

    hbase-1.0.1

    zookeeper-3.4.6

1、Hadoop集群配置过程略;

2、Zookeeper集群配置过程略;

3、HBase集群配置过程略;

4、HBase作为输入源示例

    查看当前hbase表m_domain中的数据

[hadoop@dnode1 conf]$ hbase shell
HBase Shell; enter ‘help<RETURN>‘ for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

hbase(main):001:0> list
TABLE 
m_domain
t_domain
2 row(s) in 0.9270 seconds

=> ["m_domain", "t_domain"]
hbase(main):002:0> scan ‘m_domain‘
ROW                    COLUMN+CELL 
alibaba.com_19990415_20220523      column=cf:access_server, timestamp=1440947490018, value=\xE6\x9D\xAD\xE5\xB7\x9E
alibaba.com_19990415_20220523      column=cf:exp_date, timestamp=1440947490018, value=2022\xE5\xB9\xB405\xE6\x9C\x8823\xE6\x97\xA5
alibaba.com_19990415_20220523      column=cf:ipstr, timestamp=1440947490018, value=205.204.101.42
alibaba.com_19990415_20220523      column=cf:owner, timestamp=1440947490018, value=Hangzhou Alibaba Advertising Co.
alibaba.com_19990415_20220523      column=cf:reg_date, timestamp=1440947490018, value=1999\xE5\xB9\xB404\xE6\x9C\x8815\xE6\x97\xA5
baidu.com_19991011_20151011       column=cf:access_server, timestamp=1440947489956, value=\xE5\x8C\x97\xE4\xBA\xAC
baidu.com_19991011_20151011       column=cf:exp_date, timestamp=1440947489956, value=2015\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5       
baidu.com_19991011_20151011        column=cf:ipstr, timestamp=1440947489956, value=220.181.57.217
baidu.com_19991011_20151011       column=cf:reg_date, timestamp=1440947489956, value=1999\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
2 row(s) in 1.4560 seconds

hbase(main):003:0> quit

    实现Mapper端

package com.invic.mapreduce.hbase;

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
 * 
 * @author lucl
 *
 */
public class HBaseReaderMapper extends TableMapper<Writable, Writable> {
	private Text key = new Text();
	private Text value = new Text();
	
	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		super.setup(context);
	}
	
	@Override
	protected void map(ImmutableBytesWritable row, Result columns,Context context)
			throws IOException, InterruptedException {
		String rowKey = new String(row.get());
		byte [] columnFamily = null;
		byte [] columnQualifier = null;
		byte [] cellValue = null;
		
		StringBuffer sbf = new StringBuffer(1024);
		for (Cell cell : columns.listCells()) {
			columnFamily = CellUtil.cloneFamily(cell);
			columnQualifier = CellUtil.cloneQualifier(cell);
			cellValue = CellUtil.cloneValue(cell);
			
			sbf.append(Bytes.toString(columnFamily));
			sbf.append(".");
			sbf.append(Bytes.toString(columnQualifier));
			sbf.append(":");
			sbf.append(new String(cellValue, "UTF-8"));
		}
		
		key.set(rowKey);
		value.set(sbf.toString());
		context.write(key, value);
	}
	
	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException{
		super.cleanup(context);
	}
}

    实现MapReduce的Driver类

package com.invic.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 
 * @author lucl
 * HBase作为输入源示例
 *
 */
public class HBaseASDataSourceDriver extends Configured implements Tool {
	/**
	 * 
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
	        // Eclipse中调用时需要配置
		// System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\");
		
		int exit = ToolRunner.run(new HBaseASDataSourceDriver(), args);
		System.out.println("receive exit : " + exit);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		// hadoop的参数配置
		/*conf.set("fs.defaultFS", "hdfs://cluster");
		conf.set("dfs.nameservices", "cluster");
		conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
		conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020");
		conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020");
		conf.set("dfs.client.failover.proxy.provider.cluster", 
				"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");*/

		// hbase master
		// property "hbase.master" has been deprecated since 0.90
		// Just passing the ZK configuration makes your client auto-discover the master
		// conf.set("hbase.master", "nnode:60000");
		// zookeeper quorum
		getConf().set("hbase.zookeeper.property.clientport", "2181");
		getConf().set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
		// 是否对Map Task启用推测执行机制
		getConf().setBoolean("mapreduce.map.speculative", false);
		// 是否对Reduce Task启用推测执行机制
		getConf().setBoolean("mapreduce.reduce.speculative", false);
		
		Job job = Job.getInstance();
		job.setJobName("MyBaseReader");
		job.setJarByClass(HBaseASDataSourceDriver.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		Scan scan = new Scan();
		// scan.addFamily(family);
		// scan.addColumn(family, qualifier);
		
		byte [] tableName = Bytes.toBytes("m_domain");
		
		TableMapReduceUtil.initTableMapperJob(tableName, scan, HBaseReaderMapper.class, Text.class, Text.class, job);
		
		Path path = new Path("/" + System.currentTimeMillis());
		FileOutputFormat.setOutputPath(job, path);
		
		return job.waitForCompletion(true) ? 0 : 1;
	}
	
}

    查看结果:

技术分享    问题记录:

    a. 通过Eclipse执行时报错,但未分析出原因

技术分享    b. 放到集群环境中运行时Mapper类如果定义在Driver类中,则报错

ClassNotFound for HBaseASDataSourceDriver$HBaseReaderMapper init()

    c. zookeeper连接符总是显示连接的为127.0.0.1而非配置的zookeeper.quorum

技术分享    如果zookeeper集群环境与hbase环境在不同的机器不知道是否会出现问题。

5、Hbase作为输出源示例

    

6、HBase作为共享源示例

    

本文出自 “鲁春利的工作笔记” 博客,请务必保留此出处http://luchunli.blog.51cto.com/2368057/1691298

MapReduce批量操作HBase

标签:mapreduce批量操作hbase

原文地址:http://luchunli.blog.51cto.com/2368057/1691298

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!