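Tags: hive mysql udf genericudf
Previously I covered two ways to load Hive analysis results into a MySQL table: importing with Sqoop, and using the Hive and MySQL JDBC drivers. Here I cover a third, commonly used approach: using a Hive user-defined function (UDF or GenericUDF) to insert each record into the database table.
I. Using a UDF
Implementing this with a UDF is fairly simple: extend the UDF class and define an evaluate method.
1. Write the implementation class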
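One detail worth knowing: as far as I understand, the UDF base class does not declare evaluate as an abstract method. Hive looks the method up by name and argument types through reflection, which is why a UDF class can declare evaluate with whatever parameter list it needs. The sketch below imitates that lookup with plain Java reflection. ConcatStatus and callEvaluate are hypothetical names used only for illustration, and Hive's real resolver also handles type conversion, which this sketch does not.

```java
import java.lang.reflect.Method;

public class EvaluateLookupSketch {

    // Hypothetical stand-in for a UDF class; it does not extend Hive's UDF.
    public static class ConcatStatus {
        public String evaluate(String clxxbh, String hphm) {
            return clxxbh + " SUCCESS " + hphm;
        }
    }

    // Finds a public method named "evaluate" whose parameter types match
    // the runtime classes of the arguments, then invokes it.
    static Object callEvaluate(Object udf, Object... args) {
        try {
            Class<?>[] types = new Class<?>[args.length];
            for (int i = 0; i < args.length; i++) {
                types[i] = args[i].getClass();
            }
            Method m = udf.getClass().getMethod("evaluate", types);
            return m.invoke(udf, args);
        } catch (ReflectiveOperationException e) {
            // No matching evaluate method, or the call itself failed.
            throw new IllegalStateException("evaluate lookup failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callEvaluate(new ConcatStatus(), "id-1", "plate-1"));
    }
}
```

Calling callEvaluate with an argument list that no evaluate method accepts raises an IllegalStateException here, which is roughly the point at which Hive would reject the query.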
package com.gxnzx.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

import com.gxnzx.hive.util.DBSqlHelper;

public class AnalyzeStatistics extends UDF {

    public String evaluate(String clxxbh, String hphm) {
        // jtxx2 is the target database table
        String sql = "insert into jtxx2 values(?,?)";
        // insert the record into the database
        if (DBSqlHelper.addBatch(sql, clxxbh, hphm)) {
            return clxxbh + " SUCCESS " + hphm;
        } else {
            return clxxbh + " FAILED " + hphm;
        }
    }
}

2. Database helper method
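// conn and ps are fields of DBSqlHelper, declared elsewhere in the class.
public static boolean addBatch(String sql, String clxxbh, String hphm) {
    boolean flag = false;
    try {
        conn = DBSqlHelper.getConn(); // open a database connection
        ps = (PreparedStatement) conn.prepareStatement(sql);
        ps.setString(1, clxxbh);
        ps.setString(2, hphm);
        System.out.println(ps.toString());
        ps.execute();
        flag = true;
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            // guard against a null ps when the connection could not be opened
            if (ps != null) {
                ps.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    return flag;
}

3. Use Eclipse to package the project as a jar and add it to the Hive class path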
hive> add jar hiveudf2.jar;

4. Add the MySQL JDBC driver jar to the Hive class path
hive> add jar /home/hadoopUser/cloud/hive/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.18-bin.jar;

5. Create a temporary Hive function
hive> create temporary function analyze as 'com.gxnzx.hive.udf.AnalyzeStatistics';

6. Test
hive> select analyze(clxxbh,hphm) from transjtxx_hbase limit 10;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1428394594787_0034, Tracking URL = http://secondmgt:8088/proxy/application_1428394594787_0034/
Kill Command = /home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0/bin/hadoop job -kill job_1428394594787_0034
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-04-23 10:15:34,355 Stage-1 map = 0%, reduce = 0%
2015-04-23 10:15:51,032 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 7.14 sec
MapReduce Total cumulative CPU time: 7 seconds 140 msec
Ended Job = job_1428394594787_0034
MapReduce Jobs Launched:
Job 0: Map: 1 Cumulative CPU: 7.14 sec HDFS Read: 256 HDFS Write: 532 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 140 msec
OK
32100017000000000220140317000015 SUCCESS 鲁Q58182
32100017000000000220140317000016 SUCCESS 鲁QV4662
32100017000000000220140317000019 SUCCESS 苏LL8128
32100017000000000220140317000020 SUCCESS 苏CAH367
32100017000000000220140317000023 SUCCESS 鲁Q7899W
32100017000000000220140317000029 SUCCESS 苏HN3819
32100017000000000220140317000038 SUCCESS 鲁C01576
32100017000000000220140317000044 SUCCESS 苏DT9178
32100017000000000220140317000049 SUCCESS 苏LZ1112
32100017000000000220140317000052 SUCCESS 苏K9795警
Time taken: 35.815 seconds, Fetched: 10 row(s)

7. View the data in the MySQL table
mysql> select * from jtxx2;
+----------------------------------+-------------+
| cllxbh                           | hphm        |
+----------------------------------+-------------+
| 32100017000000000220140317000015 | 鲁Q58182    |
| 32100017000000000220140317000016 | 鲁QV4662    |
| 32100017000000000220140317000019 | 苏LL8128    |
| 32100017000000000220140317000020 | 苏CAH367    |
| 32100017000000000220140317000023 | 鲁Q7899W    |
| 32100017000000000220140317000029 | 苏HN3819    |
| 32100017000000000220140317000038 | 鲁C01576    |
| 32100017000000000220140317000044 | 苏DT9178    |
| 32100017000000000220140317000049 | 苏LZ1112    |
| 32100017000000000220140317000052 | 苏K9795警   |
+----------------------------------+-------------+
10 rows in set (0.00 sec)

II. Using a GenericUDF
Implementing this with a GenericUDF is more involved. I adapted someone else's code, shown below:
1. Write the function class
package com.gxnzx.hive.main;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;

/**
 * AnalyzeGenericUDFDBOutput is designed to output data directly from Hive to a
 * JDBC datastore. This UDF is useful for exporting small to medium summaries
 * that have a unique key.
 *
 * Due to the nature of Hadoop, individual mappers, reducers or entire jobs can
 * fail. If a failure occurs a mapper or reducer may be retried. This UDF has no
 * way of detecting failures or rolling back a transaction. Consequently, you
 * should only use this to export to a table with a unique key. The unique
 * key should safeguard against duplicate data.
 *
 * To use this UDF, follow these four steps. First, package the UDF into a jar
 * file. Second, use Hive's add jar feature to add the UDF jar file to the
 * current class path. Third, use add jar to add the JDBC driver jar file to
 * the current class path. Fourth, use create temporary function to create a
 * temporary function backed by the UDF class.
 *
 * Examples for MySQL:
 * hive> add jar udf.jar
 * hive> add jar mysql-connector-java-5.1.18-bin.jar
 * hive> create temporary function analyzedboutput as 'com.gxnzx.hive.main.AnalyzeGenericUDFDBOutput'
 */
@Description(name = "analyzedboutput",
        value = "_FUNC_(jdbcstring,username,password,preparedstatement,[arguments])"
                + " - sends data to a jdbc driver",
        extended = "argument 0 is the JDBC connection string\n"
                + "argument 1 is the database user name\n"
                + "argument 2 is the database user's password\n"
                + "argument 3 is an SQL query to be used in the PreparedStatement\n"
                + "argument (4-n) The remaining arguments must be primitive and are "
                + "passed to the PreparedStatement object\n")
@UDFType(deterministic = false)
public class AnalyzeGenericUDFDBOutput extends GenericUDF {

    private static final Log LOG = LogFactory
            .getLog(AnalyzeGenericUDFDBOutput.class.getName());

    private transient ObjectInspector[] argumentOI;
    private transient Connection connection = null;
    private String url;
    private String user;
    private String pass;
    private final IntWritable result = new IntWritable(-1);

    /**
     * @param arguments
     *            argument 0 is the JDBC connection string; argument 1 is the
     *            user name; argument 2 is the password; argument 3 is an SQL
     *            query to be used in the PreparedStatement; argument (4-n):
     *            the remaining arguments must be primitive and are passed to
     *            the PreparedStatement object
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentTypeException {
        argumentOI = arguments;

        // this should be connection url,username,password,query,column1[,columnn]*
        for (int i = 0; i < 4; i++) {
            if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
                PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);

                if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
                    throw new UDFArgumentTypeException(i,
                            "The argument of function should be \""
                                    + Constants.STRING_TYPE_NAME + "\", but \""
                                    + arguments[i].getTypeName()
                                    + "\" is found");
                }
            }
        }
        for (int i = 4; i < arguments.length; i++) {
            if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(i,
                        "The argument of function should be primitive"
                                + ", but \"" + arguments[i].getTypeName()
                                + "\" is found");
            }
        }

        return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
    }

    /**
     * @return 0 on success, 1 on an SQL error while inserting, 2 when the
     *         connection could not be opened
     */
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {

        url = ((StringObjectInspector) argumentOI[0])
                .getPrimitiveJavaObject(arguments[0].get());
        user = ((StringObjectInspector) argumentOI[1])
                .getPrimitiveJavaObject(arguments[1].get());
        pass = ((StringObjectInspector) argumentOI[2])
                .getPrimitiveJavaObject(arguments[2].get());

        // a new connection is opened and closed for every row
        try {
            connection = DriverManager.getConnection(url, user, pass);
        } catch (SQLException ex) {
            LOG.error("Driver loading or connection issue", ex);
            result.set(2);
        }

        if (connection != null) {
            try {
                PreparedStatement ps = connection
                        .prepareStatement(((StringObjectInspector) argumentOI[3])
                                .getPrimitiveJavaObject(arguments[3].get()));
                // UDF argument i (starting at 4) fills JDBC parameter i - 3 (starting at 1)
                for (int i = 4; i < arguments.length; ++i) {
                    PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) argumentOI[i]);
                    ps.setObject(i - 3,
                            poi.getPrimitiveJavaObject(arguments[i].get()));
                }
                ps.execute();
                ps.close();
                result.set(0);
            } catch (SQLException e) {
                LOG.error("Underlying SQL exception", e);
                result.set(1);
            } finally {
                try {
                    connection.close();
                } catch (Exception ex) {
                    LOG.error("Underlying SQL exception during close", ex);
                }
            }
        }

        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("dboutput(");
        if (children.length > 0) {
            sb.append(children[0]);
            for (int i = 1; i < children.length; i++) {
                sb.append(",");
                sb.append(children[i]);
            }
        }
        sb.append(")");
        return sb.toString();
    }
}

2. Package the program as a jar and add it to the Hive class path
hive> add jar hiveGenericUdf.jar;

3. Add the MySQL JDBC driver jar file
hive> add jar /home/hadoopUser/cloud/hive/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.18-bin.jar;

4. Create the temporary function
hive> create temporary function analyzedboutput as 'com.gxnzx.hive.main.AnalyzeGenericUDFDBOutput';

5. Test
hive> select analyzedboutput('jdbc:mysql://192.168.2.133:3306/transport','hive','hive','insert into jtxx2 values(?,?)',clxxbh,hphm) from transjtxx_hbase limit 5;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1428394594787_0043, Tracking URL = http://secondmgt:8088/proxy/application_1428394594787_0043/
Kill Command = /home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0/bin/hadoop job -kill job_1428394594787_0043
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-04-23 22:01:46,205 Stage-1 map = 0%, reduce = 0%
2015-04-23 22:02:01,985 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 9.37 sec
MapReduce Total cumulative CPU time: 9 seconds 370 msec
Ended Job = job_1428394594787_0043
MapReduce Jobs Launched:
Job 0: Map: 1 Cumulative CPU: 9.37 sec HDFS Read: 256 HDFS Write: 10 SUCCESS
Total MapReduce CPU Time Spent: 9 seconds 370 msec
OK
0
0
0
0
0
Time taken: 32.118 seconds, Fetched: 5 row(s)

The six arguments to analyzedboutput are, in order: the MySQL JDBC connection string, the MySQL user name, the password, the SQL insert statement, and the two Hive columns being queried, clxxbh and hphm. Each returned 0 means that row was inserted successfully.
6. View the data in the MySQL table
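Because the first four arguments are fixed, the number of ? placeholders in the SQL statement has to equal the number of remaining column arguments, and column argument i (counting from 4) is bound to JDBC parameter i - 3. The following is a minimal sketch of that bookkeeping, useful for checking a call before running it. PlaceholderCheck and its methods are hypothetical helpers, not part of the UDF, and the placeholder counter only skips single-quoted literals; it does not handle backslash escapes or SQL comments.

```java
public class PlaceholderCheck {

    // Counts '?' placeholders, ignoring any that sit inside single-quoted literals.
    static int countPlaceholders(String sql) {
        int count = 0;
        boolean inQuote = false;
        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            if (c == '\'') {
                inQuote = !inQuote;
            } else if (c == '?' && !inQuote) {
                count++;
            }
        }
        return count;
    }

    // Maps a UDF argument position (0-based, column arguments start at 4)
    // to the 1-based JDBC parameter index it is bound to.
    static int jdbcIndex(int argIndex) {
        if (argIndex < 4) {
            throw new IllegalArgumentException("arguments 0-3 are url, user, password, sql");
        }
        return argIndex - 3;
    }

    // True when a call with totalArgs arguments supplies exactly one
    // column argument per placeholder in the SQL statement.
    static boolean matches(String sql, int totalArgs) {
        return totalArgs >= 4 && countPlaceholders(sql) == totalArgs - 4;
    }

    public static void main(String[] args) {
        String sql = "insert into jtxx2 values(?,?)";
        System.out.println(matches(sql, 6)); // url, user, password, sql, clxxbh, hphm
    }
}
```

For the call in the test above, the statement has two placeholders and the call passes six arguments, so clxxbh is bound to parameter 1 and hphm to parameter 2.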
mysql> select * from jtxx2;
Empty set (0.00 sec)

mysql> select * from jtxx2;
+----------------------------------+-----------+
| cllxbh                           | hphm      |
+----------------------------------+-----------+
| 32100017000000000220140317000015 | 鲁Q58182  |
| 32100017000000000220140317000016 | 鲁QV4662  |
| 32100017000000000220140317000019 | 苏LL8128  |
| 32100017000000000220140317000020 | 苏CAH367  |
| 32100017000000000220140317000023 | 鲁Q7899W  |
+----------------------------------+-----------+
5 rows in set (0.00 sec)
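The Javadoc of AnalyzeGenericUDFDBOutput points out that a retried mapper can run the same insert again, and recommends a target table with a unique key. With a plain insert, a retry against such a table fails with a duplicate-key error. One way to make the retry harmless is MySQL's INSERT ... ON DUPLICATE KEY UPDATE. The sketch below builds such a statement to pass as the fourth argument. It assumes that cllxbh carries a primary or unique key in jtxx2, which the original does not state; UpsertSql is a hypothetical helper, and the table and column names are concatenated as-is, so they must come from trusted input.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;

public class UpsertSql {

    // Builds "INSERT INTO t (c1, c2) VALUES (?, ?) ON DUPLICATE KEY UPDATE c2 = VALUES(c2)".
    // updateColumns lists the non-key columns to overwrite when the key already exists.
    static String build(String table, List<String> columns, List<String> updateColumns) {
        if (columns.isEmpty() || updateColumns.isEmpty()) {
            throw new IllegalArgumentException("columns and updateColumns must be non-empty");
        }
        String cols = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        StringJoiner updates = new StringJoiner(", ");
        for (String c : updateColumns) {
            updates.add(c + " = VALUES(" + c + ")");
        }
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + marks + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        // Assumption: cllxbh is the unique key of jtxx2.
        System.out.println(build("jtxx2",
                Arrays.asList("cllxbh", "hphm"),
                Arrays.asList("hphm")));
    }
}
```

The generated statement still has one placeholder per column, so the column arguments passed to analyzedboutput stay the same.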
Importing Hive Statistical Analysis Results into a MySQL Table (Part 3): Using a Hive UDF or GenericUDF
Original article: http://blog.csdn.net/niityzu/article/details/45227409