标签:exce 个人 结果 exception 迭代 system reducer int asList
首先我们来看看需求,以下是某博客的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(好友关系是单向的):
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
public class FriendsMapper extends Mapper<LongWritable, Text, Text, Text> {
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// A:B,C,D,F,E,O
String line = value.toString();
// 切割数据 变成: key:A value:B,C,D,F,E,O
String[] fields = line.split(":");
// 封装k,v 写出数据
if (ArrayUtils.isNotEmpty(fields)) {
k.set(fields[0]);
v.set(fields[1]);
}
context.write(k, v);
}
}
public class FriendsReducer extends Reducer<Text, Text, Text, NullWritable> {
private final Map<String, String> oldMap = new HashMap<>();
private final Text k = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 将key与value封装成map
for (Text value : values) {
oldMap.put(key.toString(), value.toString());
}
}
/**
* cleanup方法发生在所有reduce执行完之后,故这时候的oldMap已经填充了所有人的数据
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// key:A value:B,C,D,F,E,O
// 使用一个list存储所有人的相同好友
List<String> sameFriends = new ArrayList<>();
// 获取所有用户集合的key
Set<String> keySet = oldMap.keySet();
// 开始迭代
for (String keyOne:keySet) {
for (String keyTwo:keySet) {
// 获取第一个人的好友
List<String> firstCommonList = getCommonList(keyOne);
// 获取第二个人的好友
List<String> secondCommonList = getCommonList(keyTwo);
// 求二者的交集,存储到firstCommonList
firstCommonList.retainAll(secondCommonList);
// 拼接要输出的字符串
StringBuilder sb = new StringBuilder();
for (String s : firstCommonList) {
sb.append(s).append(" ");
}
// 筛选掉没有共同好友的人,即sb没有拼接,为空串
if (!StringUtils.equals("",sb.toString())) {
sameFriends.add(keyOne + "和" + keyTwo + "的共同好友为:" + sb.toString());
}
}
}
// 拼接完毕,输出共同好友
for (String sameFriend : sameFriends) {
k.set(sameFriend);
context.write(k, NullWritable.get());
}
}
private List<String> getCommonList(String key) {
String friend = oldMap.get(key);
String[] common = friend.split(",");
List<String> commonListTmp = Arrays.asList(common);
return new ArrayList<>(commonListTmp);
}
}
public class FriendDriver {
public static void main(String[] args) throws Exception{
args = new String[]{"f:/hadoop/hadoopinput/friends.txt", "f:/hadoop/friendsouput"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FriendDriver.class);
job.setMapperClass(FriendsMapper.class);
job.setReducerClass(FriendsReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.out.println(result ? 1 : 0);
}
}
首先我们考虑用hadoop中的MapReduce技术解决问题,MapReduce主要分为两个阶段:map阶段和reduce阶段。map阶段负责聚集数据,reduce阶段负责计算数据。
我们在map阶段,先把数据按照冒号进行切割成如下格式:
key:A value:B,C,D,F,E,O
将其封装成reduce阶段的key和value,传入到reduce阶段。
在reduce阶段,我们把所有的key和value存储到一个Map键值对结构中,方便接下来的操作。
在Reduce的最后结束的cleanup阶段,我们已经有了一个以用户的ID(名字)为key,用户拥有的好友为value的Map,我们对所有人的ID进行一个循环,依次比对每个ID拥有的好友,比如第一个人拥有的好友为集合A,第二个人拥有的好友为集合B,我们对集合A和B取交集,就能获得共同好友了。之后我们对要输出的结果进行一顿拼接,拼接出要输出的结果。最后将其写出即可。
最后输出的结果为:
A和A的共同好友为:B C D F E O
A和B的共同好友为:C E
A和C的共同好友为:D F
A和D的共同好友为:F E
A和E的共同好友为:B C D
A和F的共同好友为:B C D E O
A和G的共同好友为:C D F E
A和H的共同好友为:C D E O
A和I的共同好友为:O
A和J的共同好友为:B O
A和K的共同好友为:C D
A和L的共同好友为:D F E
A和M的共同好友为:F E
B和A的共同好友为:C E
B和B的共同好友为:A C E K
B和C的共同好友为:A
B和D的共同好友为:A E
B和E的共同好友为:C
B和F的共同好友为:A C E
B和G的共同好友为:A C E
B和H的共同好友为:A C E
B和I的共同好友为:A
B和K的共同好友为:A C
B和L的共同好友为:E
B和M的共同好友为:E
B和O的共同好友为:A
C和A的共同好友为:F D
C和B的共同好友为:A
C和C的共同好友为:F A D I
C和D的共同好友为:F A
C和E的共同好友为:D
C和F的共同好友为:A D
C和G的共同好友为:F A D
C和H的共同好友为:A D
C和I的共同好友为:A
C和K的共同好友为:A D
C和L的共同好友为:F D
C和M的共同好友为:F
C和O的共同好友为:A I
D和A的共同好友为:E F
D和B的共同好友为:A E
D和C的共同好友为:A F
D和D的共同好友为:A E F L
D和E的共同好友为:L
D和F的共同好友为:A E
D和G的共同好友为:A E F
D和H的共同好友为:A E
D和I的共同好友为:A
D和K的共同好友为:A
D和L的共同好友为:E F
D和M的共同好友为:E F
D和O的共同好友为:A
E和A的共同好友为:B C D
E和B的共同好友为:C
E和C的共同好友为:D
E和D的共同好友为:L
E和E的共同好友为:B C D M L
E和F的共同好友为:B C D M
E和G的共同好友为:C D
E和H的共同好友为:C D
E和J的共同好友为:B
E和K的共同好友为:C D
E和L的共同好友为:D
F和A的共同好友为:B C D E O
F和B的共同好友为:A C E
F和C的共同好友为:A D
F和D的共同好友为:A E
F和E的共同好友为:B C D M
F和F的共同好友为:A B C D E O M
F和G的共同好友为:A C D E
F和H的共同好友为:A C D E O
F和I的共同好友为:A O
F和J的共同好友为:B O
F和K的共同好友为:A C D
F和L的共同好友为:D E
F和M的共同好友为:E
F和O的共同好友为:A
G和A的共同好友为:C D E F
G和B的共同好友为:A C E
G和C的共同好友为:A D F
G和D的共同好友为:A E F
G和E的共同好友为:C D
G和F的共同好友为:A C D E
G和G的共同好友为:A C D E F
G和H的共同好友为:A C D E
G和I的共同好友为:A
G和K的共同好友为:A C D
G和L的共同好友为:D E F
G和M的共同好友为:E F
G和O的共同好友为:A
H和A的共同好友为:C D E O
H和B的共同好友为:A C E
H和C的共同好友为:A D
H和D的共同好友为:A E
H和E的共同好友为:C D
H和F的共同好友为:A C D E O
H和G的共同好友为:A C D E
H和H的共同好友为:A C D E O
H和I的共同好友为:A O
H和J的共同好友为:O
H和K的共同好友为:A C D
H和L的共同好友为:D E
H和M的共同好友为:E
H和O的共同好友为:A
I和A的共同好友为:O
I和B的共同好友为:A
I和C的共同好友为:A
I和D的共同好友为:A
I和F的共同好友为:A O
I和G的共同好友为:A
I和H的共同好友为:A O
I和I的共同好友为:A O
I和J的共同好友为:O
I和K的共同好友为:A
I和O的共同好友为:A
J和A的共同好友为:B O
J和E的共同好友为:B
J和F的共同好友为:B O
J和H的共同好友为:O
J和I的共同好友为:O
J和J的共同好友为:B O
K和A的共同好友为:C D
K和B的共同好友为:A C
K和C的共同好友为:A D
K和D的共同好友为:A
K和E的共同好友为:C D
K和F的共同好友为:A C D
K和G的共同好友为:A C D
K和H的共同好友为:A C D
K和I的共同好友为:A
K和K的共同好友为:A C D
K和L的共同好友为:D
K和O的共同好友为:A
L和A的共同好友为:D E F
L和B的共同好友为:E
L和C的共同好友为:D F
L和D的共同好友为:E F
L和E的共同好友为:D
L和F的共同好友为:D E
L和G的共同好友为:D E F
L和H的共同好友为:D E
L和K的共同好友为:D
L和L的共同好友为:D E F
L和M的共同好友为:E F
M和A的共同好友为:E F
M和B的共同好友为:E
M和C的共同好友为:F
M和D的共同好友为:E F
M和F的共同好友为:E
M和G的共同好友为:E F
M和H的共同好友为:E
M和L的共同好友为:E F
M和M的共同好友为:E F G
O和B的共同好友为:A
O和C的共同好友为:A I
O和D的共同好友为:A
O和F的共同好友为:A
O和G的共同好友为:A
O和H的共同好友为:A
O和I的共同好友为:A
O和K的共同好友为:A
O和O的共同好友为:A H I J
可能我解释的不是很清楚,代码里都有相应的注释,这块确实有点难理解,也不好解释,如果需要的话就多看几遍多理解吧。
标签:exce 个人 结果 exception 迭代 system reducer int asList
原文地址:https://www.cnblogs.com/wushenjiang/p/13583293.html