标签:hadoop java mapreduce 数据库 自定义输出
转载请注明出处:http://blog.csdn.net/xiaojimanman/article/details/40372189
hadoop源代码中的WordCount事例中实现了单词统计,但是输出到HDFS文件,线上程序想使用其计算结果还要再次写个程序,所以自己就研究一下关于MapReduce的输出问题,下面就通过一个简单的例子说明下如何将MapReduce的计算结果输出到数据库中。
需求描述:
分析网络服务器上的Apache日志,统计每个IP访问资源的次数,并将结果写入到mysql数据库中。
数据格式:
Apache日志数据如下图所示:
一行数据就是一条http请求记录,该事例只做简单的IP个数统计。
需求分析:
通过MapReduce对日志文件采用分布式计算,map主要对日志做简单的拆分计数,reduce对map的结果求和。
map程序对一行日志数据做简单的拆分,获取客户端IP,输出结果为 key为客户端IP,value为IP出现次数。结果样例如下图所示:
reduce程序对Key值下的values做求和计算,输出结果为 key为客户端IP,value为IP出现次数。结果样例如下图所示:
上面的MapReduce程序和WordCount程序类似,只是对IP做了简单的求和计算,下面就需要写reduce的输出格式,使计算结果写入到mysql数据库中。
MapReduce支持用户自定义的输出格式,定义的类只需要继承FileOutputFormat即可。实现如下图所示:
自定义输出需要实现getRecordWriter方法,这里通过内部类的方式,实现了自定义的RecordWriter,在MysqlRecordWriter类中实现相关的输出即可完成将reduce结果数据写入到数据库中,具体实现如下图所示:
在MapReduce程序中,在关于job的设置,只需要将输出格式指定为该输出格式即可完成将reduce的结果写入到数据库中。
job.setOutputFormatClass(MysqlOutputFormat.class);
代码实现:
日志一行记录分析类TextLine,该类实现了从日志记录中提取IP信息以及IP次数(一行数据就是1次),代码如下:
/** * 日志行分析 * @author lulei */ package com; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; public class TextLine { private String ip; private IntWritable one = new IntWritable(1); //标识数据是否为可用 private boolean right = true; public TextLine(String textLine){ //检验一行日志数据是否符合要求,如不符合,将其标识为不可用 if (textLine == null || "".equals(textLine)) { this.right = false; return; } String []strs = textLine.split(" "); if (strs.length < 2) { this.right = false; return; } this.ip = strs[0]; } public boolean isRight() { return right; } /** * 返回map的输出key值 * @return */ public Text getIPCountMapOutKey() { return new Text(this.ip); } /** * 返回map的输出value值 * @return */ public IntWritable getIPCountMapOutValue() { return this.one; } }
/** * 各IP出现次数统计 */ package com; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class IPCountMR extends Configured implements Tool{ /** * ip个数统计map * @author lulei */ public static class IPCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { TextLine textLine = new TextLine(value.toString()); if (textLine.isRight()) { context.write(textLine.getIPCountMapOutKey(), textLine.getIPCountMapOutValue()); } } } /** * ip个数统计reduce * @author lulei */ public static class IPCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key, new IntWritable(sum)); } } @SuppressWarnings("deprecation") @Override public int run(String[] arg0) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJobName("ipcount"); job.setInputFormatClass(TextInputFormat.class); //将输出设置为MysqlOutputFormat job.setOutputFormatClass(MysqlOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(IPCountMap.class); job.setCombinerClass(IPCountReduce.class); job.setReducerClass(IPCountReduce.class); FileInputFormat.addInputPath(job, new Path(arg0[0])); //个人认为下面应该可以不设置的,但是不设置就会报错,不知道是什么地方出了问题 MysqlOutputFormat.setOutputPath(job, new Path(arg0[1])); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub try { int res = ToolRunner.run(new Configuration(), new IPCountMR(), args); System.exit(res); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
package com; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @SuppressWarnings("hiding") public class MysqlOutputFormat<Text, IntWritable> extends FileOutputFormat<Text, IntWritable> { //Mysql RecordWriter私有类 private static class MysqlRecordWriter<Text, IntWritable> extends RecordWriter<Text, IntWritable> { private LogDB logdb; /** * 使用外部传进来的LogDB对象 * @param logdb */ MysqlRecordWriter(LogDB logdb){ this.logdb = logdb; } @Override public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { // TODO Auto-generated method stub } /** * 将key-value写入到数据库中 */ @Override public void write(Text key, IntWritable value) throws IOException, InterruptedException { // TODO Auto-generated method stub logdb.insert(key.toString(), value.toString()); } } @Override public RecordWriter<Text, IntWritable> getRecordWriter( TaskAttemptContext arg0) throws IOException, InterruptedException { // TODO Auto-generated method stub //返回MysqlRecordWriter对象 return new MysqlRecordWriter<Text, IntWritable>(new LogDB()); } }
package com; import java.sql.SQLException; import com.lulei.db.manager.DBServer; public class LogDB { //新建连接池 private DBServer dBServer = new DBServer("proxool.log"); /** * 将数据插入至数据库 * @param ip * @param num */ public void insert(String ip, String num) { try { dBServer.insert("insert into logmp(ip, num) values ('" + ip +"','" + num +"')"); } catch (SQLException e) { } finally { dBServer.close(); } } public static void main(String[] args) { // TODO Auto-generated method stub new LogDB().insert("127.0.0.2", "1"); } }上面程序中的DBServer类是基于连接池proxool-0.9.1.jar封装的数据库操作类,这里就不做详细的介绍,这里也可以不通过连接池,直接将数据写入到数据库中,这不是本事例的重点,就不做详细的介绍。
执行结果:
程序执行结束后,通过命令查看相应的数据表记录,计算结果已经正确写入到数据库中,如下图所示:
上面用的日志文件的客户端都是从内网访问的,所以记录中都是内网地址。
注:资源 http://download.csdn.net/detail/xiaojimanman/6920219 中有相关的数据库连接池代码
标签:hadoop java mapreduce 数据库 自定义输出
原文地址:http://blog.csdn.net/xiaojimanman/article/details/40372189