标签:reducer 去掉 stat input 定义 this integer 计算 on()
好友推荐的案例, 需要两个job, 第一个进行好友关系度计算, 第二个job将计算的关系进行推荐
1, fof关系类
package com.wenbronk.friend; import org.apache.hadoop.io.Text; /** * 定义fof关系 * @author root * */ public class Fof extends Text{ public Fof() { super(); } /**‘ * 不论谁在前,返回一致的顺序 * @param a * @param b */ public Fof(String a, String b) { super(getFof(a, b)); } /** * 按字典顺序排序, 保证两个fof为同一组输出 * @param a * @param b * @return */ public static String getFof(String a, String b) { int r = a.compareTo(b); if (r < 0) { return a + "\t" + b; }else { return b + "\t" + a; } } }
2, user类
package com.wenbronk.friend; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class User implements WritableComparable<User>{ private String uname; private int friedsCount; public String getUname() { return uname; } public void setUname(String uname) { this.uname = uname; } public int getFriedsCount() { return friedsCount; } public void setFriedsCount(int friedsCount) { this.friedsCount = friedsCount; } public User() { super(); } public User(String uname, int friedsCount) { super(); this.uname = uname; this.friedsCount = friedsCount; } @Override public void readFields(DataInput arg0) throws IOException { this.uname = arg0.readUTF(); this.friedsCount = arg0.readInt(); } @Override public void write(DataOutput arg0) throws IOException { arg0.writeUTF(uname); arg0.writeInt(friedsCount); } @Override public int compareTo(User o) { int result = this.uname.compareTo(o.getUname()); if (result == 0) { return Integer.compare(this.friedsCount, o.getFriedsCount()); } return result; } }
3, sort
package com.wenbronk.friend; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 排序 * @author root * */ public class FofSort extends WritableComparator { public FofSort() { super(User.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { User user1 = (User) a; User user2 = (User) b; int compareTo = user1.getUname().compareTo(user2.getUname()); if (compareTo == 0) { compareTo = Integer.compare(user1.getFriedsCount(), user2.getFriedsCount()); } return compareTo; } }
4, group
package com.wenbronk.friend; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 自定义分组 * @author root * */ public class FofGroup extends WritableComparator { public FofGroup() { super(User.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { User u1 = (User) a; User u2 = (User) b; return u1.getUname().compareTo(u2.getUname()); } }
5, job
package com.wenbronk.friend; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 1个mapreduce找到所有的fof关系 第二个mapreduce执行排序 * * @author root */ public class RunJob { public static void main(String[] args) throws IOException { Configuration configuration = new Configuration(); // configuration.set("mapred.jar", "C:/Users/wenbr/Desktop/fof.jar"); // 本地运行 configuration.set("fs.default", "hdfs://wenbronk.hdfs.com:8020 "); configuration.set("yarn.resourcemanager", "hdfs://192.168.208.106"); if (runFindFof(configuration)) { // 根据foffind进行排序 run2(configuration); } } /** * 找到所有的fof关系 * @throws IOException */ private static boolean runFindFof(Configuration conf) throws IOException { try { FileSystem fs = FileSystem.get(conf); Job job = Job.getInstance(conf); job.setJobName("friend"); job.setJarByClass(RunJob.class); job.setMapperClass(FofMapper.class); job.setReducerClass(FofReduce.class); job.setMapOutputKeyClass(Fof.class); job.setMapOutputValueClass(IntWritable.class); // job.setJar("C:/Users/wenbr/Desktop/friend.jar"); job.setInputFormatClass(KeyValueTextInputFormat.class); // FileInputFormat.addInputPath(job, new Path("/usr/friend.txt")); FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\friend.txt")); Path path = new Path("/root/usr/fof/f1"); if (fs.exists(path)) { fs.delete(path, true); } FileOutputFormat.setOutputPath(job, path); return job.waitForCompletion(true); }catch(Exception e) { e.printStackTrace(); } return false; } static class FofMapper extends Mapper<Text, Text, Fof, IntWritable> { @Override protected void map(Text key, Text value, Mapper<Text, Text, Fof, IntWritable>.Context context) throws IOException, InterruptedException { // super.map(key, value, context); String user = key.toString(); String[] frieds = StringUtils.split(value.toString(), ‘\t‘); for (int i = 0; i < frieds.length; i++) { String f1 = frieds[i]; // 去掉是直接好友的, 按组输出, 如果组中有value=0 的, 整组数据舍弃 context.write(new Fof(user, f1), new IntWritable(0)); for (int j = i + 1; j < frieds.length; j++) { String f2 = frieds[j]; Fof fof = new Fof(f1, f2); context.write(fof, new IntWritable(1)); } } } } static class FofReduce extends Reducer<Fof, IntWritable, Fof, IntWritable> { @Override protected void reduce(Fof arg0, Iterable<IntWritable> arg1, Reducer<Fof, IntWritable, Fof, IntWritable>.Context arg2) throws IOException, InterruptedException { boolean flag = false; int sum = 0; for (IntWritable count : arg1) { // 值有0的, 整组数据舍弃 if (count.get() == 0) { flag = true; break; } else { sum += count.get(); } } if (!flag) { arg2.write(arg0, new IntWritable(sum)); } } } /** * 向用户推荐好友 * @param config */ public static void run2(Configuration config) { try { FileSystem fileSystem = FileSystem.get(config); Job job = Job.getInstance(config); job.setJobName("fof2"); job.setMapperClass(SortMapper.class); job.setReducerClass(SortReduce.class); job.setSortComparatorClass(FofSort.class); job.setGroupingComparatorClass(FofGroup.class); job.setMapOutputKeyClass(User.class); job.setMapOutputValueClass(User.class); job.setInputFormatClass(KeyValueTextInputFormat.class); // 设置MR执行的输入文件 FileInputFormat.addInputPath(job, new Path("/usr/output/f1")); // 设置输出文件, 文件不可存在 Path path = new Path("/root/usr/fof/f2"); if (fileSystem.exists(path)) { fileSystem.delete(path, true); } FileOutputFormat.setOutputPath(job, path); boolean f = job.waitForCompletion(true); if (f) { System.out.println("job, 成功执行"); } }catch (Exception e) { e.printStackTrace(); } } static class SortMapper extends Mapper<Text, Text, User, User> { @Override protected void map(Text key, Text value, Mapper<Text, Text, User, User>.Context context) throws IOException, InterruptedException { String[] args = StringUtils.split(value.toString(), ‘\t‘); String other = args[0]; int friendsCount = Integer.parseInt(args[1]); // 输出两次, 同时给fof两个用户推荐好友 context.write(new User(key.toString(), friendsCount), new User(other, friendsCount)); context.write(new User(other, friendsCount), new User(key.toString(), friendsCount)); } } static class SortReduce extends Reducer<User, User, Text, Text>{ @Override protected void reduce(User arg0, Iterable<User> arg1, Reducer<User, User, Text, Text>.Context arg2) throws IOException, InterruptedException { String uname = arg0.getUname(); StringBuilder stringBuilder = new StringBuilder(); for (User user : arg1) { stringBuilder.append(user.getUname() + ": " + user.getFriedsCount()); stringBuilder.append(", "); } arg2.write(new Text(uname), new Text(stringBuilder.toString())); } } }
初始文档
小明 老王 如花 林志玲
老王 小明 凤姐
如花 小明 李刚 凤姐
林志玲 小明 李刚 凤姐 郭美美
李刚 如花 凤姐 林志玲
郭美美 凤姐 林志玲
凤姐 如花 老王 林志玲 郭美美
系列来自尚学堂视频
标签:reducer 去掉 stat input 定义 this integer 计算 on()
原文地址:http://www.cnblogs.com/wenbronk/p/7308716.html