标签:apr 级别 time 配置文件 textinput mil reflect ... 数据去重
1.按数据使用划分:
2.案例:天池大数据竞赛
我们会开放如下数据类型:
字 段 |
字段说明 |
提取说明 |
user_id |
用户标记 |
抽样&字段加密 |
Time |
行为时间 |
精度到天级别&隐藏年份 |
action_type |
用户对品牌的行为类型 |
包括点击、购买、加入购物车、收藏4种行为 |
brand_id |
品牌数字ID |
抽样&字段加密 |
提供的数据量,涉及千万级天猫用户,万级天猫品牌,时间跨度4个月的行为记录。
提供的训练数据在天池集群的表t_alibaba_bigdata_user_brand_total_1中,字段分别为:user_id,brand_id, type, visit_datetime。如图所示
3.用户4种行为类型(Type)对应代码分别为:
点击:0;购买:1;收藏:2;购物车:3
1、 对原数据去重
1 package com.oracle.www.TianChi_compition; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.NullWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 16 17 /* 18 * 对原数据去重,去表头 19 */ 20 public class Step01 { 21 static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> { 22 @Override 23 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) 24 throws IOException, InterruptedException { 25 if (key.get() > 0) { 26 context.write(value, NullWritable.get()); 27 } 28 } 29 } 30 31 static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> { 32 @Override 33 protected void reduce(Text key, Iterable<NullWritable> vlue, 34 Reducer<Text, NullWritable, Text, NullWritable>.Context context) 35 throws IOException, InterruptedException { 36 context.write(key, NullWritable.get()); 37 } 38 } 39 40 public static void main(String[] args) 41 throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { 42 Configuration conf = new Configuration(); 43 Job job = Job.getInstance(conf); 44 job.setJarByClass(Step01.class); 45 46 job.setMapperClass(MyMapper.class); 47 job.setReducerClass(MyReducer.class); 48 49 job.setMapOutputKeyClass(Text.class); 50 job.setMapOutputValueClass(NullWritable.class); 51 52 job.setOutputKeyClass(Text.class); 53 54 Path outPath = new Path("hdfs://192.168.9.13:8020/deweight"); 55 FileSystem fs = outPath.getFileSystem(conf); 56 if (fs.exists(outPath)) { 57 fs.delete(outPath, true); 58 } 59 FileInputFormat.addInputPath(job, new Path("hdfs://192.168.9.13:8020/TianmaoData")); 60 FileOutputFormat.setOutputPath(job, outPath); 61 job.waitForCompletion(true); 62 63 } 64 65 }
2、 获得所有物品之间的同现矩阵
1 package com.oracle.www.TianChi_compition; 2 3 import java.io.IOException; 4 import java.util.ArrayList; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.IntWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 18 /* 19 * 生成同现(显)矩阵 20 * map端<商品1-商品2,1>拆分,发送 21 * reduce端<商品1-商品2,1,1,1...>统计 22 */ 23 public class Step03 { 24 25 static class MyMapper extends Mapper<Text, Text, Text, IntWritable> { 26 Text k = new Text(); 27 IntWritable v = new IntWritable(); 28 29 @Override 30 protected void map(Text key, Text value, Mapper<Text, Text, Text, IntWritable>.Context context) 31 throws IOException, InterruptedException { 32 ArrayList<String> itemList = new ArrayList<>(); 33 String line = value.toString(); 34 String[] datas = line.split("\t"); 35 for (String data : datas) {// 将用户购买过的商品添加到list集合中 36 String[] item_mark = data.split(":"); 37 itemList.add(item_mark[0]); 38 } 39 40 for (int i = 0; i < itemList.size(); i++) { 41 for (int j = 0; j < itemList.size(); j++) { 42 k.set(itemList.get(i) + "-" + itemList.get(j)); 43 v.set(1); 44 context.write(k, v); 45 } 46 47 } 48 } 49 } 50 51 static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 52 Text k = new Text(); 53 IntWritable v = new IntWritable(); 54 55 @Override 56 protected void reduce(Text key, Iterable<IntWritable> value, 57 Reducer<Text, IntWritable, Text, IntWritable>.Context context) 58 throws IOException, InterruptedException { 59 int sum = 0; 60 for (IntWritable val : value) { 61 sum += val.get(); 62 } 63 k.set(key.toString()); 64 v.set(sum); 65 context.write(k, v); 66 } 67 } 68 69 public static void main(String[] args) throws ClassNotFoundException, InterruptedException { 70 Configuration conf = new Configuration(); 71 try { 72 Job job = Job.getInstance(conf); 73 74 job.setJarByClass(Step03.class); 75 job.setMapperClass(MyMapper.class); 76 job.setReducerClass(MyReducer.class); 77 78 job.setMapOutputKeyClass(Text.class); 79 job.setMapOutputValueClass(IntWritable.class); 80 81 job.setOutputKeyClass(Text.class); 82 job.setOutputValueClass(IntWritable.class); 83 84 job.setInputFormatClass(KeyValueTextInputFormat.class); 85 86 // 判断output文件夹是否存在,如果存在则删除 87 Path outPath = new Path("hdfs://192.168.9.13:8020/implyCount");// 输出路径 88 FileSystem fs = outPath.getFileSystem(conf);// 根据输出路径找到文件,参数为配置文件 89 if (fs.exists(outPath)) { 90 fs.delete(outPath); 91 // fs.delete(outPath, true);true的意思是,就算output有东西,也一带删除,默认为true 92 93 } 94 FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/gradeMarking")); 95 FileOutputFormat.setOutputPath(job, outPath); 96 job.waitForCompletion(true); 97 } catch (IOException e) { 98 // TODO Auto-generated catch block 99 e.printStackTrace(); 100 } 101 } 102 }
3、 权重矩阵(用户对同一件商品的不同行为操作得到的评分矩阵)
1 package com.oracle.www.TianChi_compition; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Iterator; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 /* 11 * 生成评分矩阵 12 * map端拆分,发送<用户 商品+":"+操作> 13 * reduce端统计生成<用户 商品1+":"+评分,商品2+":"+评分,...> 14 */ 15 import org.apache.hadoop.io.LongWritable; 16 import org.apache.hadoop.io.Text; 17 import org.apache.hadoop.mapreduce.Job; 18 import org.apache.hadoop.mapreduce.Mapper; 19 import org.apache.hadoop.mapreduce.Reducer; 20 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 22 23 public class Step02 { 24 static Text userId = new Text(); 25 static Text shopping_operate = new Text(); 26 27 static class MyMapper extends Mapper<LongWritable, Text, Text, Text> { 28 @Override 29 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) 30 throws IOException, InterruptedException { 31 32 String line = value.toString(); 33 String[] datas = line.split("\t"); 34 userId.set(datas[1]); 35 shopping_operate.set(datas[0] + ":" + datas[2]); 36 context.write(userId, shopping_operate); 37 } 38 } 39 40 static class MyReducer extends Reducer<Text, Text, Text, Text> { 41 Text v = new Text(); 42 double click = 0; 43 double collect = 0; 44 double cart = 0; 45 double alipay = 0; 46 47 @Override 48 protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context) 49 throws IOException, InterruptedException { 50 // shoppingOperate_counter<商品,<商品操作,操作次数>> 51 HashMap<String, HashMap<String, Integer>> shoppingOperate_counter = new HashMap<>(); 52 String[] temp_str = null; 53 String shoppingName = null; 54 String shoppingOperate = null; 55 HashMap<String, Integer> operate_counter = null;// 内层map,记录对商品的操作和操作次数 56 for (Text val : value) { 57 temp_str = val.toString().split(":"); 58 shoppingName = temp_str[0]; 59 shoppingOperate = temp_str[1]; 60 if (!shoppingOperate_counter.containsKey(shoppingName)) {// map中不存在此商品信息,添加并给予初始值 61 operate_counter = new HashMap<>(); 62 operate_counter.put(shoppingOperate, 1); 63 shoppingOperate_counter.put(shoppingName, operate_counter); 64 } else {// map中包含此商品 65 operate_counter = shoppingOperate_counter.get(shoppingName); 66 if (!operate_counter.containsKey(shoppingOperate)) {// 包含此商品不包含此操作 67 operate_counter.put(shoppingOperate, 1); 68 } else { 69 operate_counter.put(shoppingOperate, operate_counter.get(shoppingOperate) + 1); 70 } 71 } 72 } 73 // 通过对shoppingOperate_counter循环遍历,统计算分 74 Iterator<String> iter = shoppingOperate_counter.keySet().iterator(); 75 StringBuffer shopping_marking = new StringBuffer(); 76 while (iter.hasNext()) { 77 click = 0; 78 collect = 0; 79 cart = 0; 80 alipay = 0; 81 shoppingName = iter.next(); 82 operate_counter = shoppingOperate_counter.get(shoppingName); 83 Iterator<String> operateIter = operate_counter.keySet().iterator(); 84 int counter = 0;// 记录用户对单个商品操作过的次数 85 while (operateIter.hasNext()) { 86 counter++; 87 shoppingOperate = operateIter.next(); 88 if ("click".equals(shoppingOperate)) { 89 click = operate_counter.get(shoppingOperate); 90 } else if ("collect".equals(shoppingOperate)) { 91 collect = operate_counter.get(shoppingOperate); 92 } else if ("cart".equals(shoppingOperate)) { 93 cart = operate_counter.get(shoppingOperate); 94 } else { 95 alipay = operate_counter.get(shoppingOperate); 96 } 97 } 98 double sum = click / counter * 1.0 + collect / counter * 2.0 + cart / counter * 3.0 99 + alipay / counter * 4.0; 100 shopping_marking.append(shoppingName + ":" + sum + "\t"); 101 } 102 v.set(shopping_marking.toString()); 103 context.write(key, v); 104 } 105 } 106 107 public static void main(String[] args) throws ClassNotFoundException, InterruptedException { 108 Configuration conf = new Configuration(); 109 try { 110 Job job = Job.getInstance(conf); 111 job.setJarByClass(Step02.class); 112 job.setMapperClass(MyMapper.class); 113 job.setReducerClass(MyReducer.class); 114 115 job.setMapOutputKeyClass(Text.class); 116 job.setMapOutputValueClass(Text.class); 117 118 job.setOutputKeyClass(Text.class); 119 job.setOutputKeyClass(Text.class); 120 121 Path outPath = new Path("hdfs://192.168.9.13:8020/deweight"); 122 FileSystem fs = outPath.getFileSystem(conf); 123 if (fs.exists(outPath)) { 124 fs.delete(outPath); 125 } 126 FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/deweight")); 127 FileOutputFormat.setOutputPath(job, outPath); 128 job.waitForCompletion(true); 129 } catch (IOException e) { 130 // TODO Auto-generated catch block 131 e.printStackTrace(); 132 } 133 134 } 135 136 }
4、 两个矩阵相乘得到三维矩阵
1 package com.oracle.www.TianChi_compition; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map.Entry; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.DoubleWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 17 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 20 public class Step04 { 21 static class MyMapper extends Mapper<Text, Text, Text, Text> { 22 String parentName = null; 23 Text k = new Text(); 24 Text v = new Text(); 25 26 @Override 27 protected void setup(Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { 28 FileSplit fs = (FileSplit) context.getInputSplit(); 29 parentName = fs.getPath().getParent().getName(); 30 } 31 32 @Override 33 protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context) 34 throws IOException, InterruptedException { 35 String line = value.toString(); 36 String[] datas = null; 37 // 判断输入目录 38 if (parentName.equals("gradeMarking")) {// 评分 39 datas = line.split("\t"); 40 for (String data : datas) { 41 String[] item_mark = data.split(":"); 42 k.set(item_mark[0]); 43 v.set(key.toString() + ":" + item_mark[1]); 44 context.write(k, v); 45 } 46 } else { 47 datas = key.toString().split("-"); 48 k.set(datas[1]); 49 v.set(datas[0] + ":" + line); 50 context.write(k, v); 51 } 52 } 53 } 54 55 static class MyReducer extends Reducer<Text, Text, Text, DoubleWritable> { 56 Text k = new Text(); 57 DoubleWritable v = new DoubleWritable(); 58 // <商品x 用户1:评分1,用户2:评分2,...,商品1:频次1,商品2:频次2,...>(频次值为两件商品同时出现的次数) 59 60 @Override 61 protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, DoubleWritable>.Context context) 62 throws IOException, InterruptedException { 63 HashMap<String, Double> user_mark = new HashMap<>(); 64 HashMap<String, Double> item_counter = new HashMap<>(); 65 // 将 用户:评分 , 商品:频次 添加到对应的map中 66 String[] datas = null; 67 for (Text val : value) { 68 datas = val.toString().split(":"); 69 if (datas[0].startsWith("u")) { 70 user_mark.put(datas[0], Double.parseDouble(datas[1])); 71 } else { 72 item_counter.put(datas[0], Double.parseDouble(datas[1])); 73 } 74 } 75 76 // 遍历循环相乘 77 String userName = null; 78 double userMark = 0.0; 79 String itemName = null; 80 double iterCounter = 0; 81 for (Entry<String, Double> entry1 : user_mark.entrySet()) { 82 userName = entry1.getKey(); 83 userMark = entry1.getValue(); 84 for (Entry<String, Double> entry2 : item_counter.entrySet()) { 85 itemName = entry2.getKey(); 86 iterCounter = entry2.getValue(); 87 k.set(userName + ":" + itemName); 88 v.set(userMark * iterCounter); 89 context.write(k, v); 90 } 91 } 92 93 } 94 } 95 96 public static void main(String[] args) throws ClassNotFoundException, InterruptedException { 97 Configuration conf = new Configuration(); 98 try { 99 Job job = Job.getInstance(conf); 100 101 job.setJarByClass(Step03.class); 102 job.setMapperClass(MyMapper.class); 103 job.setReducerClass(MyReducer.class); 104 105 job.setMapOutputKeyClass(Text.class); 106 job.setMapOutputValueClass(Text.class); 107 108 job.setOutputKeyClass(Text.class); 109 job.setOutputValueClass(DoubleWritable.class); 110 111 job.setInputFormatClass(KeyValueTextInputFormat.class); 112 113 // 判断output文件夹是否存在,如果存在则删除 114 Path outPath = new Path("hdfs://192.168.9.13:8020/mark&implyCount_multiply");// 输出路径 115 FileSystem fs = outPath.getFileSystem(conf);// 根据输出路径找到文件,参数为配置文件 116 if (fs.exists(outPath)) { 117 fs.delete(outPath); 118 // fs.delete(outPath, true);true的意思是,就算output有东西,也一带删除,默认为true 119 120 } 121 FileInputFormat.setInputPaths(job, new Path[] { new Path("hdfs://192.168.9.13:8020/gradeMarking"), 122 new Path("hdfs://192.168.9.13:8020/implyCount") }); 123 FileOutputFormat.setOutputPath(job, outPath); 124 job.waitForCompletion(true); 125 } catch (IOException e) { 126 // TODO Auto-generated catch block 127 e.printStackTrace(); 128 } 129 } 130 131 }
5、 三维矩阵的数据相加获得所有用户对所有物品的推荐值(二维矩阵)
1 package com.oracle.www.TianChi_compition; 2 3 /* 4 * 筛选掉用户购买过的商品,并求和 5 */ 6 import java.io.BufferedReader; 7 import java.io.FileReader; 8 import java.io.IOException; 9 import java.net.URI; 10 import java.net.URISyntaxException; 11 import java.util.ArrayList; 12 13 import org.apache.hadoop.conf.Configuration; 14 import org.apache.hadoop.fs.FileSystem; 15 import org.apache.hadoop.fs.Path; 16 import org.apache.hadoop.io.Text; 17 import org.apache.hadoop.mapreduce.Job; 18 import org.apache.hadoop.mapreduce.Mapper; 19 import org.apache.hadoop.mapreduce.Reducer; 20 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 21 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 22 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 23 24 public class Step05 { 25 static class MyMapper extends Mapper<Text, Text, Text, Text> { 26 // boughtList集合用于存放哪些用户买过哪些商品,不能使用map集合存放, 27 // 同一个用户可能买过多件商品,同一件商品也有可能同时被好多人买过; 28 ArrayList<String> boughtList = new ArrayList<>(); 29 BufferedReader br = null; 30 31 // setup方法初始化boughtList集合 32 @Override 33 protected void setup(Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { 34 br = new BufferedReader(new FileReader("part-r-00000")); 35 String line = null; 36 String[] datas = null; 37 while ((line = br.readLine()) != null) { 38 datas = line.split("\t"); 39 if ("alipay".equals(datas[2])) { 40 boughtList.add(datas[1] + ":" + datas[0]); 41 } 42 } 43 } 44 45 // map方法排除掉用户购买过的商品,使其不推荐 46 @Override 47 protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context) 48 throws IOException, InterruptedException { 49 // 判断向该用户推荐的商品是否被该用户购买过,如果购买过,则不推荐(即不向reduce端发送) 50 if (!boughtList.contains(key.toString())) { 51 context.write(key, value); 52 } 53 } 54 } 55 56 static class MyReducer extends Reducer<Text, Text, Text, Text> { 57 Text k = new Text(); 58 Text v = new Text(); 59 60 @Override 61 protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context) 62 throws IOException, InterruptedException { 63 double rank = 0.0; 64 for (Text val : value) { 65 rank += Double.parseDouble(val.toString()); 66 } 67 k.set(key.toString().split(":")[0]); 68 v.set(key.toString().split(":")[1] + ":" + rank); 69 context.write(k, v); 70 } 71 } 72 73 public static void main(String[] args) 74 throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { 75 Configuration conf = new Configuration(); 76 Job job = Job.getInstance(conf); 77 job.setJarByClass(Step05.class); 78 job.setMapperClass(MyMapper.class); 79 job.setReducerClass(MyReducer.class); 80 81 job.setMapOutputKeyClass(Text.class); 82 job.setMapOutputValueClass(Text.class); 83 84 job.setOutputKeyClass(Text.class); 85 job.setOutputValueClass(Text.class); 86 87 job.setInputFormatClass(KeyValueTextInputFormat.class); 88 89 job.addCacheFile(new URI("hdfs://192.168.9.13:8020/deweight/part-r-00000")); 90 91 Path outPath = new Path("hdfs://192.168.9.13:8020/shoppingRecommend"); 92 FileSystem fs = outPath.getFileSystem(conf); 93 if (fs.exists(outPath)) { 94 fs.delete(outPath, true); 95 } 96 FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/mark&implyCount_multiply")); 97 FileOutputFormat.setOutputPath(job, outPath); 98 99 job.waitForCompletion(true); 100 101 } 102 103 }
6、 按照推荐值降序排序(筛选权重高的前十件商品)。
1 package com.oracle.www.TianChi_compition; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.lang.reflect.InvocationTargetException; 7 import java.util.ArrayList; 8 import java.util.Collections; 9 10 import org.apache.commons.beanutils.BeanUtils; 11 import org.apache.hadoop.conf.Configuration; 12 import org.apache.hadoop.fs.FileSystem; 13 import org.apache.hadoop.fs.Path; 14 import org.apache.hadoop.io.Text; 15 import org.apache.hadoop.io.WritableComparable; 16 import org.apache.hadoop.mapreduce.Job; 17 import org.apache.hadoop.mapreduce.Mapper; 18 import org.apache.hadoop.mapreduce.Reducer; 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 20 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 22 23 /* 24 * 排序,选出向用户推荐的前十个商品 25 */ 26 public class Step06 { 27 // 将取到一行的内容拆分,以 <用户,商品j:权重r>发送到reduce端进行处理 28 static class MyMapper extends Mapper<Text, Text, Text, Sort> { 29 Sort sort = null; 30 31 @Override 32 protected void map(Text key, Text value, Mapper<Text, Text, Text, Sort>.Context context) 33 throws IOException, InterruptedException { 34 35 sort = new Sort(value.toString().split(":")[0], Double.parseDouble(value.toString().split(":")[1])); 36 context.write(key, sort); 37 } 38 } 39 40 // reduce端将同一用户的推荐商品按权值大小排序,将前十个拼接输出 41 static class MyReducer extends Reducer<Text, Sort, Text, Text> { 42 ArrayList<Sort> list = new ArrayList<>(); 43 Text v = new Text(); 44 45 @Override 46 protected void reduce(Text key, Iterable<Sort> value, Reducer<Text, Sort, Text, Text>.Context context) 47 throws IOException, InterruptedException { 48 StringBuffer sb = new StringBuffer(); 49 list.clear(); 50 // map端如果将自定义对象作为value发送到reduce端进行迭代时,需要将迭代器中的每个对象使用BeanUtils.copyProperties(dest,org)将属性拷贝到另外一个对象中; 51 for (Sort sort : value) { 52 Sort tempSort = new Sort(); 53 try { 54 BeanUtils.copyProperties(tempSort, sort); 55 list.add(tempSort); 56 } catch (IllegalAccessException e) { 57 // TODO Auto-generated catch block 58 e.printStackTrace(); 59 } catch (InvocationTargetException e) { 60 // TODO Auto-generated catch block 61 e.printStackTrace(); 62 } 63 } 64 65 Collections.sort(list); 66 for (int i = 0; i < list.size() && i < 10; i++) { 67 sb.append(list.get(i)); 68 } 69 v.set(sb.toString()); 70 context.write(key, v); 71 } 72 } 73 74 static public class Sort implements WritableComparable<Sort> { 75 private String shoppingName; 76 private double shoppingRank; 77 78 public Sort() { 79 } 80 81 public Sort(String shoppingName, double shoppingRank) { 82 this.shoppingName = shoppingName; 83 this.shoppingRank = shoppingRank; 84 } 85 86 public String getShoppingName() { 87 return shoppingName; 88 } 89 90 public void setShoppingName(String shoppingName) { 91 this.shoppingName = shoppingName; 92 } 93 94 public double getShoppingRank() { 95 return shoppingRank; 96 } 97 98 public void setShoppingRank(double shoppingRank) { 99 this.shoppingRank = shoppingRank; 100 } 101 102 @Override 103 public String toString() { 104 return shoppingName + ":" + shoppingRank + "\t"; 105 } 106 107 @Override 108 public void write(DataOutput out) throws IOException { 109 out.writeDouble(shoppingRank); 110 out.writeUTF(shoppingName); 111 } 112 113 @Override 114 public void readFields(DataInput in) throws IOException { 115 this.shoppingRank = in.readDouble(); 116 this.shoppingName = in.readUTF(); 117 } 118 119 @Override 120 public int compareTo(Sort o) { 121 int temp = 0; 122 if (this.getShoppingRank() - o.getShoppingRank() < 0) { 123 return 1; 124 } else if (this.getShoppingRank() - o.getShoppingRank() > 0) { 125 return -1; 126 } 127 return temp; 128 } 129 } 130 131 public static void main(String[] args) throws ClassNotFoundException, InterruptedException { 132 Configuration conf = new Configuration(); 133 try { 134 Job job = Job.getInstance(); 135 136 job.setJarByClass(Step06.class); 137 job.setMapperClass(MyMapper.class); 138 job.setReducerClass(MyReducer.class); 139 140 job.setMapOutputKeyClass(Text.class); 141 job.setMapOutputValueClass(Sort.class); 142 143 job.setOutputKeyClass(Text.class); 144 job.setOutputValueClass(Text.class); 145 146 job.setInputFormatClass(KeyValueTextInputFormat.class); 147 148 Path outPath = new Path("hdfs://192.168.9.13:8020/ShoppingRecommend_Sort"); 149 FileSystem fs = outPath.getFileSystem(conf); 150 if (fs.exists(outPath)) { 151 fs.delete(outPath); 152 } 153 154 FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/shoppingRecommend")); 155 FileOutputFormat.setOutputPath(job, outPath); 156 157 job.waitForCompletion(true); 158 } catch (IOException e) { 159 // TODO Auto-generated catch block 160 e.printStackTrace(); 161 } 162 163 } 164 165 }
标签:apr 级别 time 配置文件 textinput mil reflect ... 数据去重
原文地址:http://www.cnblogs.com/le-ping/p/7783925.html