码迷,mamicode.com
首页 > 数据库 > 详细

DBInputFormat的使用

时间:2015-08-12 16:40:44      阅读:297      评论:0      收藏:0      [点我收藏+]

标签:

  1 package InputFormat;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.net.URI;
  7 import java.sql.PreparedStatement;
  8 import java.sql.ResultSet;
  9 import java.sql.SQLException;
 10 
 11 
 12 import org.apache.hadoop.conf.Configuration;
 13 import org.apache.hadoop.fs.FileSystem;
 14 import org.apache.hadoop.fs.Path;
 15 import org.apache.hadoop.io.LongWritable;
 16 import org.apache.hadoop.io.NullWritable;
 17 import org.apache.hadoop.io.Text;
 18 import org.apache.hadoop.io.Writable;
 19 import org.apache.hadoop.mapreduce.Job;
 20 import org.apache.hadoop.mapreduce.Mapper;
 21 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
 22 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
 23 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 24 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 25 
 26 /**
 27  * 将Mysql的驱动包放在TaskTracker的lib下 重启集群
 28  * @author Administrator
 29  *
 30  */
 31 
 32 public class DBInputFormatApp {
 33     public static void main(String[] args) throws Exception {
 34         
 35         Configuration conf = new Configuration();
 36         
 37         DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://hadoop:3306/test", "root", "123456");
 38         
 39         FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop:9000"), conf);
 40         boolean delete = fileSystem.delete(new Path("hdfs://hadoop:9000/out"), true);
 41         
 42         if (delete) {
 43             System.out.println("==============ooooooooookkkkkkkkkkkkkkkkkkkkkkks==================");
 44             
 45         }
 46         
 47         
 48         Job job = new Job(conf, DBInputFormatApp.class.getSimpleName());
 49         job.setJarByClass(DBInputFormatApp.class);
 50 
 51         job.setInputFormatClass(DBInputFormat.class);
 52         DBInputFormat.setInput(job, MyUser.class, "myuser", null, null, "id","name");
 53         job.setMapperClass(MyDBMapper.class);
 54         job.setMapOutputKeyClass(Text.class);
 55         job.setOutputValueClass(NullWritable.class);
 56         
 57         job.setNumReduceTasks(0);
 58         job.setOutputKeyClass(Text.class);
 59         job.setOutputValueClass(NullWritable.class);
 60         
 61         FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/out"));
 62         
 63         job.waitForCompletion(true);
 64         
 65         
 66     }
 67     
 68     public static class MyDBMapper extends Mapper<LongWritable, MyUser, Text, NullWritable>{
 69         @Override
 70         protected void map(LongWritable key, MyUser value,
 71                 org.apache.hadoop.mapreduce.Mapper<LongWritable, MyUser, Text, NullWritable>.Context context)
 72                 throws IOException, InterruptedException {
 73             
 74             context.write(new Text(value.toString()), NullWritable.get());
 75             
 76         }
 77         
 78     }
 79 
 80     
 81     public static class MyUser implements Writable,DBWritable{
 82         
 83         int id;
 84         String name;
 85 
 86         @Override
 87         public void write(PreparedStatement statement) throws SQLException {
 88             statement.setInt(1, id);
 89             statement.setString(2, name);
 90             
 91         }
 92 
 93         @Override
 94         public void readFields(ResultSet resultSet) throws SQLException {
 95             this.id=resultSet.getInt(1);
 96             this.name=resultSet.getString(2);
 97             
 98         }
 99 
100         @Override
101         public void write(DataOutput out) throws IOException {
102             out.writeInt(id);
103             Text.writeString(out, name);
104             
105         }
106 
107         @Override
108         public void readFields(DataInput in) throws IOException {
109             this.id=in.readInt();
110             this.name=Text.readString(in);
111             
112         }
113 
114         @Override
115         public String toString() {
116             
117             return  id + ", \t" + name ;
118         }
119         
120         
121         
122     }
123     
124 }    

 

DBInputFormat的使用

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/4724395.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!