The following code bulk-loads data exported from Hive into HBase: a MapReduce job reads the data files from HDFS, pulls the table / rowkey / column-family / qualifier mapping from MySQL, and writes HFiles with HFileOutputFormat2.
package cn.tansun.bd.hbase;
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.tansun.bd.utils.JDBCUtils;
/**
 * @author (E-mail: )
 * @version created 2016-07-05 19:57:17
 */
public class HiveMySQl2HBaseMR {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
getDatas();
/*
* if (args.length != 3){ System.err.println(
* "Usage: HiveMySQl2HBaseMR <table_name><data_input_path><hfile_output_path>"
* ); System.exit( 2 ); }
*/
Configuration conf = new Configuration();
conf.addResource("hbase-site.xml");
// hardcoded for local testing; see the commented-out argument check above
String table = "2";
String input = "hdfs://node11:9000/datas/hivedata5";
String output = "hdfs://node11:9000/datas/out15";
HTable htable;
try {
// delete any existing output directory before the job runs
try {
FileSystem fs = FileSystem.get(URI.create(output), conf);
fs.delete(new Path(output), true);
fs.close();
} catch (IOException e1) {
e1.printStackTrace();
}
htable = new HTable(conf, table.getBytes());
Job job = new Job(conf);
job.setJobName("Generate HFile");
job.setJarByClass(HiveMySQl2HBaseMR.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(HiveMySQlMapper.class);
FileInputFormat.setInputPaths(job, input);
// set the map output classes directly (the old "mapred.mapoutput.*"
// property names are unreliable with the new-API Job); they must be set
// before configureIncrementalLoad(), which picks its sort reducer from them
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
FileOutputFormat.setOutputPath(job, new Path(output));
HFileOutputFormat2.configureIncrementalLoad(job, htable);
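// configureIncrementalLoad() (above) also wires in a TotalOrderPartitioner
// keyed on the table's region boundaries, chooses KeyValueSortReducer for a
// KeyValue map output, and sets one reduce task per region, so each HFile
// falls inside a single region (behavior of the HBase 0.98/1.x API used here)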
try {
job.waitForCompletion(true);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static String tableName;
public static String cf = null;
public static String strRowkey = null;
public static String strIndex = null;
public static String column_name = null;
public static String strColumn = null;
// read cf, rowkey, column index and qualifier metadata from MySQL
@SuppressWarnings("rawtypes")
public static List<Map> getDatas() {
// List<Map> listDatas = new ArrayList<Map>();
String sql = "SELECT DISTINCT s.tableName, ar.rowkey, af.column_family, aq.column_hive_index, aq.column_name FROM "
        + " archive_htable s, archive_hrowkey ar, archive_hfamily af, archive_hqualifier aq WHERE "
        + " s.rowkey_id = ar.rowkey_id AND ar.family_id = af.family_id "
        // the qualifier join below matches the query in JDBCUtils.main();
        // without it archive_hqualifier is cross-joined
        + " AND af.qualifier_id = aq.qualifier_id AND s.tableName = '2'";
List<Map> selectDatas = JDBCUtils.selectDatas(sql);
for(Map<String,String> metaData:selectDatas){
if(null==tableName){
tableName=metaData.get("tableName");
}
if(null==cf){
cf=metaData.get("column_family");
}
if(null==strRowkey){
strRowkey=metaData.get("rowkey");
}
String strTempIndex = metaData.get("column_hive_index");
String strTempName = metaData.get("column_name");
if (strColumn == null || "".equals(strColumn)) {
    strColumn = strTempIndex + " " + strTempName;
} else {
    strColumn = strColumn + "," + strTempIndex + " " + strTempName;
}
}
return selectDatas;
}
}
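// With the query above, getDatas() leaves the statics looking roughly like
// this (hypothetical values, for illustration only):
//   tableName = "2", cf = "cf1", strRowkey = "0,1",
//   strColumn = "0 id,1 name,2 age"
// i.e. strColumn is a comma-separated list of "<hiveColumnIndex> <qualifier>"
// pairs; HiveMySQlMapper below parses exactly this shape.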
class HiveMySQlMapper extends
Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
// NOTE: these statics are populated only in the driver JVM; in a distributed
// run each map task starts in a fresh JVM and would see null here. The values
// should instead travel through the job Configuration and be read back in
// setup(), as the commented-out code below sketches.
String tableName = HiveMySQl2HBaseMR.tableName;
String cf = HiveMySQl2HBaseMR.cf;
String rowKey = HiveMySQl2HBaseMR.strRowkey;
String strColumnName = HiveMySQl2HBaseMR.column_name;
String strColumn = HiveMySQl2HBaseMR.strColumn;
// "\001" is Ctrl-A, Hive's default field separator; the original "001" would
// have joined the rowkey parts with the literal characters 0, 0, 1
String split = "\001";
// private String strRowKey;
/*
* @Override protected void setup(Context context) {
*
* tableName = context.getConfiguration().get("tableName"); cf =
* context.getConfiguration().get("cf"); rowKey =
* context.getConfiguration().get("rowkey"); split =
* context.getConfiguration().get("split"); }
*/
@Override
protected void map(
LongWritable key,
Text value,
Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)
throws IOException, InterruptedException {
// rowKey holds the comma-separated indexes of the columns that make up the
// row key; resolve each index against the fields of the split input line
String strRowKey = "";
String[] datas = value.toString().split("\t");
// get rowkey
/* for (String strIndex : rowKey.split(",")) {
strRowKey = datas[Integer.valueOf(strIndex)] + split;
}*/
for (String strIndex : rowKey.split(",")) {
if (strRowKey == null || "".equals(strRowKey)) {
strRowKey = datas[Integer.valueOf(strIndex)];
System.out.println(strRowKey +"strRowKey1");
} else {
strRowKey = strRowKey + split
+ datas[Integer.valueOf(strIndex)];
System.out.println(strRowKey +"strRowKey2");
}
}
for (String str : strColumn.split(",")) {
    // entries look like "<hiveColumnIndex> <qualifierName>", joined with a
    // space in getDatas(), so split on the space (the original "\t" never
    // matches and would make the index lookup below fail)
    String[] columnTuple = str.split(" ");
    String columnData = datas[Integer.valueOf(columnTuple[0])];
    String columnName = columnTuple[1];
    System.out.println(columnData + "columnDatacolumnData");
    // the shuffle key must be the actual row key (it has to sort and
    // partition like the KeyValue's row), not the index spec held in rowKey
    ImmutableBytesWritable rk = new ImmutableBytesWritable(
            Bytes.toBytes(strRowKey));
    // KeyValue(byte[] row, byte[] family, byte[] qualifier, byte[] value)
    KeyValue kv = new KeyValue(Bytes.toBytes(strRowKey), // e.g. "a\001b\001c"
            cf.getBytes(), Bytes.toBytes(columnName),
            Bytes.toBytes(columnData));
context.write(rk, kv);
}
}
}
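Generating the HFiles is only half of a bulk load; they still have to be handed to the region servers. A minimal follow-up sketch, not part of the original post, assuming the same conf, output and htable as in main() and the HBase 0.98/1.x client API (LoadIncrementalHFiles lives in org.apache.hadoop.hbase.mapreduce, next to HFileOutputFormat2):

// run once job.waitForCompletion(true) has returned true
try {
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path(output), htable);
} catch (Exception e) {
    e.printStackTrace();
}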
The JDBCUtils class:
package cn.tansun.bd.utils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
/**
 * @author zgl (E-mail: )
 * @version created 2016-06-23 16:25:03
 */
public class JDBCUtils {
public JDBCUtils()
{
}
public static String PATH = "jdbc.properties";
public static Properties prop;
public static String url = null;
public static String username = null;
public static String password = null;
public static Connection conn;
public static Statement stmt;
public static ResultSet rs;
public static String fileName = null;
static {
try {
InputStream inputStream = JDBCUtils.class.getClassLoader().getResourceAsStream( PATH );
prop = new Properties();
prop.load( inputStream );
url = prop.getProperty( "jdbc.url" );
username = prop.getProperty( "jdbc.username" );
password = prop.getProperty( "jdbc.password" );
if ( inputStream != null ) {
inputStream.close();
}
}
catch ( IOException e ) {
e.printStackTrace();
}
}
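// Example jdbc.properties on the classpath (placeholder values, not from the
// original post):
//   jdbc.url=jdbc:mysql://localhost:3306/archive
//   jdbc.username=root
//   jdbc.password=secret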
public static void closeConnection( Connection conn ) {
if ( conn != null ) {
try {
conn.close();
}
catch ( SQLException e ) {
e.printStackTrace();
}
}
}
/**
 * Runs the given SQL query and returns each row as a column-name-to-value map.
 *
 * @param sql the SELECT statement to execute
 * @return one Map per result row
 */
@SuppressWarnings( "rawtypes" )
public static List<Map> selectDatas( String sql ) {
List<Map> listDatas = new ArrayList<Map>();
try {
conn = DriverManager.getConnection( url, username, password );
conn.setAutoCommit( false );
// leftover LOAD DATA LOCAL INFILE setup; it is not used by the SELECT that
// executeQuery(sql) runs below, but is kept as in the original post
stmt = conn.prepareStatement( "load data local infile '' " + "into table loadtest fields terminated by ','" );
StringBuilder sb = new StringBuilder();
InputStream is = new ByteArrayInputStream( sb.toString().getBytes() );
( (com.mysql.jdbc.Statement) stmt ).setLocalInfileInputStream( is );
rs = stmt.executeQuery( sql );
if ( rs != null ) {
ResultSetMetaData metaData = rs.getMetaData();
int count = metaData.getColumnCount();
Map<String, Object> map = null;
while ( rs.next() ) {
map = new HashMap<String, Object>();
for ( int i = 1; i < count + 1; i++ ) {
map.put( metaData.getColumnName( i ), rs.getObject( i ) );
}
listDatas.add( map );
}
}
}
catch ( SQLException e ) {
e.printStackTrace();
}
return listDatas;
}
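// Note: selectDatas() (and getStrMap() below) never closes conn/stmt/rs, so
// each call leaks a connection; in real use they belong in a finally block
// (closeConnection() above is available but unused).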
/**
 * Runs the given SQL query and returns all column values of every row,
 * flattened into a single list.
 *
 * @param sql the SELECT statement to execute
 * @return the column values in row-major order
 */
public static List<String> getStrMap( String sql) {
List<String> strList = new ArrayList<String>();
try {
conn = DriverManager.getConnection( url, username, password );
conn.setAutoCommit( false );
// same leftover LOAD DATA LOCAL INFILE setup as in selectDatas()
stmt = conn.prepareStatement( "load data local infile '' " + "into table loadtest fields terminated by ','" );
StringBuilder sb = new StringBuilder();
InputStream is = new ByteArrayInputStream( sb.toString().getBytes() );
( (com.mysql.jdbc.Statement) stmt ).setLocalInfileInputStream( is );
rs = stmt.executeQuery( sql );
if ( rs != null ) {
ResultSetMetaData metaData = rs.getMetaData();
int count = metaData.getColumnCount();
while (rs.next()){
for (int i = 1; i < count + 1; i++){
//String str1 = metaData.getColumnName( i );
String str2 = (String) rs.getObject( i );
strList.add(str2);
}
}
}
}
catch ( SQLException e ) {
e.printStackTrace();
}
return strList;
}
public static String table_name = null;
public static String rowkey = null;
public static String column_family = null;
public static String column_name = null;
private static String rows = null;
public static String sql = null;
public static String sql2 = null;
@SuppressWarnings( "rawtypes" )
public static void main( String[] args ) {
// sql2 is built but never used below; kept as in the original post
sql2 = "SELECT GROUP_CONCAT( DISTINCT aq.column_hive_index, ' ', aq.column_name, ' ' ORDER BY "
        + " aq.column_hive_index SEPARATOR ',' ) AS column_names FROM archive_hqualifier aq "
        + "where aq.table_id = 77 GROUP BY aq.column_name ORDER BY aq.column_hive_index";
sql = "SELECT DISTINCT s.tableName, ar.rowkey, af.column_family, "
        + "aq.column_name FROM archive_htable s, archive_hrowkey ar, archive_hfamily af,"
        + " archive_hqualifier aq "
        + "WHERE s.rowkey_id = ar.rowkey_id AND ar.family_id = af.family_id "
        + "AND af.qualifier_id = aq.qualifier_id;";
String datas = null;
List<String> strList = getStrMap(sql);
for (int i = 0; i < strList.size(); i++){
    datas = strList.get(i);
    //datas = strList.get(i).substring(0, strList.get(i).length()-1);
    System.out.print(datas);
}
}
}
Source: http://www.cnblogs.com/zhanggl/p/5658517.html