HBase –Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBse技术可在廉价PC Server上搭建起大规模结构化存储集群。HBase利用Hadoop HDFS作为文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协调工具。
1.表(table),是存储管理数据的。
2.行键(row key),类似于MySQL中的主键。Mysql中的主键可有可无。
行键是HBase表天然自带的。
3.列族(colum family),列的集合。
HBASE列族是需要在定义表时指定的,只需要定义列族而不需要定义列。列是在插入数据时动态增加的。
4.时间戳(timestamp)
是64位整数
在我们数据库里它是一种日期类型可以精确到毫秒级别。
在HBASE,是列(也称作标签、修饰符)的一个属性。
行健和列确定的单元格,可以存储多个数据,每个数据还有时间戳属性。数据具有版本特性。
如果不指定时间戳或者版本,默认取最新的数据。
5.HDFS中存储的数据都是字节数组。
6.HBASE中表中的数据是按照行健的顺序物理存储的。MySQL中是按照插入的顺序存储的。
行健是按照ASIIC码排序的。
HBASE 是面向列的数据库,按照列族进行存储。关系型数据库是按照行存储的。
7.列必须用族来定义,任意一列有如下形式:族:标签。族和标签都可以是任意的字符串。
物理上将同族数据存储在一起。
数据可通过时间戳来区分版本。
修改$HBASE_HOME/conf/hbase-env.sh,修改内容如下:
export JAVA_HOME=/usr/local/jdk
export HBASE_MANAGES_ZK=true
修改$HBASE_HOME/conf/hbase-site.xml,修改内容如下:
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
启动hbase,执行命令start-hbase.sh
启动hbase之前,确保hadoop是运行正常的,并且可以写入文件
验证:(1)执行jps,发现新增加了3个java进程,分别是HMaster、HRegionServer、HQuorumPeer
(2)使用浏览器访问http://hadoop:60010
进入hbase shell,终端:
hbase shell
hbase shell操作:
创建表: create ‘表名称’, ‘列族名称1’,’列族名称2’,’列族名称N’
添加记录: put ‘表名称’, ‘行名称’, ‘列名称:’, ‘值’
查看记录: get ‘表名称’, ‘行名称’
查看表中的记录总数: count ‘表名称’
删除一张表: 先要屏蔽该表,才能对该表进行删除。
第一步 disable ‘表名称’ 第二步 drop ‘表名称’
查看所有记录: scan “表名称”
查看某个表某个列中所有数据: scan “表名称” {COLUMNS=>’列族名称:列名称’}
更新记录: 就是重写一遍进行覆盖
eg:
表users,有三个列族user_id,address,info:
create ‘users‘,‘user_id‘,‘address‘,‘info‘
列出全部表
list
得到表的描述
describe ‘users‘
删除表:
disable ‘user’
drop ‘user
添加记录:
put ‘users‘,‘xiaoming‘,‘info:age‘,‘24‘
获取一条记录
1.取得一个id的所有数据
get ‘users‘,‘xiaoming‘
2.获取一个id,一个列族的所有数据
get ‘users‘,‘xiaoming‘,‘info‘
3.获取一个id,一个列族中一个列的 所有数据
get ‘users‘,‘xiaoming‘,‘info:age‘
更新记录
put ‘users‘,‘xiaoming‘,‘info:age‘ ,‘29‘
get ‘users‘,‘xiaoming‘,‘info:age‘
put ‘users‘,‘xiaoming‘,‘info:age‘ ,‘30‘
get ‘users‘,‘xiaoming‘,‘info:age‘
获取单元格数据的版本数据
get ‘users‘,‘xiaoming‘,{COLUMN=>‘info:age‘,VERSIONS=>1}
get ‘users‘,‘xiaoming‘,{COLUMN=>‘info:age‘,VERSIONS=>2}
get ‘users‘,‘xiaoming‘,{COLUMN=>‘info:age‘,VERSIONS=>3}
获取单元格数据的版本数据
get ‘users‘,‘xiaoming‘,{COLUMN=>‘info:age‘,VERSIONS=>1}
get ‘users‘,‘xiaoming‘,{COLUMN=>‘info:age‘,VERSIONS=>2}
get ‘users‘,‘xiaoming‘,{COLUMN=>‘info:age‘,VERSIONS=>3}
获取单元格数据的某个版本数据
get ‘users‘,‘xiaoming‘,{COLUMN=>‘info:age‘,TIMESTAMP=>1364874937056}
全表扫描
scan ‘users‘
行数的判断是根据行健来判断的
HBase 中查找数据的方式:
1. get方式
2. scan方式
删除xiaoming值的’info:age’字段
delete ‘users‘,‘xiaoming‘,‘info:age‘
get ‘users‘,‘xiaoming‘
HBase列:put动态增加 delete动态减少
删除整行
deleteall ‘users‘,‘xiaoming‘
统计表的行数
count ‘users‘
清空表
truncate ‘users‘
1.
package hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
/*
* 创建表、插入记录、查询一条记录、遍历所有记录、删除表
* */
public class HBaseApp {
private static final String TABLE_NAME = "table1";
private static final String FAMILY_NAME = "family1";
private static final String ROW_KEY = "rowkey1";
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.rootdir", "hdfs://hadoop:9000/hbase");
//使用eclipse时必须添加这个,否则无法定位
conf.set("hbase.zookeeper.quorum", "hadoop");
//创建表、删除表使用HBaseAdmin
final HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
createTable(hBaseAdmin);
//deleteTable(hBaseAdmin);
//插入记录、查询一条记录、遍历所有记录使用HTable
final HTable hTable = new HTable(conf, TABLE_NAME);
// putRecord(hTable);
// getRecord(hTable);
scanTable(hTable);
}
private static void scanTable(final HTable hTable) throws IOException {
Scan scan = new Scan();
final ResultScanner scanner = hTable.getScanner(scan);
for (Result result : scanner) {
final byte[] value = result.getValue(FAMILY_NAME.getBytes(),"age".getBytes());
System.out.println(result + "\t"+ new String(value));
}
}
//查询所有记录
//查询一条记录
private static void getRecord(final HTable hTable) throws IOException {
Get get = new Get(ROW_KEY.getBytes());
final Result result = hTable.get(get);
final byte[] value = result.getValue(FAMILY_NAME.getBytes(),"age".getBytes());
System.out.println(result + "\t"+ new String(value));
}
//插入一条记录
private static void putRecord(final HTable hTable) throws IOException {
Put put = new Put(ROW_KEY.getBytes());
put.add(FAMILY_NAME.getBytes(), "age".getBytes(), "25".getBytes());
hTable.put(put);
hTable.close();
}
//删除表
private static void deleteTable(final HBaseAdmin hBaseAdmin)
throws IOException {
hBaseAdmin.disableTable(TABLE_NAME);
hBaseAdmin.deleteTable(TABLE_NAME);
}
//创建表
private static void createTable(final HBaseAdmin hBaseAdmin)
throws IOException {
if(!hBaseAdmin.tableExists(TABLE_NAME)){
HTableDescriptor descriptor = new HTableDescriptor(TABLE_NAME); //表名
//添加列族
HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); descriptor.addFamily(family);
hBaseAdmin.createTable(descriptor);
}
}
}
2.
hdfs数据批量导入到hbase中
create ‘wlan_log‘, ‘cf‘
package hbase;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class BatchImport {
static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
Text v2 = new Text();
protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
final String[] splited = value.toString().split("\t");
try {
final Date date = new Date(Long.parseLong(splited[0].trim()));
final String dateFormat = dateformat1.format(date);
String rowKey = splited[1]+":"+dateFormat;
v2.set(rowKey+"\t"+value.toString());
context.write(key, v2);
} catch (NumberFormatException e) {
final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
counter.increment(1L);
System.out.println("出错了"+splited[0]+" "+e.getMessage());
}
};
}
static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException {
for (Text text : values) {
final String[] splited = text.toString().split("\t");
final Put put = new Put(Bytes.toBytes(splited[0]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"), Bytes.toBytes(splited[2]));
//省略其他字段,调用put.add(....)即可
context.write(NullWritable.get(), put);
}
};
}
public static void main(String[] args) throws Exception {
final Configuration configuration = new Configuration();
//设置zookeeper
configuration.set("hbase.zookeeper.quorum", "hadoop");
//设置hbase表名称
configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
//将该值改大,防止hbase超时退出
configuration.set("dfs.socket.timeout", "180000");
final Job job = new Job(configuration, "HBaseBatchImport");
job.setMapperClass(BatchImportMapper.class);
job.setReducerClass(BatchImportReducer.class);
//设置map的输出,不设置reduce的输出类型
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
//不再设置输出路径,而是设置输出格式类型
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.setInputPaths(job, "hdfs://hadoop:9000/input");
job.waitForCompletion(true);
}
}
1.hbase的集群搭建过程(在原来的hadoop上的hbase伪分布基础上进行搭建)
1.1 集群结构,主节点(hmaster)是hadoop,从节点(region server)是hadoop1和hadoop2
1.2 修改hadoop上的hbase的几个文件
(1)修改hbase-env.sh的最后一行export HBASE_MANAGES_ZK=false
(2)修改hbase-site.xml文件的hbase.zookeeper.quorum的值为hadoop0,hadoop1,hadoop2
(3)修改regionservers文件(存放的region server的hostname),内容修改为hadoop1、hadoop2
1.3 复制hadoop中的hbase文件夹到hadoop1、hadoop2中
复制hadoop中的/etc/profile到hadoop1、hadoop2中,在hadoop1、hadoop2上执行source /etc/profile
1.4 首先启动hadoop,然后启动zookeeper集群在各个节点上分别启动。
最后在hadoop上启动hbase集群。
因为在搭建集群的时候,hbase在运行状态下,修改了配置造成:
1) hbase web 界面进不去
2) hbase shell 操作没有结果
3) 运行stop-hbase.sh 关闭不了
解决办法:
1) 把所有节点关掉,再次启动
2) 还有就是网友提供的解决办法:杀死hbase进程 kill -9 pid 用ps –ef | grep 查看hbase进程
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文地址:http://blog.csdn.net/zwto1/article/details/47066523